apapi.bulk

apapi.bulk

Child of Basic Connection class, responsible for Bulk API capabilities

  1"""
  2apapi.bulk
  3
  4Child of Basic Connection class, responsible for Bulk API capabilities
  5"""
from __future__ import annotations

import json
from collections.abc import Iterable, Iterator

from requests import Response

from .basic_connection import BasicConnection
from .utils import DEFAULT_DATA, MIMEType
 14
 15
 16class BulkConnection(BasicConnection):
 17    """Anaplan connection with Bulk API functions."""
 18
 19    # Actions
 20    def generic_get_actions(self, model_id: str, action_type: str) -> Response:
 21        """Get the list of available actions of given type."""
 22        return self.request(
 23            "GET",
 24            f"{self._api_main_url}/models/{model_id}/{action_type}",
 25        )
 26
 27    def get_imports(self, model_id: str) -> Response:
 28        """Get the list of available imports."""
 29        return self.generic_get_actions(model_id, "imports")
 30
 31    def get_exports(self, model_id: str) -> Response:
 32        """Get the list of available exports."""
 33        return self.generic_get_actions(model_id, "exports")
 34
 35    def get_actions(self, model_id: str) -> Response:
 36        """Get the list of available deletions."""
 37        return self.generic_get_actions(model_id, "actions")
 38
 39    def get_processes(self, model_id: str) -> Response:
 40        """Get the list of available processes."""
 41        return self.generic_get_actions(model_id, "processes")
 42
 43    def get_files(self, model_id: str) -> Response:
 44        """Get the list of available files."""
 45        return self.generic_get_actions(model_id, "files")
 46
 47    # Action details
 48    def get_import(self, model_id: str, import_id: str) -> Response:
 49        """Get import details."""
 50        return self.request(
 51            "GET", f"{self._api_main_url}/models/{model_id}/imports/{import_id}"
 52        )
 53
 54    def get_export(self, model_id: str, export_id: str) -> Response:
 55        """Get export details."""
 56        return self.request(
 57            "GET", f"{self._api_main_url}/models/{model_id}/exports/{export_id}"
 58        )
 59
 60    def get_process(
 61        self, model_id: str, process_id: str, details: bool = None
 62    ) -> Response:
 63        """Get process details."""
 64        return self.request(
 65            "GET",
 66            f"{self._api_main_url}/models/{model_id}/processes/{process_id}",
 67            {"showImportDataSource": self.details if details is None else details},
 68        )
 69
 70    # Files manipulation
 71    def put_file(self, model_id: str, file_id: str, data: bytes) -> Response:
 72        """Upload file in one go.
 73
 74        **WARNING**: For bigger files (or if this method fails)
 75        BulkConnection.upload_file() should be used instead.
 76        """
 77        return self.request(
 78            "PUT",
 79            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
 80            data=data,
 81            headers={"Content-Type": MIMEType.APP_8STREAM.value},
 82        )
 83
 84    def _set_file_chunk_count(
 85        self, model_id: str, file_id: str, count: int
 86    ) -> Response:
 87        """Set file chunk count before uploading content in chunks.
 88
 89        If you don't know how many chunks the file will have, set count to -1.
 90        Chunks should have size between 1 and 50 MBs.
 91        """
 92        return self.request(
 93            "POST",
 94            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
 95            data=json.dumps({"chunkCount": count}),
 96        )
 97
 98    def _upload_file_chunk(
 99        self,
100        model_id: str,
101        file_id: str,
102        data: bytes,
103        chunk: int,
104        content_type: MIMEType = MIMEType.APP_8STREAM,
105    ) -> Response:
106        """Upload contents of a file chunk."""
107        return self.request(
108            "PUT",
109            f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}",
110            data=data,
111            headers={"Content-Type": content_type.value},
112        )
113
114    def _set_file_upload_complete(self, model_id: str, file_id: str) -> Response:
115        """Finalize upload in chunks by setting it as complete."""
116        return self.request(
117            "POST",
118            f"{self._api_main_url}/models/{model_id}/files/{file_id}/complete",
119            data=json.dumps({"id": file_id}),
120        )
121
122    def upload_file(
123        self,
124        model_id: str,
125        file_id: str,
126        data: [bytes],
127        content_type: MIMEType = MIMEType.APP_8STREAM,
128    ) -> Response:
129        """Upload file (to be used by an import action) chunk by chunk.
130
131        Tip: For smaller files, much faster method (only one request is sent)
132        BulkConnection.put_file() can be used instead.
133        """
134        self._set_file_chunk_count(model_id, file_id, -1)
135        for chunk_count, chunk in enumerate(data):
136            self._upload_file_chunk(model_id, file_id, chunk, chunk_count, content_type)
137        return self._set_file_upload_complete(model_id, file_id)
138
139    def get_file(self, model_id: str, file_id: str) -> Response:
140        """Download file (uploaded or generated by an export action) in one go.
141
142        **WARNING**: For bigger files (or if this method fails)
143        BulkConnection.download_file() should be used instead.
144        """
145        return self.request(
146            "GET",
147            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
148            headers={"Accept": MIMEType.APP_8STREAM.value},
149        )
150
151    def _get_chunks(self, model_id: str, file_id: str) -> Response:
152        """Get number of chunks available for an export."""
153        url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks"
154        response = self.request("GET", url)
155        if not response.json()["meta"]["paging"]["currentPageSize"]:
156            raise Exception("Missing part in request response", url, response.text)
157        return response
158
159    def _get_chunk(self, model_id: str, file_id: str, chunk: int) -> Response:
160        """Get number of chunks available for an export."""
161        url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}"
162        return self.request("GET", url, headers={"Accept": MIMEType.APP_8STREAM.value})
163
164    def download_file(self, model_id: str, file_id: str) -> [bytes]:
165        """Download file (uploaded or generated by an export action) chunk by chunk.
166
167        Tip: For smaller files, much faster method (only one request is sent)
168        BulkConnection.get_file() can be used instead.
169        """
170        response = self._get_chunks(model_id, file_id)
171        return (
172            self._get_chunk(model_id, file_id, int(chunk_id["id"])).content
173            for chunk_id in response.json()["chunks"]
174        )
175
176    def delete_file(self, model_id: str, file_id: str) -> Response:
177        """Delete previously uploaded file from the model's memory."""
178        return self.request(
179            "DELETE",
180            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
181        )
182
183    # Run action
184    def generic_run_action(
185        self,
186        model_id: str,
187        action_id: str,
188        action_type: str,
189        data: dict = None,
190    ) -> Response:
191        """Run an action of given type.
192
193        Data parameter should be provided only for imports with mapping,
194        and processes that contain such imports. It should be dict of key-value pairs
195        of dimension & item that need to be applied to the execution of such action.
196        """
197        mapping = DEFAULT_DATA.copy()
198        if data is not None:
199            mapping["mappingParameters"] = [
200                {"entityType": key, "entityName": data[key]} for key in data
201            ]
202        return self.request(
203            "POST",
204            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks",
205            data=json.dumps(mapping),
206        )
207
208    def run_import(self, model_id: str, import_id: str, data: dict = None) -> Response:
209        """Run an import action.
210
211        Data parameter should be provided only for imports with mapping.
212        It should be dict of key-value pairs of dimension & item that need to be applied
213        to the execution of such import.
214        """
215        return self.generic_run_action(model_id, import_id, "imports", data)
216
217    def run_export(self, model_id: str, export_id: str) -> Response:
218        """Run an export action."""
219        return self.generic_run_action(model_id, export_id, "exports")
220
221    def run_action(self, model_id: str, action_id: str) -> Response:
222        """Run a deletion action."""
223        return self.generic_run_action(model_id, action_id, "actions")
224
225    def run_process(
226        self, model_id: str, process_id: str, data: dict = None
227    ) -> Response:
228        """Run an import action.
229
230        Data parameter should be provided only for processes that contain
231        imports with mapping. It should be dict of key-value pairs of dimension & item
232        that need to be applied to the execution of such process.
233        """
234        return self.generic_run_action(model_id, process_id, "processes", data)
235
236    # Get tasks
237    def generic_get_action_tasks(
238        self, model_id: str, action_id: str, action_type: str
239    ) -> Response:
240        """Get the list of action tasks of given type."""
241        return self.request(
242            "GET",
243            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks",
244        )
245
246    def get_import_tasks(self, model_id: str, import_id: str) -> Response:
247        """Get the list of import tasks."""
248        return self.generic_get_action_tasks(model_id, import_id, "imports")
249
250    def get_export_tasks(self, model_id: str, export_id: str) -> Response:
251        """Get the list of export tasks."""
252        return self.generic_get_action_tasks(model_id, export_id, "exports")
253
254    def get_action_tasks(self, model_id: str, action_id: str) -> Response:
255        """Get the list of deletion tasks."""
256        return self.generic_get_action_tasks(model_id, action_id, "actions")
257
258    def get_process_tasks(self, model_id: str, process_id: str) -> Response:
259        """Get the list of process tasks."""
260        return self.generic_get_action_tasks(model_id, process_id, "processes")
261
262    # Get task
263    def generic_get_action_task(
264        self, model_id: str, action_id: str, task_id: str, action_type: str
265    ) -> Response:
266        """Get the status of an action task of given type."""
267        return self.request(
268            "GET",
269            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks/{task_id}",
270        )
271
272    def get_import_task(self, model_id: str, import_id: str, task_id: str) -> Response:
273        """Get the status of an import task.
274
275        In case of failure, failure dump can be downloaded using
276        BulkConnection.get_import_dump().
277        """
278        return self.generic_get_action_task(model_id, import_id, task_id, "imports")
279
280    def get_export_task(self, model_id: str, export_id: str, task_id: str) -> Response:
281        """Get the status of an export task."""
282        return self.generic_get_action_task(model_id, export_id, task_id, "exports")
283
284    def get_action_task(self, model_id: str, action_id: str, task_id: str) -> Response:
285        """Get the status of a deletion task."""
286        return self.generic_get_action_task(model_id, action_id, task_id, "actions")
287
288    def get_process_task(
289        self, model_id: str, process_id: str, task_id: str
290    ) -> Response:
291        """Get the status of a process task.
292
293        In case of failure, use nested results to obtain objectId that can be used to
294        download failure dump using BulkConnection.get_process_dump().
295        """
296        return self.generic_get_action_task(model_id, process_id, task_id, "processes")
297
298    # Get dump
299    def get_import_dump(self, model_id: str, import_id: str, task_id: str) -> Response:
300        """Downloads import task failure dump file in one go.
301
302        **WARNING**: For bigger files (or if this method fails)
303        BulkConnection.download_import_dump() should be used instead.
304        """
305        return self.request(
306            "GET",
307            f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump",
308            headers={"Accept": MIMEType.APP_8STREAM.value},
309        )
310
311    def download_import_dump(
312        self, model_id: str, import_id: str, task_id: str
313    ) -> bytes:
314        """Downloads import task failure dump file chunk by chunk.
315
316        Tip: For smaller files, much faster method (only one request is sent)
317        BulkConnection.get_import_dump() can be used instead.
318        """
319        url = f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump/chunks"
320        response = self.request("GET", url)
321        if not response.json()["meta"]["paging"]["currentPageSize"]:
322            raise Exception("Missing part in request response", url, response.text)
323        return b"".join(
324            self.request(
325                "GET",
326                f"{url}/{chunk_id['id']}",
327                headers={"Accept": MIMEType.APP_8STREAM.value},
328            ).content
329            for chunk_id in response.json()["chunks"]
330        )
331
332    def get_process_dump(
333        self, model_id: str, process_id: str, task_id: str, object_id: str
334    ) -> Response:
335        """Downloads process task failure dump file in one go.
336
337        **WARNING**: For bigger files (or if this method fails)
338        BulkConnection.download_process_dump() should be used instead.
339        """
340        return self.request(
341            "GET",
342            f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}",
343            headers={"Accept": MIMEType.APP_8STREAM.value},
344        )
345
346    def download_process_dump(
347        self, model_id: str, process_id: str, task_id: str, object_id: str
348    ) -> bytes:
349        """Downloads process task failure dump file chunk by chunk.
350
351        Tip: For smaller files, much faster method (only one request is sent)
352        BulkConnection.get_process_dump() can be used instead.
353        """
354        url = f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}/chunks"
355        response = self.request("GET", url)
356        if not response.json()["meta"]["paging"]["currentPageSize"]:
357            raise Exception("Missing part in request response", url, response.text)
358        return b"".join(
359            self.request(
360                "GET",
361                f"{url}/{chunk_id['id']}",
362                headers={"Accept": MIMEType.APP_8STREAM.value},
363            ).content
364            for chunk_id in response.json()["chunks"]
365        )
class BulkConnection(apapi.basic_connection.BasicConnection):
 17class BulkConnection(BasicConnection):
 18    """Anaplan connection with Bulk API functions."""
 19
 20    # Actions
 21    def generic_get_actions(self, model_id: str, action_type: str) -> Response:
 22        """Get the list of available actions of given type."""
 23        return self.request(
 24            "GET",
 25            f"{self._api_main_url}/models/{model_id}/{action_type}",
 26        )
 27
 28    def get_imports(self, model_id: str) -> Response:
 29        """Get the list of available imports."""
 30        return self.generic_get_actions(model_id, "imports")
 31
 32    def get_exports(self, model_id: str) -> Response:
 33        """Get the list of available exports."""
 34        return self.generic_get_actions(model_id, "exports")
 35
 36    def get_actions(self, model_id: str) -> Response:
 37        """Get the list of available deletions."""
 38        return self.generic_get_actions(model_id, "actions")
 39
 40    def get_processes(self, model_id: str) -> Response:
 41        """Get the list of available processes."""
 42        return self.generic_get_actions(model_id, "processes")
 43
 44    def get_files(self, model_id: str) -> Response:
 45        """Get the list of available files."""
 46        return self.generic_get_actions(model_id, "files")
 47
 48    # Action details
 49    def get_import(self, model_id: str, import_id: str) -> Response:
 50        """Get import details."""
 51        return self.request(
 52            "GET", f"{self._api_main_url}/models/{model_id}/imports/{import_id}"
 53        )
 54
 55    def get_export(self, model_id: str, export_id: str) -> Response:
 56        """Get export details."""
 57        return self.request(
 58            "GET", f"{self._api_main_url}/models/{model_id}/exports/{export_id}"
 59        )
 60
 61    def get_process(
 62        self, model_id: str, process_id: str, details: bool = None
 63    ) -> Response:
 64        """Get process details."""
 65        return self.request(
 66            "GET",
 67            f"{self._api_main_url}/models/{model_id}/processes/{process_id}",
 68            {"showImportDataSource": self.details if details is None else details},
 69        )
 70
 71    # Files manipulation
 72    def put_file(self, model_id: str, file_id: str, data: bytes) -> Response:
 73        """Upload file in one go.
 74
 75        **WARNING**: For bigger files (or if this method fails)
 76        BulkConnection.upload_file() should be used instead.
 77        """
 78        return self.request(
 79            "PUT",
 80            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
 81            data=data,
 82            headers={"Content-Type": MIMEType.APP_8STREAM.value},
 83        )
 84
 85    def _set_file_chunk_count(
 86        self, model_id: str, file_id: str, count: int
 87    ) -> Response:
 88        """Set file chunk count before uploading content in chunks.
 89
 90        If you don't know how many chunks the file will have, set count to -1.
 91        Chunks should have size between 1 and 50 MBs.
 92        """
 93        return self.request(
 94            "POST",
 95            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
 96            data=json.dumps({"chunkCount": count}),
 97        )
 98
 99    def _upload_file_chunk(
100        self,
101        model_id: str,
102        file_id: str,
103        data: bytes,
104        chunk: int,
105        content_type: MIMEType = MIMEType.APP_8STREAM,
106    ) -> Response:
107        """Upload contents of a file chunk."""
108        return self.request(
109            "PUT",
110            f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}",
111            data=data,
112            headers={"Content-Type": content_type.value},
113        )
114
115    def _set_file_upload_complete(self, model_id: str, file_id: str) -> Response:
116        """Finalize upload in chunks by setting it as complete."""
117        return self.request(
118            "POST",
119            f"{self._api_main_url}/models/{model_id}/files/{file_id}/complete",
120            data=json.dumps({"id": file_id}),
121        )
122
123    def upload_file(
124        self,
125        model_id: str,
126        file_id: str,
127        data: [bytes],
128        content_type: MIMEType = MIMEType.APP_8STREAM,
129    ) -> Response:
130        """Upload file (to be used by an import action) chunk by chunk.
131
132        Tip: For smaller files, much faster method (only one request is sent)
133        BulkConnection.put_file() can be used instead.
134        """
135        self._set_file_chunk_count(model_id, file_id, -1)
136        for chunk_count, chunk in enumerate(data):
137            self._upload_file_chunk(model_id, file_id, chunk, chunk_count, content_type)
138        return self._set_file_upload_complete(model_id, file_id)
139
140    def get_file(self, model_id: str, file_id: str) -> Response:
141        """Download file (uploaded or generated by an export action) in one go.
142
143        **WARNING**: For bigger files (or if this method fails)
144        BulkConnection.download_file() should be used instead.
145        """
146        return self.request(
147            "GET",
148            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
149            headers={"Accept": MIMEType.APP_8STREAM.value},
150        )
151
152    def _get_chunks(self, model_id: str, file_id: str) -> Response:
153        """Get number of chunks available for an export."""
154        url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks"
155        response = self.request("GET", url)
156        if not response.json()["meta"]["paging"]["currentPageSize"]:
157            raise Exception("Missing part in request response", url, response.text)
158        return response
159
160    def _get_chunk(self, model_id: str, file_id: str, chunk: int) -> Response:
161        """Get number of chunks available for an export."""
162        url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}"
163        return self.request("GET", url, headers={"Accept": MIMEType.APP_8STREAM.value})
164
165    def download_file(self, model_id: str, file_id: str) -> [bytes]:
166        """Download file (uploaded or generated by an export action) chunk by chunk.
167
168        Tip: For smaller files, much faster method (only one request is sent)
169        BulkConnection.get_file() can be used instead.
170        """
171        response = self._get_chunks(model_id, file_id)
172        return (
173            self._get_chunk(model_id, file_id, int(chunk_id["id"])).content
174            for chunk_id in response.json()["chunks"]
175        )
176
177    def delete_file(self, model_id: str, file_id: str) -> Response:
178        """Delete previously uploaded file from the model's memory."""
179        return self.request(
180            "DELETE",
181            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
182        )
183
184    # Run action
185    def generic_run_action(
186        self,
187        model_id: str,
188        action_id: str,
189        action_type: str,
190        data: dict = None,
191    ) -> Response:
192        """Run an action of given type.
193
194        Data parameter should be provided only for imports with mapping,
195        and processes that contain such imports. It should be dict of key-value pairs
196        of dimension & item that need to be applied to the execution of such action.
197        """
198        mapping = DEFAULT_DATA.copy()
199        if data is not None:
200            mapping["mappingParameters"] = [
201                {"entityType": key, "entityName": data[key]} for key in data
202            ]
203        return self.request(
204            "POST",
205            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks",
206            data=json.dumps(mapping),
207        )
208
209    def run_import(self, model_id: str, import_id: str, data: dict = None) -> Response:
210        """Run an import action.
211
212        Data parameter should be provided only for imports with mapping.
213        It should be dict of key-value pairs of dimension & item that need to be applied
214        to the execution of such import.
215        """
216        return self.generic_run_action(model_id, import_id, "imports", data)
217
218    def run_export(self, model_id: str, export_id: str) -> Response:
219        """Run an export action."""
220        return self.generic_run_action(model_id, export_id, "exports")
221
222    def run_action(self, model_id: str, action_id: str) -> Response:
223        """Run a deletion action."""
224        return self.generic_run_action(model_id, action_id, "actions")
225
226    def run_process(
227        self, model_id: str, process_id: str, data: dict = None
228    ) -> Response:
229        """Run an import action.
230
231        Data parameter should be provided only for processes that contain
232        imports with mapping. It should be dict of key-value pairs of dimension & item
233        that need to be applied to the execution of such process.
234        """
235        return self.generic_run_action(model_id, process_id, "processes", data)
236
237    # Get tasks
238    def generic_get_action_tasks(
239        self, model_id: str, action_id: str, action_type: str
240    ) -> Response:
241        """Get the list of action tasks of given type."""
242        return self.request(
243            "GET",
244            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks",
245        )
246
247    def get_import_tasks(self, model_id: str, import_id: str) -> Response:
248        """Get the list of import tasks."""
249        return self.generic_get_action_tasks(model_id, import_id, "imports")
250
251    def get_export_tasks(self, model_id: str, export_id: str) -> Response:
252        """Get the list of export tasks."""
253        return self.generic_get_action_tasks(model_id, export_id, "exports")
254
255    def get_action_tasks(self, model_id: str, action_id: str) -> Response:
256        """Get the list of deletion tasks."""
257        return self.generic_get_action_tasks(model_id, action_id, "actions")
258
259    def get_process_tasks(self, model_id: str, process_id: str) -> Response:
260        """Get the list of process tasks."""
261        return self.generic_get_action_tasks(model_id, process_id, "processes")
262
263    # Get task
264    def generic_get_action_task(
265        self, model_id: str, action_id: str, task_id: str, action_type: str
266    ) -> Response:
267        """Get the status of an action task of given type."""
268        return self.request(
269            "GET",
270            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks/{task_id}",
271        )
272
273    def get_import_task(self, model_id: str, import_id: str, task_id: str) -> Response:
274        """Get the status of an import task.
275
276        In case of failure, failure dump can be downloaded using
277        BulkConnection.get_import_dump().
278        """
279        return self.generic_get_action_task(model_id, import_id, task_id, "imports")
280
281    def get_export_task(self, model_id: str, export_id: str, task_id: str) -> Response:
282        """Get the status of an export task."""
283        return self.generic_get_action_task(model_id, export_id, task_id, "exports")
284
285    def get_action_task(self, model_id: str, action_id: str, task_id: str) -> Response:
286        """Get the status of a deletion task."""
287        return self.generic_get_action_task(model_id, action_id, task_id, "actions")
288
289    def get_process_task(
290        self, model_id: str, process_id: str, task_id: str
291    ) -> Response:
292        """Get the status of a process task.
293
294        In case of failure, use nested results to obtain objectId that can be used to
295        download failure dump using BulkConnection.get_process_dump().
296        """
297        return self.generic_get_action_task(model_id, process_id, task_id, "processes")
298
299    # Get dump
300    def get_import_dump(self, model_id: str, import_id: str, task_id: str) -> Response:
301        """Downloads import task failure dump file in one go.
302
303        **WARNING**: For bigger files (or if this method fails)
304        BulkConnection.download_import_dump() should be used instead.
305        """
306        return self.request(
307            "GET",
308            f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump",
309            headers={"Accept": MIMEType.APP_8STREAM.value},
310        )
311
312    def download_import_dump(
313        self, model_id: str, import_id: str, task_id: str
314    ) -> bytes:
315        """Downloads import task failure dump file chunk by chunk.
316
317        Tip: For smaller files, much faster method (only one request is sent)
318        BulkConnection.get_import_dump() can be used instead.
319        """
320        url = f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump/chunks"
321        response = self.request("GET", url)
322        if not response.json()["meta"]["paging"]["currentPageSize"]:
323            raise Exception("Missing part in request response", url, response.text)
324        return b"".join(
325            self.request(
326                "GET",
327                f"{url}/{chunk_id['id']}",
328                headers={"Accept": MIMEType.APP_8STREAM.value},
329            ).content
330            for chunk_id in response.json()["chunks"]
331        )
332
333    def get_process_dump(
334        self, model_id: str, process_id: str, task_id: str, object_id: str
335    ) -> Response:
336        """Downloads process task failure dump file in one go.
337
338        **WARNING**: For bigger files (or if this method fails)
339        BulkConnection.download_process_dump() should be used instead.
340        """
341        return self.request(
342            "GET",
343            f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}",
344            headers={"Accept": MIMEType.APP_8STREAM.value},
345        )
346
347    def download_process_dump(
348        self, model_id: str, process_id: str, task_id: str, object_id: str
349    ) -> bytes:
350        """Downloads process task failure dump file chunk by chunk.
351
352        Tip: For smaller files, much faster method (only one request is sent)
353        BulkConnection.get_process_dump() can be used instead.
354        """
355        url = f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}/chunks"
356        response = self.request("GET", url)
357        if not response.json()["meta"]["paging"]["currentPageSize"]:
358            raise Exception("Missing part in request response", url, response.text)
359        return b"".join(
360            self.request(
361                "GET",
362                f"{url}/{chunk_id['id']}",
363                headers={"Accept": MIMEType.APP_8STREAM.value},
364            ).content
365            for chunk_id in response.json()["chunks"]
366        )

Anaplan connection with Bulk API functions.

def generic_get_actions(self, model_id: str, action_type: str) -> requests.models.Response:
21    def generic_get_actions(self, model_id: str, action_type: str) -> Response:
22        """Get the list of available actions of given type."""
23        return self.request(
24            "GET",
25            f"{self._api_main_url}/models/{model_id}/{action_type}",
26        )

Get the list of available actions of given type.

def get_imports(self, model_id: str) -> requests.models.Response:
28    def get_imports(self, model_id: str) -> Response:
29        """Get the list of available imports."""
30        return self.generic_get_actions(model_id, "imports")

Get the list of available imports.

def get_exports(self, model_id: str) -> requests.models.Response:
32    def get_exports(self, model_id: str) -> Response:
33        """Get the list of available exports."""
34        return self.generic_get_actions(model_id, "exports")

Get the list of available exports.

def get_actions(self, model_id: str) -> requests.models.Response:
36    def get_actions(self, model_id: str) -> Response:
37        """Get the list of available deletions."""
38        return self.generic_get_actions(model_id, "actions")

Get the list of available deletions.

def get_processes(self, model_id: str) -> requests.models.Response:
40    def get_processes(self, model_id: str) -> Response:
41        """Get the list of available processes."""
42        return self.generic_get_actions(model_id, "processes")

Get the list of available processes.

def get_files(self, model_id: str) -> requests.models.Response:
44    def get_files(self, model_id: str) -> Response:
45        """Get the list of available files."""
46        return self.generic_get_actions(model_id, "files")

Get the list of available files.

def get_import(self, model_id: str, import_id: str) -> requests.models.Response:
49    def get_import(self, model_id: str, import_id: str) -> Response:
50        """Get import details."""
51        return self.request(
52            "GET", f"{self._api_main_url}/models/{model_id}/imports/{import_id}"
53        )

Get import details.

def get_export(self, model_id: str, export_id: str) -> requests.models.Response:
55    def get_export(self, model_id: str, export_id: str) -> Response:
56        """Get export details."""
57        return self.request(
58            "GET", f"{self._api_main_url}/models/{model_id}/exports/{export_id}"
59        )

Get export details.

def get_process( self, model_id: str, process_id: str, details: bool = None) -> requests.models.Response:
61    def get_process(
62        self, model_id: str, process_id: str, details: bool = None
63    ) -> Response:
64        """Get process details."""
65        return self.request(
66            "GET",
67            f"{self._api_main_url}/models/{model_id}/processes/{process_id}",
68            {"showImportDataSource": self.details if details is None else details},
69        )

Get process details.

def put_file( self, model_id: str, file_id: str, data: bytes) -> requests.models.Response:
72    def put_file(self, model_id: str, file_id: str, data: bytes) -> Response:
73        """Upload file in one go.
74
75        **WARNING**: For bigger files (or if this method fails)
76        BulkConnection.upload_file() should be used instead.
77        """
78        return self.request(
79            "PUT",
80            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
81            data=data,
82            headers={"Content-Type": MIMEType.APP_8STREAM.value},
83        )

Upload file in one go.

WARNING: For bigger files (or if this method fails) BulkConnection.upload_file() should be used instead.

def upload_file( self, model_id: str, file_id: str, data: [<class 'bytes'>], content_type: apapi.utils.MIMEType = <MIMEType.APP_8STREAM: 'application/octet-stream'>) -> requests.models.Response:
123    def upload_file(
124        self,
125        model_id: str,
126        file_id: str,
127        data: [bytes],
128        content_type: MIMEType = MIMEType.APP_8STREAM,
129    ) -> Response:
130        """Upload file (to be used by an import action) chunk by chunk.
131
132        Tip: For smaller files, much faster method (only one request is sent)
133        BulkConnection.put_file() can be used instead.
134        """
135        self._set_file_chunk_count(model_id, file_id, -1)
136        for chunk_count, chunk in enumerate(data):
137            self._upload_file_chunk(model_id, file_id, chunk, chunk_count, content_type)
138        return self._set_file_upload_complete(model_id, file_id)

Upload file (to be used by an import action) chunk by chunk.

Tip: For smaller files, BulkConnection.put_file() can be used instead; it is much faster because only one request is sent.

def get_file(self, model_id: str, file_id: str) -> requests.models.Response:
140    def get_file(self, model_id: str, file_id: str) -> Response:
141        """Download file (uploaded or generated by an export action) in one go.
142
143        **WARNING**: For bigger files (or if this method fails)
144        BulkConnection.download_file() should be used instead.
145        """
146        return self.request(
147            "GET",
148            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
149            headers={"Accept": MIMEType.APP_8STREAM.value},
150        )

Download file (uploaded or generated by an export action) in one go.

WARNING: For bigger files (or if this method fails) BulkConnection.download_file() should be used instead.

def download_file(self, model_id: str, file_id: str) -> [<class 'bytes'>]:
165    def download_file(self, model_id: str, file_id: str) -> [bytes]:
166        """Download file (uploaded or generated by an export action) chunk by chunk.
167
168        Tip: For smaller files, much faster method (only one request is sent)
169        BulkConnection.get_file() can be used instead.
170        """
171        response = self._get_chunks(model_id, file_id)
172        return (
173            self._get_chunk(model_id, file_id, int(chunk_id["id"])).content
174            for chunk_id in response.json()["chunks"]
175        )

Download file (uploaded or generated by an export action) chunk by chunk.

Tip: For smaller files, BulkConnection.get_file() can be used instead; it is much faster because only one request is sent.

def delete_file(self, model_id: str, file_id: str) -> requests.models.Response:
177    def delete_file(self, model_id: str, file_id: str) -> Response:
178        """Delete previously uploaded file from the model's memory."""
179        return self.request(
180            "DELETE",
181            f"{self._api_main_url}/models/{model_id}/files/{file_id}",
182        )

Delete previously uploaded file from the model's memory.

def generic_run_action( self, model_id: str, action_id: str, action_type: str, data: dict = None) -> requests.models.Response:
185    def generic_run_action(
186        self,
187        model_id: str,
188        action_id: str,
189        action_type: str,
190        data: dict = None,
191    ) -> Response:
192        """Run an action of given type.
193
194        Data parameter should be provided only for imports with mapping,
195        and processes that contain such imports. It should be dict of key-value pairs
196        of dimension & item that need to be applied to the execution of such action.
197        """
198        mapping = DEFAULT_DATA.copy()
199        if data is not None:
200            mapping["mappingParameters"] = [
201                {"entityType": key, "entityName": data[key]} for key in data
202            ]
203        return self.request(
204            "POST",
205            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks",
206            data=json.dumps(mapping),
207        )

Run an action of given type.

Data parameter should be provided only for imports with mapping, and processes that contain such imports. It should be dict of key-value pairs of dimension & item that need to be applied to the execution of such action.

def run_import( self, model_id: str, import_id: str, data: dict = None) -> requests.models.Response:
209    def run_import(self, model_id: str, import_id: str, data: dict = None) -> Response:
210        """Run an import action.
211
212        Data parameter should be provided only for imports with mapping.
213        It should be dict of key-value pairs of dimension & item that need to be applied
214        to the execution of such import.
215        """
216        return self.generic_run_action(model_id, import_id, "imports", data)

Run an import action.

Data parameter should be provided only for imports with mapping. It should be dict of key-value pairs of dimension & item that need to be applied to the execution of such import.

def run_export(self, model_id: str, export_id: str) -> requests.models.Response:
218    def run_export(self, model_id: str, export_id: str) -> Response:
219        """Run an export action."""
220        return self.generic_run_action(model_id, export_id, "exports")

Run an export action.

def run_action(self, model_id: str, action_id: str) -> requests.models.Response:
222    def run_action(self, model_id: str, action_id: str) -> Response:
223        """Run a deletion action."""
224        return self.generic_run_action(model_id, action_id, "actions")

Run a deletion action.

def run_process( self, model_id: str, process_id: str, data: dict = None) -> requests.models.Response:
226    def run_process(
227        self, model_id: str, process_id: str, data: dict = None
228    ) -> Response:
229        """Run an import action.
230
231        Data parameter should be provided only for processes that contain
232        imports with mapping. It should be dict of key-value pairs of dimension & item
233        that need to be applied to the execution of such process.
234        """
235        return self.generic_run_action(model_id, process_id, "processes", data)

Run a process.

Data parameter should be provided only for processes that contain imports with mapping. It should be dict of key-value pairs of dimension & item that need to be applied to the execution of such process.

def generic_get_action_tasks( self, model_id: str, action_id: str, action_type: str) -> requests.models.Response:
238    def generic_get_action_tasks(
239        self, model_id: str, action_id: str, action_type: str
240    ) -> Response:
241        """Get the list of action tasks of given type."""
242        return self.request(
243            "GET",
244            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks",
245        )

Get the list of action tasks of given type.

def get_import_tasks(self, model_id: str, import_id: str) -> requests.models.Response:
247    def get_import_tasks(self, model_id: str, import_id: str) -> Response:
248        """Get the list of import tasks."""
249        return self.generic_get_action_tasks(model_id, import_id, "imports")

Get the list of import tasks.

def get_export_tasks(self, model_id: str, export_id: str) -> requests.models.Response:
251    def get_export_tasks(self, model_id: str, export_id: str) -> Response:
252        """Get the list of export tasks."""
253        return self.generic_get_action_tasks(model_id, export_id, "exports")

Get the list of export tasks.

def get_action_tasks(self, model_id: str, action_id: str) -> requests.models.Response:
255    def get_action_tasks(self, model_id: str, action_id: str) -> Response:
256        """Get the list of deletion tasks."""
257        return self.generic_get_action_tasks(model_id, action_id, "actions")

Get the list of deletion tasks.

def get_process_tasks(self, model_id: str, process_id: str) -> requests.models.Response:
259    def get_process_tasks(self, model_id: str, process_id: str) -> Response:
260        """Get the list of process tasks."""
261        return self.generic_get_action_tasks(model_id, process_id, "processes")

Get the list of process tasks.

def generic_get_action_task( self, model_id: str, action_id: str, task_id: str, action_type: str) -> requests.models.Response:
264    def generic_get_action_task(
265        self, model_id: str, action_id: str, task_id: str, action_type: str
266    ) -> Response:
267        """Get the status of an action task of given type."""
268        return self.request(
269            "GET",
270            f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks/{task_id}",
271        )

Get the status of an action task of given type.

def get_import_task( self, model_id: str, import_id: str, task_id: str) -> requests.models.Response:
273    def get_import_task(self, model_id: str, import_id: str, task_id: str) -> Response:
274        """Get the status of an import task.
275
276        In case of failure, failure dump can be downloaded using
277        BulkConnection.get_import_dump().
278        """
279        return self.generic_get_action_task(model_id, import_id, task_id, "imports")

Get the status of an import task.

In case of failure, the failure dump can be downloaded using BulkConnection.get_import_dump().

def get_export_task( self, model_id: str, export_id: str, task_id: str) -> requests.models.Response:
281    def get_export_task(self, model_id: str, export_id: str, task_id: str) -> Response:
282        """Get the status of an export task."""
283        return self.generic_get_action_task(model_id, export_id, task_id, "exports")

Get the status of an export task.

def get_action_task( self, model_id: str, action_id: str, task_id: str) -> requests.models.Response:
285    def get_action_task(self, model_id: str, action_id: str, task_id: str) -> Response:
286        """Get the status of a deletion task."""
287        return self.generic_get_action_task(model_id, action_id, task_id, "actions")

Get the status of a deletion task.

def get_process_task( self, model_id: str, process_id: str, task_id: str) -> requests.models.Response:
289    def get_process_task(
290        self, model_id: str, process_id: str, task_id: str
291    ) -> Response:
292        """Get the status of a process task.
293
294        In case of failure, use nested results to obtain objectId that can be used to
295        download failure dump using BulkConnection.get_process_dump().
296        """
297        return self.generic_get_action_task(model_id, process_id, task_id, "processes")

Get the status of a process task.

In case of failure, use the nested results to obtain the objectId that can be used to download the failure dump using BulkConnection.get_process_dump().

def get_import_dump( self, model_id: str, import_id: str, task_id: str) -> requests.models.Response:
300    def get_import_dump(self, model_id: str, import_id: str, task_id: str) -> Response:
301        """Downloads import task failure dump file in one go.
302
303        **WARNING**: For bigger files (or if this method fails)
304        BulkConnection.download_import_dump() should be used instead.
305        """
306        return self.request(
307            "GET",
308            f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump",
309            headers={"Accept": MIMEType.APP_8STREAM.value},
310        )

Downloads import task failure dump file in one go.

WARNING: For bigger files (or if this method fails) BulkConnection.download_import_dump() should be used instead.

def download_import_dump(self, model_id: str, import_id: str, task_id: str) -> bytes:
312    def download_import_dump(
313        self, model_id: str, import_id: str, task_id: str
314    ) -> bytes:
315        """Downloads import task failure dump file chunk by chunk.
316
317        Tip: For smaller files, much faster method (only one request is sent)
318        BulkConnection.get_import_dump() can be used instead.
319        """
320        url = f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump/chunks"
321        response = self.request("GET", url)
322        if not response.json()["meta"]["paging"]["currentPageSize"]:
323            raise Exception("Missing part in request response", url, response.text)
324        return b"".join(
325            self.request(
326                "GET",
327                f"{url}/{chunk_id['id']}",
328                headers={"Accept": MIMEType.APP_8STREAM.value},
329            ).content
330            for chunk_id in response.json()["chunks"]
331        )

Downloads import task failure dump file chunk by chunk.

Tip: For smaller files, BulkConnection.get_import_dump() can be used instead; it is much faster because only one request is sent.

def get_process_dump( self, model_id: str, process_id: str, task_id: str, object_id: str) -> requests.models.Response:
333    def get_process_dump(
334        self, model_id: str, process_id: str, task_id: str, object_id: str
335    ) -> Response:
336        """Downloads process task failure dump file in one go.
337
338        **WARNING**: For bigger files (or if this method fails)
339        BulkConnection.download_process_dump() should be used instead.
340        """
341        return self.request(
342            "GET",
343            f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}",
344            headers={"Accept": MIMEType.APP_8STREAM.value},
345        )

Downloads process task failure dump file in one go.

WARNING: For bigger files (or if this method fails) BulkConnection.download_process_dump() should be used instead.

def download_process_dump( self, model_id: str, process_id: str, task_id: str, object_id: str) -> bytes:
347    def download_process_dump(
348        self, model_id: str, process_id: str, task_id: str, object_id: str
349    ) -> bytes:
350        """Downloads process task failure dump file chunk by chunk.
351
352        Tip: For smaller files, much faster method (only one request is sent)
353        BulkConnection.get_process_dump() can be used instead.
354        """
355        url = f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}/chunks"
356        response = self.request("GET", url)
357        if not response.json()["meta"]["paging"]["currentPageSize"]:
358            raise Exception("Missing part in request response", url, response.text)
359        return b"".join(
360            self.request(
361                "GET",
362                f"{url}/{chunk_id['id']}",
363                headers={"Accept": MIMEType.APP_8STREAM.value},
364            ).content
365            for chunk_id in response.json()["chunks"]
366        )

Downloads process task failure dump file chunk by chunk.

Tip: For smaller files, BulkConnection.get_process_dump() can be used instead; it is much faster because only one request is sent.