Skip to content

Manual - Agg

aggregate.agg¤

AggAPIJSON ¤

Generate clean and API usable json data

AggAPIJSON takes in a dataframe as raw data, cleans it up, and converts it to a dictionary suitable for the JSON format.

Parameters:

Name Type Description Default
fields Optional[Dict[str, str]]

a dictionary that maps the original columns to the desired keys in the result

None
Source code in sm_trendy/aggregate/agg.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class AggAPIJSON:
    """
    Generate clean and API usable json data

    AggAPIJSON takes a raw dataframe, keeps only the configured
    columns, renames them, and returns the rows as a list of
    dictionaries ready to be serialized as JSON.

    :param fields: a dictionary that maps the original
        columns to the desired keys in the result
    """

    def __init__(self, fields: Optional[Dict[str, str]] = None):
        default_mapping = {
            "query": "query",
            "extracted_value": "value",
            "date": "date",
        }
        self.fields = default_mapping if fields is None else fields
        # Only the mapped source columns survive into the output
        self.keep_columns = list(self.fields)

    def __call__(
        self, dataframe: pd.DataFrame, sort_by: Optional[str] = None
    ) -> List[Dict]:
        # Work on a copy so the caller's dataframe is never mutated
        selected = dataframe.copy()[self.keep_columns]
        renamed = selected.rename(columns=self.fields)
        records = renamed.to_dict(orient="records")

        if sort_by is None:
            return records

        return sorted(records, key=lambda record: record[sort_by])

AggSerpAPIBundle ¤

Aggregate all data from a whole serpapi config file

In this class, we follow the following flow

  1. Recreate the SerpAPIConfigBundle
  2. Loop through each config in SerpAPIConfigBundle:
     a. retrieve the data for the config;
     b. convert the selected data to JSON;
     c. save it to a new folder with the same path pattern.
Source code in sm_trendy/aggregate/agg.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class AggSerpAPIBundle:
    """
    Aggregate all data from a whole serpapi config file

    The workflow is:

    1. Recreate the SerpAPIConfigBundle
    2. Loop through each config in SerpAPIConfigBundle
        a. retrieve the data from each of the config
        b. convert the selected data to json
        c. save it to a new folder with the same path pattern.

    :param parent_path: destination parent folder for the aggregated output
    """

    def __init__(self, parent_path: AnyPath):
        self.parent_path = parent_path

    def __call__(self, serpapi_config_path: AnyPath):
        """Aggregate every dataset described by the given config file."""
        # Rebuild the bundle config; serpapi_key is blank here —
        # presumably only path info is needed at this stage (confirm).
        bundle = SerpAPIConfigBundle(file_path=serpapi_config_path, serpapi_key="")
        source_parent = bundle.global_config["path"]["parent_folder"]

        # Loader for the raw downloaded csv data
        loader = DownloadedLoader(parent_folder=source_parent, from_format="csv")

        logger.info(f"  Looping through {len(bundle)} configs")
        for cfg in bundle:
            logger.debug(f"  Aggregating {cfg}")
            # Where the raw downloaded data lives
            raw_path = cfg.path_params.path(parent_folder=source_parent)
            raw_df = loader(cfg.path_params)

            # Clean up and convert to JSON-ready records
            records = AggAPIJSON()(dataframe=raw_df, sort_by="date")

            # Save once under the source data's latest snapshot date ...
            snapshot = loader._latest_snapshots(raw_path / "format=csv")
            target = cfg.path_params.path(parent_folder=self.parent_path)
            logger.debug(
                f"Saving data to {target} with snapshot {snapshot}..."
            )
            self._store_json(
                snapshot_date=datetime.date.fromisoformat(snapshot),
                target_folder=target,
                records=records,
            )

            # ... and once more as the rolling "latest" copy
            logger.debug(f"Saving data to {target} with snapshot latest ...")
            self._store_json(
                snapshot_date="latest", target_folder=target, records=records
            )

    def _store_json(
        self,
        snapshot_date: Union[datetime.date, Literal["latest"]],
        target_folder: AnyPath,
        records: List[Dict],
    ) -> None:
        """
        Save the `records` as json files inside the folder `target_folder`

        :param snapshot_date: a specific snapshot date (or "latest") used as a folder name
        :param target_folder: where to save the data
        :param records: data records to be saved
        """
        writer = StoreJSON(
            target_folder=target_folder,
            snapshot_date=snapshot_date,
        )
        writer.save(records=records, formats=["json"])

DownloadedLoader ¤

Load downloaded data as a pandas dataframe

Warning

Currently, this class only downloads the latest snapshot.

!!! todo: Allow downloading any snapshot(s)

Parameters:

Name Type Description Default
parent_folder AnyPath

parent folder of the downloaded dataset

required
from_format Optional[Literal['csv', 'parquet']]

which format to load the data from

'csv'
Source code in sm_trendy/aggregate/agg.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class DownloadedLoader:
    """
    Load downloaded data as a pandas dataframe

    !!! warning
        Currently, this class only loads the latest snapshot.

    !!! todo:
        Allow loading any snapshot(s)

    :param parent_folder: parent folder of the downloaded dataset
    :param from_format: which format to load the data from
        (only ``"csv"`` is implemented so far)
    """

    def __init__(
        self,
        parent_folder: AnyPath,
        from_format: Optional[Literal["csv", "parquet"]] = "csv",
    ):
        self.from_format = from_format
        self.parent_folder = parent_folder

    def _data_path(self, path_params: PathParams) -> AnyPath:
        """
        Build the full dataset path for the latest snapshot

        :param path_params: PathParams to calculate the path patterns
        :raises Exception: if ``from_format`` is not yet supported
        """
        data_folder = path_params.path(parent_folder=self.parent_folder)

        if self.from_format == "csv":
            format_path = data_folder / f"format={self.from_format}"
            latest_snapshot = self._latest_snapshots(format_path)
            path = (
                format_path
                / f"snapshot_date={latest_snapshot}"
                / f"data.{self.from_format}"
            )
        else:
            raise Exception(f"Not yet supported: reading from {self.from_format}")

        return path

    def _load_as_dataframe(self, data_path: AnyPath) -> pd.DataFrame:
        """
        Load the data file as pandas dataframe

        :param data_path: path to the data file
        :raises Exception: if ``from_format`` is not yet supported
        """
        if self.from_format == "csv":
            df = pd.read_csv(data_path)
        else:
            raise Exception(f"Not yet supported: reading from {self.from_format}")

        return df

    def __call__(self, path_params: PathParams) -> pd.DataFrame:
        """
        load the data specified in a PathParams as pandas dataframe

        :param path_params: PathParams to calculate the path patterns
        """
        data_path = self._data_path(path_params=path_params)
        df = self._load_as_dataframe(data_path)

        return df

    @staticmethod
    def _latest_snapshots(path: AnyPath) -> str:
        """
        Find the most recent snapshot date available under ``path``

        Snapshot folders are named ``snapshot_date=YYYY-MM-DD``; the
        candidates are compared as real dates, not as strings.

        :param path: folder whose children are the snapshot folders
        :raises FileNotFoundError: if no snapshot folder is found
        """
        subfolders = list(path.iterdir())
        logger.debug(f"subfolders: {subfolders} in {path}")

        re_sd = re.compile(r"snapshot_date=(\d{4}-\d{2}-\d{2})")

        # Flatten all matches across the child folder names
        # (was: sum(list-of-lists, []) which copies quadratically)
        snapshot_dates: List[str] = [
            date for folder in subfolders for date in re_sd.findall(folder.name)
        ]
        logger.debug(f"snapshot_dates: {snapshot_dates}")

        # Fail loudly with a clear message instead of a bare IndexError
        if not snapshot_dates:
            raise FileNotFoundError(
                f"No snapshot_date=YYYY-MM-DD folder found in {path}"
            )

        return max(snapshot_dates, key=datetime.date.fromisoformat)

__call__(path_params) ¤

load the data specified in a PathParams as pandas dataframe

Parameters:

Name Type Description Default
path_params PathParams

PathParams to calculate the path patterns

required
Source code in sm_trendy/aggregate/agg.py
70
71
72
73
74
75
76
77
78
79
def __call__(self, path_params: PathParams) -> pd.DataFrame:
    """
    load the data specified in a PathParams as pandas dataframe

    :param path_params: PathParams to calculate the path patterns
    """
    data_path = self._data_path(path_params=path_params)
    df = self._load_as_dataframe(data_path)

    return df