apapi.bulk
Child of Basic Connection class, responsible for Bulk API capabilities
1""" 2apapi.bulk 3 4Child of Basic Connection class, responsible for Bulk API capabilities 5""" 6from __future__ import annotations 7 8import json 9 10from requests import Response 11 12from .basic_connection import BasicConnection 13from .utils import DEFAULT_DATA, MIMEType 14 15 16class BulkConnection(BasicConnection): 17 """Anaplan connection with Bulk API functions.""" 18 19 # Actions 20 def generic_get_actions(self, model_id: str, action_type: str) -> Response: 21 """Get the list of available actions of given type.""" 22 return self.request( 23 "GET", 24 f"{self._api_main_url}/models/{model_id}/{action_type}", 25 ) 26 27 def get_imports(self, model_id: str) -> Response: 28 """Get the list of available imports.""" 29 return self.generic_get_actions(model_id, "imports") 30 31 def get_exports(self, model_id: str) -> Response: 32 """Get the list of available exports.""" 33 return self.generic_get_actions(model_id, "exports") 34 35 def get_actions(self, model_id: str) -> Response: 36 """Get the list of available deletions.""" 37 return self.generic_get_actions(model_id, "actions") 38 39 def get_processes(self, model_id: str) -> Response: 40 """Get the list of available processes.""" 41 return self.generic_get_actions(model_id, "processes") 42 43 def get_files(self, model_id: str) -> Response: 44 """Get the list of available files.""" 45 return self.generic_get_actions(model_id, "files") 46 47 # Action details 48 def get_import(self, model_id: str, import_id: str) -> Response: 49 """Get import details.""" 50 return self.request( 51 "GET", f"{self._api_main_url}/models/{model_id}/imports/{import_id}" 52 ) 53 54 def get_export(self, model_id: str, export_id: str) -> Response: 55 """Get export details.""" 56 return self.request( 57 "GET", f"{self._api_main_url}/models/{model_id}/exports/{export_id}" 58 ) 59 60 def get_process( 61 self, model_id: str, process_id: str, details: bool = None 62 ) -> Response: 63 """Get process details.""" 64 return self.request( 65 "GET", 66 f"{self._api_main_url}/models/{model_id}/processes/{process_id}", 67 {"showImportDataSource": self.details if details is None else details}, 68 ) 69 70 # Files manipulation 71 def put_file(self, model_id: str, file_id: str, data: bytes) -> Response: 72 """Upload file in one go. 73 74 **WARNING**: For bigger files (or if this method fails) 75 BulkConnection.upload_file() should be used instead. 76 """ 77 return self.request( 78 "PUT", 79 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 80 data=data, 81 headers={"Content-Type": MIMEType.APP_8STREAM.value}, 82 ) 83 84 def _set_file_chunk_count( 85 self, model_id: str, file_id: str, count: int 86 ) -> Response: 87 """Set file chunk count before uploading content in chunks. 88 89 If you don't know how many chunks the file will have, set count to -1. 90 Chunks should have size between 1 and 50 MBs. 91 """ 92 return self.request( 93 "POST", 94 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 95 data=json.dumps({"chunkCount": count}), 96 ) 97 98 def _upload_file_chunk( 99 self, 100 model_id: str, 101 file_id: str, 102 data: bytes, 103 chunk: int, 104 content_type: MIMEType = MIMEType.APP_8STREAM, 105 ) -> Response: 106 """Upload contents of a file chunk.""" 107 return self.request( 108 "PUT", 109 f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}", 110 data=data, 111 headers={"Content-Type": content_type.value}, 112 ) 113 114 def _set_file_upload_complete(self, model_id: str, file_id: str) -> Response: 115 """Finalize upload in chunks by setting it as complete.""" 116 return self.request( 117 "POST", 118 f"{self._api_main_url}/models/{model_id}/files/{file_id}/complete", 119 data=json.dumps({"id": file_id}), 120 ) 121 122 def upload_file( 123 self, 124 model_id: str, 125 file_id: str, 126 data: [bytes], 127 content_type: MIMEType = MIMEType.APP_8STREAM, 128 ) -> Response: 129 """Upload file (to be used by an import action) chunk by chunk. 130 131 Tip: For smaller files, much faster method (only one request is sent) 132 BulkConnection.put_file() can be used instead. 133 """ 134 self._set_file_chunk_count(model_id, file_id, -1) 135 for chunk_count, chunk in enumerate(data): 136 self._upload_file_chunk(model_id, file_id, chunk, chunk_count, content_type) 137 return self._set_file_upload_complete(model_id, file_id) 138 139 def get_file(self, model_id: str, file_id: str) -> Response: 140 """Download file (uploaded or generated by an export action) in one go. 141 142 **WARNING**: For bigger files (or if this method fails) 143 BulkConnection.download_file() should be used instead. 144 """ 145 return self.request( 146 "GET", 147 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 148 headers={"Accept": MIMEType.APP_8STREAM.value}, 149 ) 150 151 def _get_chunks(self, model_id: str, file_id: str) -> Response: 152 """Get number of chunks available for an export.""" 153 url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks" 154 response = self.request("GET", url) 155 if not response.json()["meta"]["paging"]["currentPageSize"]: 156 raise Exception("Missing part in request response", url, response.text) 157 return response 158 159 def _get_chunk(self, model_id: str, file_id: str, chunk: int) -> Response: 160 """Get number of chunks available for an export.""" 161 url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}" 162 return self.request("GET", url, headers={"Accept": MIMEType.APP_8STREAM.value}) 163 164 def download_file(self, model_id: str, file_id: str) -> [bytes]: 165 """Download file (uploaded or generated by an export action) chunk by chunk. 166 167 Tip: For smaller files, much faster method (only one request is sent) 168 BulkConnection.get_file() can be used instead. 169 """ 170 response = self._get_chunks(model_id, file_id) 171 return ( 172 self._get_chunk(model_id, file_id, int(chunk_id["id"])).content 173 for chunk_id in response.json()["chunks"] 174 ) 175 176 def delete_file(self, model_id: str, file_id: str) -> Response: 177 """Delete previously uploaded file from the model's memory.""" 178 return self.request( 179 "DELETE", 180 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 181 ) 182 183 # Run action 184 def generic_run_action( 185 self, 186 model_id: str, 187 action_id: str, 188 action_type: str, 189 data: dict = None, 190 ) -> Response: 191 """Run an action of given type. 192 193 Data parameter should be provided only for imports with mapping, 194 and processes that contain such imports. It should be dict of key-value pairs 195 of dimension & item that need to be applied to the execution of such action. 196 """ 197 mapping = DEFAULT_DATA.copy() 198 if data is not None: 199 mapping["mappingParameters"] = [ 200 {"entityType": key, "entityName": data[key]} for key in data 201 ] 202 return self.request( 203 "POST", 204 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks", 205 data=json.dumps(mapping), 206 ) 207 208 def run_import(self, model_id: str, import_id: str, data: dict = None) -> Response: 209 """Run an import action. 210 211 Data parameter should be provided only for imports with mapping. 212 It should be dict of key-value pairs of dimension & item that need to be applied 213 to the execution of such import. 214 """ 215 return self.generic_run_action(model_id, import_id, "imports", data) 216 217 def run_export(self, model_id: str, export_id: str) -> Response: 218 """Run an export action.""" 219 return self.generic_run_action(model_id, export_id, "exports") 220 221 def run_action(self, model_id: str, action_id: str) -> Response: 222 """Run a deletion action.""" 223 return self.generic_run_action(model_id, action_id, "actions") 224 225 def run_process( 226 self, model_id: str, process_id: str, data: dict = None 227 ) -> Response: 228 """Run an import action. 229 230 Data parameter should be provided only for processes that contain 231 imports with mapping. It should be dict of key-value pairs of dimension & item 232 that need to be applied to the execution of such process. 233 """ 234 return self.generic_run_action(model_id, process_id, "processes", data) 235 236 # Get tasks 237 def generic_get_action_tasks( 238 self, model_id: str, action_id: str, action_type: str 239 ) -> Response: 240 """Get the list of action tasks of given type.""" 241 return self.request( 242 "GET", 243 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks", 244 ) 245 246 def get_import_tasks(self, model_id: str, import_id: str) -> Response: 247 """Get the list of import tasks.""" 248 return self.generic_get_action_tasks(model_id, import_id, "imports") 249 250 def get_export_tasks(self, model_id: str, export_id: str) -> Response: 251 """Get the list of export tasks.""" 252 return self.generic_get_action_tasks(model_id, export_id, "exports") 253 254 def get_action_tasks(self, model_id: str, action_id: str) -> Response: 255 """Get the list of deletion tasks.""" 256 return self.generic_get_action_tasks(model_id, action_id, "actions") 257 258 def get_process_tasks(self, model_id: str, process_id: str) -> Response: 259 """Get the list of process tasks.""" 260 return self.generic_get_action_tasks(model_id, process_id, "processes") 261 262 # Get task 263 def generic_get_action_task( 264 self, model_id: str, action_id: str, task_id: str, action_type: str 265 ) -> Response: 266 """Get the status of an action task of given type.""" 267 return self.request( 268 "GET", 269 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks/{task_id}", 270 ) 271 272 def get_import_task(self, model_id: str, import_id: str, task_id: str) -> Response: 273 """Get the status of an import task. 274 275 In case of failure, failure dump can be downloaded using 276 BulkConnection.get_import_dump(). 277 """ 278 return self.generic_get_action_task(model_id, import_id, task_id, "imports") 279 280 def get_export_task(self, model_id: str, export_id: str, task_id: str) -> Response: 281 """Get the status of an export task.""" 282 return self.generic_get_action_task(model_id, export_id, task_id, "exports") 283 284 def get_action_task(self, model_id: str, action_id: str, task_id: str) -> Response: 285 """Get the status of a deletion task.""" 286 return self.generic_get_action_task(model_id, action_id, task_id, "actions") 287 288 def get_process_task( 289 self, model_id: str, process_id: str, task_id: str 290 ) -> Response: 291 """Get the status of a process task. 292 293 In case of failure, use nested results to obtain objectId that can be used to 294 download failure dump using BulkConnection.get_process_dump(). 295 """ 296 return self.generic_get_action_task(model_id, process_id, task_id, "processes") 297 298 # Get dump 299 def get_import_dump(self, model_id: str, import_id: str, task_id: str) -> Response: 300 """Downloads import task failure dump file in one go. 301 302 **WARNING**: For bigger files (or if this method fails) 303 BulkConnection.download_import_dump() should be used instead. 304 """ 305 return self.request( 306 "GET", 307 f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump", 308 headers={"Accept": MIMEType.APP_8STREAM.value}, 309 ) 310 311 def download_import_dump( 312 self, model_id: str, import_id: str, task_id: str 313 ) -> bytes: 314 """Downloads import task failure dump file chunk by chunk. 315 316 Tip: For smaller files, much faster method (only one request is sent) 317 BulkConnection.get_import_dump() can be used instead. 318 """ 319 url = f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump/chunks" 320 response = self.request("GET", url) 321 if not response.json()["meta"]["paging"]["currentPageSize"]: 322 raise Exception("Missing part in request response", url, response.text) 323 return b"".join( 324 self.request( 325 "GET", 326 f"{url}/{chunk_id['id']}", 327 headers={"Accept": MIMEType.APP_8STREAM.value}, 328 ).content 329 for chunk_id in response.json()["chunks"] 330 ) 331 332 def get_process_dump( 333 self, model_id: str, process_id: str, task_id: str, object_id: str 334 ) -> Response: 335 """Downloads process task failure dump file in one go. 336 337 **WARNING**: For bigger files (or if this method fails) 338 BulkConnection.download_process_dump() should be used instead. 339 """ 340 return self.request( 341 "GET", 342 f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}", 343 headers={"Accept": MIMEType.APP_8STREAM.value}, 344 ) 345 346 def download_process_dump( 347 self, model_id: str, process_id: str, task_id: str, object_id: str 348 ) -> bytes: 349 """Downloads process task failure dump file chunk by chunk. 350 351 Tip: For smaller files, much faster method (only one request is sent) 352 BulkConnection.get_process_dump() can be used instead. 353 """ 354 url = f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}/chunks" 355 response = self.request("GET", url) 356 if not response.json()["meta"]["paging"]["currentPageSize"]: 357 raise Exception("Missing part in request response", url, response.text) 358 return b"".join( 359 self.request( 360 "GET", 361 f"{url}/{chunk_id['id']}", 362 headers={"Accept": MIMEType.APP_8STREAM.value}, 363 ).content 364 for chunk_id in response.json()["chunks"] 365 )
17class BulkConnection(BasicConnection): 18 """Anaplan connection with Bulk API functions.""" 19 20 # Actions 21 def generic_get_actions(self, model_id: str, action_type: str) -> Response: 22 """Get the list of available actions of given type.""" 23 return self.request( 24 "GET", 25 f"{self._api_main_url}/models/{model_id}/{action_type}", 26 ) 27 28 def get_imports(self, model_id: str) -> Response: 29 """Get the list of available imports.""" 30 return self.generic_get_actions(model_id, "imports") 31 32 def get_exports(self, model_id: str) -> Response: 33 """Get the list of available exports.""" 34 return self.generic_get_actions(model_id, "exports") 35 36 def get_actions(self, model_id: str) -> Response: 37 """Get the list of available deletions.""" 38 return self.generic_get_actions(model_id, "actions") 39 40 def get_processes(self, model_id: str) -> Response: 41 """Get the list of available processes.""" 42 return self.generic_get_actions(model_id, "processes") 43 44 def get_files(self, model_id: str) -> Response: 45 """Get the list of available files.""" 46 return self.generic_get_actions(model_id, "files") 47 48 # Action details 49 def get_import(self, model_id: str, import_id: str) -> Response: 50 """Get import details.""" 51 return self.request( 52 "GET", f"{self._api_main_url}/models/{model_id}/imports/{import_id}" 53 ) 54 55 def get_export(self, model_id: str, export_id: str) -> Response: 56 """Get export details.""" 57 return self.request( 58 "GET", f"{self._api_main_url}/models/{model_id}/exports/{export_id}" 59 ) 60 61 def get_process( 62 self, model_id: str, process_id: str, details: bool = None 63 ) -> Response: 64 """Get process details.""" 65 return self.request( 66 "GET", 67 f"{self._api_main_url}/models/{model_id}/processes/{process_id}", 68 {"showImportDataSource": self.details if details is None else details}, 69 ) 70 71 # Files manipulation 72 def put_file(self, model_id: str, file_id: str, data: bytes) -> Response: 73 """Upload file in one go. 74 75 **WARNING**: For bigger files (or if this method fails) 76 BulkConnection.upload_file() should be used instead. 77 """ 78 return self.request( 79 "PUT", 80 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 81 data=data, 82 headers={"Content-Type": MIMEType.APP_8STREAM.value}, 83 ) 84 85 def _set_file_chunk_count( 86 self, model_id: str, file_id: str, count: int 87 ) -> Response: 88 """Set file chunk count before uploading content in chunks. 89 90 If you don't know how many chunks the file will have, set count to -1. 91 Chunks should have size between 1 and 50 MBs. 92 """ 93 return self.request( 94 "POST", 95 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 96 data=json.dumps({"chunkCount": count}), 97 ) 98 99 def _upload_file_chunk( 100 self, 101 model_id: str, 102 file_id: str, 103 data: bytes, 104 chunk: int, 105 content_type: MIMEType = MIMEType.APP_8STREAM, 106 ) -> Response: 107 """Upload contents of a file chunk.""" 108 return self.request( 109 "PUT", 110 f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}", 111 data=data, 112 headers={"Content-Type": content_type.value}, 113 ) 114 115 def _set_file_upload_complete(self, model_id: str, file_id: str) -> Response: 116 """Finalize upload in chunks by setting it as complete.""" 117 return self.request( 118 "POST", 119 f"{self._api_main_url}/models/{model_id}/files/{file_id}/complete", 120 data=json.dumps({"id": file_id}), 121 ) 122 123 def upload_file( 124 self, 125 model_id: str, 126 file_id: str, 127 data: [bytes], 128 content_type: MIMEType = MIMEType.APP_8STREAM, 129 ) -> Response: 130 """Upload file (to be used by an import action) chunk by chunk. 131 132 Tip: For smaller files, much faster method (only one request is sent) 133 BulkConnection.put_file() can be used instead. 134 """ 135 self._set_file_chunk_count(model_id, file_id, -1) 136 for chunk_count, chunk in enumerate(data): 137 self._upload_file_chunk(model_id, file_id, chunk, chunk_count, content_type) 138 return self._set_file_upload_complete(model_id, file_id) 139 140 def get_file(self, model_id: str, file_id: str) -> Response: 141 """Download file (uploaded or generated by an export action) in one go. 142 143 **WARNING**: For bigger files (or if this method fails) 144 BulkConnection.download_file() should be used instead. 145 """ 146 return self.request( 147 "GET", 148 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 149 headers={"Accept": MIMEType.APP_8STREAM.value}, 150 ) 151 152 def _get_chunks(self, model_id: str, file_id: str) -> Response: 153 """Get number of chunks available for an export.""" 154 url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks" 155 response = self.request("GET", url) 156 if not response.json()["meta"]["paging"]["currentPageSize"]: 157 raise Exception("Missing part in request response", url, response.text) 158 return response 159 160 def _get_chunk(self, model_id: str, file_id: str, chunk: int) -> Response: 161 """Get number of chunks available for an export.""" 162 url = f"{self._api_main_url}/models/{model_id}/files/{file_id}/chunks/{chunk}" 163 return self.request("GET", url, headers={"Accept": MIMEType.APP_8STREAM.value}) 164 165 def download_file(self, model_id: str, file_id: str) -> [bytes]: 166 """Download file (uploaded or generated by an export action) chunk by chunk. 167 168 Tip: For smaller files, much faster method (only one request is sent) 169 BulkConnection.get_file() can be used instead. 170 """ 171 response = self._get_chunks(model_id, file_id) 172 return ( 173 self._get_chunk(model_id, file_id, int(chunk_id["id"])).content 174 for chunk_id in response.json()["chunks"] 175 ) 176 177 def delete_file(self, model_id: str, file_id: str) -> Response: 178 """Delete previously uploaded file from the model's memory.""" 179 return self.request( 180 "DELETE", 181 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 182 ) 183 184 # Run action 185 def generic_run_action( 186 self, 187 model_id: str, 188 action_id: str, 189 action_type: str, 190 data: dict = None, 191 ) -> Response: 192 """Run an action of given type. 193 194 Data parameter should be provided only for imports with mapping, 195 and processes that contain such imports. It should be dict of key-value pairs 196 of dimension & item that need to be applied to the execution of such action. 197 """ 198 mapping = DEFAULT_DATA.copy() 199 if data is not None: 200 mapping["mappingParameters"] = [ 201 {"entityType": key, "entityName": data[key]} for key in data 202 ] 203 return self.request( 204 "POST", 205 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks", 206 data=json.dumps(mapping), 207 ) 208 209 def run_import(self, model_id: str, import_id: str, data: dict = None) -> Response: 210 """Run an import action. 211 212 Data parameter should be provided only for imports with mapping. 213 It should be dict of key-value pairs of dimension & item that need to be applied 214 to the execution of such import. 215 """ 216 return self.generic_run_action(model_id, import_id, "imports", data) 217 218 def run_export(self, model_id: str, export_id: str) -> Response: 219 """Run an export action.""" 220 return self.generic_run_action(model_id, export_id, "exports") 221 222 def run_action(self, model_id: str, action_id: str) -> Response: 223 """Run a deletion action.""" 224 return self.generic_run_action(model_id, action_id, "actions") 225 226 def run_process( 227 self, model_id: str, process_id: str, data: dict = None 228 ) -> Response: 229 """Run an import action. 230 231 Data parameter should be provided only for processes that contain 232 imports with mapping. It should be dict of key-value pairs of dimension & item 233 that need to be applied to the execution of such process. 234 """ 235 return self.generic_run_action(model_id, process_id, "processes", data) 236 237 # Get tasks 238 def generic_get_action_tasks( 239 self, model_id: str, action_id: str, action_type: str 240 ) -> Response: 241 """Get the list of action tasks of given type.""" 242 return self.request( 243 "GET", 244 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks", 245 ) 246 247 def get_import_tasks(self, model_id: str, import_id: str) -> Response: 248 """Get the list of import tasks.""" 249 return self.generic_get_action_tasks(model_id, import_id, "imports") 250 251 def get_export_tasks(self, model_id: str, export_id: str) -> Response: 252 """Get the list of export tasks.""" 253 return self.generic_get_action_tasks(model_id, export_id, "exports") 254 255 def get_action_tasks(self, model_id: str, action_id: str) -> Response: 256 """Get the list of deletion tasks.""" 257 return self.generic_get_action_tasks(model_id, action_id, "actions") 258 259 def get_process_tasks(self, model_id: str, process_id: str) -> Response: 260 """Get the list of process tasks.""" 261 return self.generic_get_action_tasks(model_id, process_id, "processes") 262 263 # Get task 264 def generic_get_action_task( 265 self, model_id: str, action_id: str, task_id: str, action_type: str 266 ) -> Response: 267 """Get the status of an action task of given type.""" 268 return self.request( 269 "GET", 270 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks/{task_id}", 271 ) 272 273 def get_import_task(self, model_id: str, import_id: str, task_id: str) -> Response: 274 """Get the status of an import task. 275 276 In case of failure, failure dump can be downloaded using 277 BulkConnection.get_import_dump(). 278 """ 279 return self.generic_get_action_task(model_id, import_id, task_id, "imports") 280 281 def get_export_task(self, model_id: str, export_id: str, task_id: str) -> Response: 282 """Get the status of an export task.""" 283 return self.generic_get_action_task(model_id, export_id, task_id, "exports") 284 285 def get_action_task(self, model_id: str, action_id: str, task_id: str) -> Response: 286 """Get the status of a deletion task.""" 287 return self.generic_get_action_task(model_id, action_id, task_id, "actions") 288 289 def get_process_task( 290 self, model_id: str, process_id: str, task_id: str 291 ) -> Response: 292 """Get the status of a process task. 293 294 In case of failure, use nested results to obtain objectId that can be used to 295 download failure dump using BulkConnection.get_process_dump(). 296 """ 297 return self.generic_get_action_task(model_id, process_id, task_id, "processes") 298 299 # Get dump 300 def get_import_dump(self, model_id: str, import_id: str, task_id: str) -> Response: 301 """Downloads import task failure dump file in one go. 302 303 **WARNING**: For bigger files (or if this method fails) 304 BulkConnection.download_import_dump() should be used instead. 305 """ 306 return self.request( 307 "GET", 308 f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump", 309 headers={"Accept": MIMEType.APP_8STREAM.value}, 310 ) 311 312 def download_import_dump( 313 self, model_id: str, import_id: str, task_id: str 314 ) -> bytes: 315 """Downloads import task failure dump file chunk by chunk. 316 317 Tip: For smaller files, much faster method (only one request is sent) 318 BulkConnection.get_import_dump() can be used instead. 319 """ 320 url = f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump/chunks" 321 response = self.request("GET", url) 322 if not response.json()["meta"]["paging"]["currentPageSize"]: 323 raise Exception("Missing part in request response", url, response.text) 324 return b"".join( 325 self.request( 326 "GET", 327 f"{url}/{chunk_id['id']}", 328 headers={"Accept": MIMEType.APP_8STREAM.value}, 329 ).content 330 for chunk_id in response.json()["chunks"] 331 ) 332 333 def get_process_dump( 334 self, model_id: str, process_id: str, task_id: str, object_id: str 335 ) -> Response: 336 """Downloads process task failure dump file in one go. 337 338 **WARNING**: For bigger files (or if this method fails) 339 BulkConnection.download_process_dump() should be used instead. 340 """ 341 return self.request( 342 "GET", 343 f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}", 344 headers={"Accept": MIMEType.APP_8STREAM.value}, 345 ) 346 347 def download_process_dump( 348 self, model_id: str, process_id: str, task_id: str, object_id: str 349 ) -> bytes: 350 """Downloads process task failure dump file chunk by chunk. 351 352 Tip: For smaller files, much faster method (only one request is sent) 353 BulkConnection.get_process_dump() can be used instead. 354 """ 355 url = f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}/chunks" 356 response = self.request("GET", url) 357 if not response.json()["meta"]["paging"]["currentPageSize"]: 358 raise Exception("Missing part in request response", url, response.text) 359 return b"".join( 360 self.request( 361 "GET", 362 f"{url}/{chunk_id['id']}", 363 headers={"Accept": MIMEType.APP_8STREAM.value}, 364 ).content 365 for chunk_id in response.json()["chunks"] 366 )
Anaplan connection with Bulk API functions.
21 def generic_get_actions(self, model_id: str, action_type: str) -> Response: 22 """Get the list of available actions of given type.""" 23 return self.request( 24 "GET", 25 f"{self._api_main_url}/models/{model_id}/{action_type}", 26 )
Get the list of available actions of given type.
28 def get_imports(self, model_id: str) -> Response: 29 """Get the list of available imports.""" 30 return self.generic_get_actions(model_id, "imports")
Get the list of available imports.
32 def get_exports(self, model_id: str) -> Response: 33 """Get the list of available exports.""" 34 return self.generic_get_actions(model_id, "exports")
Get the list of available exports.
36 def get_actions(self, model_id: str) -> Response: 37 """Get the list of available deletions.""" 38 return self.generic_get_actions(model_id, "actions")
Get the list of available deletions.
40 def get_processes(self, model_id: str) -> Response: 41 """Get the list of available processes.""" 42 return self.generic_get_actions(model_id, "processes")
Get the list of available processes.
44 def get_files(self, model_id: str) -> Response: 45 """Get the list of available files.""" 46 return self.generic_get_actions(model_id, "files")
Get the list of available files.
49 def get_import(self, model_id: str, import_id: str) -> Response: 50 """Get import details.""" 51 return self.request( 52 "GET", f"{self._api_main_url}/models/{model_id}/imports/{import_id}" 53 )
Get import details.
55 def get_export(self, model_id: str, export_id: str) -> Response: 56 """Get export details.""" 57 return self.request( 58 "GET", f"{self._api_main_url}/models/{model_id}/exports/{export_id}" 59 )
Get export details.
61 def get_process( 62 self, model_id: str, process_id: str, details: bool = None 63 ) -> Response: 64 """Get process details.""" 65 return self.request( 66 "GET", 67 f"{self._api_main_url}/models/{model_id}/processes/{process_id}", 68 {"showImportDataSource": self.details if details is None else details}, 69 )
Get process details.
72 def put_file(self, model_id: str, file_id: str, data: bytes) -> Response: 73 """Upload file in one go. 74 75 **WARNING**: For bigger files (or if this method fails) 76 BulkConnection.upload_file() should be used instead. 77 """ 78 return self.request( 79 "PUT", 80 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 81 data=data, 82 headers={"Content-Type": MIMEType.APP_8STREAM.value}, 83 )
Upload file in one go.
WARNING: For bigger files (or if this method fails) BulkConnection.upload_file() should be used instead.
123 def upload_file( 124 self, 125 model_id: str, 126 file_id: str, 127 data: [bytes], 128 content_type: MIMEType = MIMEType.APP_8STREAM, 129 ) -> Response: 130 """Upload file (to be used by an import action) chunk by chunk. 131 132 Tip: For smaller files, much faster method (only one request is sent) 133 BulkConnection.put_file() can be used instead. 134 """ 135 self._set_file_chunk_count(model_id, file_id, -1) 136 for chunk_count, chunk in enumerate(data): 137 self._upload_file_chunk(model_id, file_id, chunk, chunk_count, content_type) 138 return self._set_file_upload_complete(model_id, file_id)
Upload file (to be used by an import action) chunk by chunk.
Tip: For smaller files, much faster method (only one request is sent) BulkConnection.put_file() can be used instead.
140 def get_file(self, model_id: str, file_id: str) -> Response: 141 """Download file (uploaded or generated by an export action) in one go. 142 143 **WARNING**: For bigger files (or if this method fails) 144 BulkConnection.download_file() should be used instead. 145 """ 146 return self.request( 147 "GET", 148 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 149 headers={"Accept": MIMEType.APP_8STREAM.value}, 150 )
Download file (uploaded or generated by an export action) in one go.
WARNING: For bigger files (or if this method fails) BulkConnection.download_file() should be used instead.
165 def download_file(self, model_id: str, file_id: str) -> [bytes]: 166 """Download file (uploaded or generated by an export action) chunk by chunk. 167 168 Tip: For smaller files, much faster method (only one request is sent) 169 BulkConnection.get_file() can be used instead. 170 """ 171 response = self._get_chunks(model_id, file_id) 172 return ( 173 self._get_chunk(model_id, file_id, int(chunk_id["id"])).content 174 for chunk_id in response.json()["chunks"] 175 )
Download file (uploaded or generated by an export action) chunk by chunk.
Tip: For smaller files, much faster method (only one request is sent) BulkConnection.get_file() can be used instead.
177 def delete_file(self, model_id: str, file_id: str) -> Response: 178 """Delete previously uploaded file from the model's memory.""" 179 return self.request( 180 "DELETE", 181 f"{self._api_main_url}/models/{model_id}/files/{file_id}", 182 )
Delete previously uploaded file from the model's memory.
185 def generic_run_action( 186 self, 187 model_id: str, 188 action_id: str, 189 action_type: str, 190 data: dict = None, 191 ) -> Response: 192 """Run an action of given type. 193 194 Data parameter should be provided only for imports with mapping, 195 and processes that contain such imports. It should be dict of key-value pairs 196 of dimension & item that need to be applied to the execution of such action. 197 """ 198 mapping = DEFAULT_DATA.copy() 199 if data is not None: 200 mapping["mappingParameters"] = [ 201 {"entityType": key, "entityName": data[key]} for key in data 202 ] 203 return self.request( 204 "POST", 205 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks", 206 data=json.dumps(mapping), 207 )
Run an action of given type.
Data parameter should be provided only for imports with mapping, and processes that contain such imports. It should be dict of key-value pairs of dimension & item that need to be applied to the execution of such action.
209 def run_import(self, model_id: str, import_id: str, data: dict = None) -> Response: 210 """Run an import action. 211 212 Data parameter should be provided only for imports with mapping. 213 It should be dict of key-value pairs of dimension & item that need to be applied 214 to the execution of such import. 215 """ 216 return self.generic_run_action(model_id, import_id, "imports", data)
Run an import action.
Data parameter should be provided only for imports with mapping. It should be dict of key-value pairs of dimension & item that need to be applied to the execution of such import.
218 def run_export(self, model_id: str, export_id: str) -> Response: 219 """Run an export action.""" 220 return self.generic_run_action(model_id, export_id, "exports")
Run an export action.
222 def run_action(self, model_id: str, action_id: str) -> Response: 223 """Run a deletion action.""" 224 return self.generic_run_action(model_id, action_id, "actions")
Run a deletion action.
226 def run_process( 227 self, model_id: str, process_id: str, data: dict = None 228 ) -> Response: 229 """Run an import action. 230 231 Data parameter should be provided only for processes that contain 232 imports with mapping. It should be dict of key-value pairs of dimension & item 233 that need to be applied to the execution of such process. 234 """ 235 return self.generic_run_action(model_id, process_id, "processes", data)
Run an import action.
Data parameter should be provided only for processes that contain imports with mapping. It should be dict of key-value pairs of dimension & item that need to be applied to the execution of such process.
238 def generic_get_action_tasks( 239 self, model_id: str, action_id: str, action_type: str 240 ) -> Response: 241 """Get the list of action tasks of given type.""" 242 return self.request( 243 "GET", 244 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks", 245 )
Get the list of action tasks of given type.
247 def get_import_tasks(self, model_id: str, import_id: str) -> Response: 248 """Get the list of import tasks.""" 249 return self.generic_get_action_tasks(model_id, import_id, "imports")
Get the list of import tasks.
251 def get_export_tasks(self, model_id: str, export_id: str) -> Response: 252 """Get the list of export tasks.""" 253 return self.generic_get_action_tasks(model_id, export_id, "exports")
Get the list of export tasks.
255 def get_action_tasks(self, model_id: str, action_id: str) -> Response: 256 """Get the list of deletion tasks.""" 257 return self.generic_get_action_tasks(model_id, action_id, "actions")
Get the list of deletion tasks.
259 def get_process_tasks(self, model_id: str, process_id: str) -> Response: 260 """Get the list of process tasks.""" 261 return self.generic_get_action_tasks(model_id, process_id, "processes")
Get the list of process tasks.
264 def generic_get_action_task( 265 self, model_id: str, action_id: str, task_id: str, action_type: str 266 ) -> Response: 267 """Get the status of an action task of given type.""" 268 return self.request( 269 "GET", 270 f"{self._api_main_url}/models/{model_id}/{action_type}/{action_id}/tasks/{task_id}", 271 )
Get the status of an action task of given type.
273 def get_import_task(self, model_id: str, import_id: str, task_id: str) -> Response: 274 """Get the status of an import task. 275 276 In case of failure, failure dump can be downloaded using 277 BulkConnection.get_import_dump(). 278 """ 279 return self.generic_get_action_task(model_id, import_id, task_id, "imports")
Get the status of an import task.
In case of failure, failure dump can be downloaded using BulkConnection.get_import_dump().
281 def get_export_task(self, model_id: str, export_id: str, task_id: str) -> Response: 282 """Get the status of an export task.""" 283 return self.generic_get_action_task(model_id, export_id, task_id, "exports")
Get the status of an export task.
285 def get_action_task(self, model_id: str, action_id: str, task_id: str) -> Response: 286 """Get the status of a deletion task.""" 287 return self.generic_get_action_task(model_id, action_id, task_id, "actions")
Get the status of a deletion task.
289 def get_process_task( 290 self, model_id: str, process_id: str, task_id: str 291 ) -> Response: 292 """Get the status of a process task. 293 294 In case of failure, use nested results to obtain objectId that can be used to 295 download failure dump using BulkConnection.get_process_dump(). 296 """ 297 return self.generic_get_action_task(model_id, process_id, task_id, "processes")
Get the status of a process task.
In case of failure, use nested results to obtain objectId that can be used to download failure dump using BulkConnection.get_process_dump().
300 def get_import_dump(self, model_id: str, import_id: str, task_id: str) -> Response: 301 """Downloads import task failure dump file in one go. 302 303 **WARNING**: For bigger files (or if this method fails) 304 BulkConnection.download_import_dump() should be used instead. 305 """ 306 return self.request( 307 "GET", 308 f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump", 309 headers={"Accept": MIMEType.APP_8STREAM.value}, 310 )
Downloads import task failure dump file in one go.
WARNING: For bigger files (or if this method fails) BulkConnection.download_import_dump() should be used instead.
312 def download_import_dump( 313 self, model_id: str, import_id: str, task_id: str 314 ) -> bytes: 315 """Downloads import task failure dump file chunk by chunk. 316 317 Tip: For smaller files, much faster method (only one request is sent) 318 BulkConnection.get_import_dump() can be used instead. 319 """ 320 url = f"{self._api_main_url}/models/{model_id}/imports/{import_id}/tasks/{task_id}/dump/chunks" 321 response = self.request("GET", url) 322 if not response.json()["meta"]["paging"]["currentPageSize"]: 323 raise Exception("Missing part in request response", url, response.text) 324 return b"".join( 325 self.request( 326 "GET", 327 f"{url}/{chunk_id['id']}", 328 headers={"Accept": MIMEType.APP_8STREAM.value}, 329 ).content 330 for chunk_id in response.json()["chunks"] 331 )
Downloads import task failure dump file chunk by chunk.
Tip: For smaller files, much faster method (only one request is sent) BulkConnection.get_import_dump() can be used instead.
333 def get_process_dump( 334 self, model_id: str, process_id: str, task_id: str, object_id: str 335 ) -> Response: 336 """Downloads process task failure dump file in one go. 337 338 **WARNING**: For bigger files (or if this method fails) 339 BulkConnection.download_process_dump() should be used instead. 340 """ 341 return self.request( 342 "GET", 343 f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}", 344 headers={"Accept": MIMEType.APP_8STREAM.value}, 345 )
Downloads process task failure dump file in one go.
WARNING: For bigger files (or if this method fails) BulkConnection.download_process_dump() should be used instead.
347 def download_process_dump( 348 self, model_id: str, process_id: str, task_id: str, object_id: str 349 ) -> bytes: 350 """Downloads process task failure dump file chunk by chunk. 351 352 Tip: For smaller files, much faster method (only one request is sent) 353 BulkConnection.get_process_dump() can be used instead. 354 """ 355 url = f"{self._api_main_url}/models/{model_id}/processes/{process_id}/tasks/{task_id}/dumps/{object_id}/chunks" 356 response = self.request("GET", url) 357 if not response.json()["meta"]["paging"]["currentPageSize"]: 358 raise Exception("Missing part in request response", url, response.text) 359 return b"".join( 360 self.request( 361 "GET", 362 f"{url}/{chunk_id['id']}", 363 headers={"Accept": MIMEType.APP_8STREAM.value}, 364 ).content 365 for chunk_id in response.json()["chunks"] 366 )
Downloads process task failure dump file chunk by chunk.
Tip: For smaller files, much faster method (only one request is sent) BulkConnection.get_process_dump() can be used instead.