
Modules

Main Fusion module.

Fusion

Core Fusion class for API access.

Source code in py_src/fusion/fusion.py
class Fusion:
    """Core Fusion class for API access."""

    @staticmethod
    def _call_for_dataframe(url: str, session: requests.Session) -> pd.DataFrame:
        """Private function that calls an API endpoint and returns the data as a pandas dataframe.

        Args:
            url (str): URL for an API endpoint with valid parameters.
            session (requests.Session): Authenticated session to use for the API call.

        Returns:
            pandas.DataFrame: a dataframe containing the requested data.
        """
        response = session.get(url)
        response.raise_for_status()
        table = response.json()["resources"]
        ret_df = pd.DataFrame(table).reset_index(drop=True)
        return ret_df

    @staticmethod
    def _call_for_bytes_object(url: str, session: requests.Session) -> BytesIO:
        """Private function that calls an API endpoint and returns the data as a bytes object in memory.

        Args:
            url (str): URL for an API endpoint with valid parameters.
            session (requests.Session): Authenticated session to use for the API call.

        Returns:
            io.BytesIO: in memory file content
        """

        response = session.get(url)
        response.raise_for_status()

        return BytesIO(response.content)

    def __init__(
        self,
        credentials: Union[str, FusionCredentials] = "config/client_credentials.json",
        root_url: str = "https://fusion.jpmorgan.com/api/v1/",
        download_folder: str = "downloads",
        log_level: int = logging.ERROR,
        fs: Optional[fsspec.AbstractFileSystem] = None,
        log_path: str = ".",
    ) -> None:
        """Constructor to instantiate a new Fusion object.

        Args:
            credentials (Union[str, FusionCredentials]): A path to a credentials file or a fully populated
                FusionCredentials object. Defaults to 'config/client_credentials.json'.
            root_url (str, optional): The API root URL.
                Defaults to "https://fusion.jpmorgan.com/api/v1/".
            download_folder (str, optional): The folder path where downloaded data files
                are saved. Defaults to "downloads".
            log_level (int, optional): Set the logging level. Defaults to logging.ERROR.
            fs (fsspec.AbstractFileSystem, optional): Filesystem for local file operations.
                Defaults to the result of get_default_fs().
            log_path (str, optional): The folder path where the log file is stored. Defaults to ".".
        """
        self._default_catalog = "common"

        self.root_url = root_url
        self.download_folder = download_folder
        Path(download_folder).mkdir(parents=True, exist_ok=True)

        if logger.hasHandlers():
            logger.handlers.clear()
        file_handler = logging.FileHandler(filename=f"{log_path}/fusion_sdk.log")
        logging.addLevelName(VERBOSE_LVL, "VERBOSE")
        stdout_handler = logging.StreamHandler(sys.stdout)
        formatter = logging.Formatter(
            "%(asctime)s.%(msecs)03d %(name)s:%(levelname)s %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        stdout_handler.setFormatter(formatter)
        logger.addHandler(stdout_handler)
        logger.addHandler(file_handler)
        logger.setLevel(log_level)

        if isinstance(credentials, FusionCredentials):
            self.credentials = credentials
        elif isinstance(credentials, str):
            self.credentials = FusionCredentials.from_file(Path(credentials))
        else:
            raise ValueError(
                "credentials must be a path to a credentials file or a dictionary containing the required keys"
            )

        self.session = get_session(self.credentials, self.root_url)
        self.fs = fs if fs else get_default_fs()
        self.events: Optional[pd.DataFrame] = None

    def __repr__(self) -> str:
        """Object representation to list all available methods."""
        return "Fusion object \nAvailable methods:\n" + tabulate(
            pd.DataFrame(  # type: ignore
                [
                    [
                        method_name
                        for method_name in dir(Fusion)
                        if callable(getattr(Fusion, method_name)) and not method_name.startswith("_")
                    ]
                    + [p for p in dir(Fusion) if isinstance(getattr(Fusion, p), property)],
                    [
                        getattr(Fusion, method_name).__doc__.split("\n")[0]
                        for method_name in dir(Fusion)
                        if callable(getattr(Fusion, method_name)) and not method_name.startswith("_")
                    ]
                    + [
                        getattr(Fusion, p).__doc__.split("\n")[0]
                        for p in dir(Fusion)
                        if isinstance(getattr(Fusion, p), property)
                    ],
                ]
            ).T.set_index(0),
            tablefmt="psql",
        )

    @property
    def default_catalog(self) -> str:
        """Returns the default catalog.

        Returns:
            str: The default catalog identifier.
        """
        return self._default_catalog

    @default_catalog.setter
    def default_catalog(self, catalog: str) -> None:
        """Allow the default catalog, which is "common" to be overridden.

        Args:
            catalog (str): The catalog to use as the default

        Returns:
            None
        """
        self._default_catalog = catalog

    def _use_catalog(self, catalog: Optional[str]) -> str:
        """Determine which catalog to use in an API call.

        Args:
            catalog (str): The catalog value passed as an argument to an API function wrapper.

        Returns:
            str: The catalog to use
        """
        if catalog is None:
            return self.default_catalog

        return catalog
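
    # Illustrative behavior (not part of the source): with the default catalog left
    # as "common", self._use_catalog(None) returns "common", while
    # self._use_catalog("my_catalog") returns "my_catalog" unchanged.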

    def get_fusion_filesystem(self) -> FusionHTTPFileSystem:
        """Creates Fusion Filesystem.

        Returns: Fusion Filesystem

        """
        return FusionHTTPFileSystem(client_kwargs={"root_url": self.root_url, "credentials": self.credentials})

    def list_catalogs(self, output: bool = False) -> pd.DataFrame:
        """Lists the catalogs available to the API account.

        Args:
            output (bool, optional): If True then print the dataframe. Defaults to False.

        Returns:
            class:`pandas.DataFrame`: A dataframe with a row for each catalog
        """
        url = f"{self.root_url}catalogs/"
        cat_df = Fusion._call_for_dataframe(url, self.session)

        if output:
            pass

        return cat_df

    def catalog_resources(self, catalog: Optional[str] = None, output: bool = False) -> pd.DataFrame:
        """List the resources contained within the catalog, for example products and datasets.

        Args:
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.

        Returns:
           class:`pandas.DataFrame`: A dataframe with a row for each resource within the catalog
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}"
        cat_df = Fusion._call_for_dataframe(url, self.session)

        if output:
            pass

        return cat_df

    def list_products(
        self,
        contains: Optional[Union[str, list[str]]] = None,
        id_contains: bool = False,
        catalog: Optional[str] = None,
        output: bool = False,
        max_results: int = -1,
        display_all_columns: bool = False,
    ) -> pd.DataFrame:
        """Get the products contained in a catalog. A product is a grouping of datasets.

        Args:
            contains (Union[str, list], optional): A string or a list of strings that are product
                identifiers to filter the products list. If a list is provided then it will return
                products whose identifier matches any of the strings. Defaults to None.
            id_contains (bool): Filter products only where the string(s) are contained in the identifier,
                ignoring description.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.
            max_results (int, optional): Limit the number of rows returned in the dataframe.
                Defaults to -1 which returns all results.
            display_all_columns (bool, optional): If True displays all columns returned by the API,
                otherwise only the key columns are displayed

        Returns:
            class:`pandas.DataFrame`: a dataframe with a row for each product
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}/products"
        full_prod_df: pd.DataFrame = Fusion._call_for_dataframe(url, self.session)

        if contains:
            if isinstance(contains, list):
                contains = "|".join(f"{s}" for s in contains)
            if id_contains:
                filtered_df = full_prod_df[full_prod_df["identifier"].str.contains(contains, case=False)]
            else:
                filtered_df = full_prod_df[
                    full_prod_df["identifier"].str.contains(contains, case=False)
                    | full_prod_df["description"].str.contains(contains, case=False)
                ]
        else:
            filtered_df = full_prod_df

        filtered_df["category"] = filtered_df.category.str.join(", ")
        filtered_df["region"] = filtered_df.region.str.join(", ")
        if not display_all_columns:
            filtered_df = filtered_df[
                filtered_df.columns.intersection(
                    [
                        "identifier",
                        "title",
                        "region",
                        "category",
                        "status",
                        "description",
                    ]
                )
            ]

        if max_results > -1:
            filtered_df = filtered_df[0:max_results]

        if output:
            pass

        return filtered_df
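
    # Example (illustrative; the identifiers below are placeholders):
    #   fusion.list_products(contains=["EQUITY", "RATES"])          # match identifier or description
    #   fusion.list_products(contains="EQUITY", id_contains=True)   # match identifier only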

    def list_datasets(  # noqa: PLR0913
        self,
        contains: Optional[Union[str, list[str]]] = None,
        id_contains: bool = False,
        product: Optional[Union[str, list[str]]] = None,
        catalog: Optional[str] = None,
        output: bool = False,
        max_results: int = -1,
        display_all_columns: bool = False,
        status: Optional[str] = None,
    ) -> pd.DataFrame:
        """Get the datasets contained in a catalog.

        Args:
            contains (Union[str, list], optional): A string or a list of strings that are dataset
                identifiers to filter the datasets list. If a list is provided then it will return
                datasets whose identifier matches any of the strings. Defaults to None.
            id_contains (bool): Filter datasets only where the string(s) are contained in the identifier,
                ignoring description.
            product (Union[str, list], optional): A string or a list of strings that are product
                identifiers to filter the datasets list. Defaults to None.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.
            max_results (int, optional): Limit the number of rows returned in the dataframe.
                Defaults to -1 which returns all results.
            display_all_columns (bool, optional): If True displays all columns returned by the API,
                otherwise only the key columns are displayed
            status (str, optional): filter the datasets by status, default is to show all results.

        Returns:
            class:`pandas.DataFrame`: a dataframe with a row for each dataset.
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}/datasets"
        ds_df = Fusion._call_for_dataframe(url, self.session)

        if contains:
            if isinstance(contains, list):
                contains = "|".join(f"{s}" for s in contains)
            if id_contains:
                ds_df = ds_df[ds_df["identifier"].str.contains(contains, case=False)]
            else:
                ds_df = ds_df[
                    ds_df["identifier"].str.contains(contains, case=False)
                    | ds_df["description"].str.contains(contains, case=False)
                ]

        if product:
            url = f"{self.root_url}catalogs/{catalog}/productDatasets"
            prd_df = Fusion._call_for_dataframe(url, self.session)
            prd_df = (
                prd_df[prd_df["product"] == product]
                if isinstance(product, str)
                else prd_df[prd_df["product"].isin(product)]
            )
            ds_df = ds_df[ds_df["identifier"].isin(prd_df["dataset"])].reset_index(drop=True)

        if max_results > -1:
            ds_df = ds_df[0:max_results]

        ds_df["category"] = ds_df.category.str.join(", ")
        ds_df["region"] = ds_df.region.str.join(", ")
        if not display_all_columns:
            cols = [
                "identifier",
                "title",
                "containerType",
                "region",
                "category",
                "coverageStartDate",
                "coverageEndDate",
                "description",
                "status",
            ]
            cols = [c for c in cols if c in ds_df.columns]
            ds_df = ds_df[cols]

        if status is not None:
            ds_df = ds_df[ds_df["status"] == status]

        if output:
            pass

        return ds_df

    def dataset_resources(self, dataset: str, catalog: Optional[str] = None, output: bool = False) -> pd.DataFrame:
        """List the resources available for a dataset, currently this will always be a datasetseries.

        Args:
            dataset (str): A dataset identifier
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.

        Returns:
            class:`pandas.DataFrame`: A dataframe with a row for each resource
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}"
        ds_res_df = Fusion._call_for_dataframe(url, self.session)

        if output:
            pass

        return ds_res_df

    def list_dataset_attributes(
        self,
        dataset: str,
        catalog: Optional[str] = None,
        output: bool = False,
        display_all_columns: bool = False,
    ) -> pd.DataFrame:
        """Returns the list of attributes that are in the dataset.

        Args:
            dataset (str): A dataset identifier
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.
            display_all_columns (bool, optional): If True displays all columns returned by the API,
                otherwise only the key columns are displayed

        Returns:
            class:`pandas.DataFrame`: A dataframe with a row for each attribute
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/attributes"
        ds_attr_df = Fusion._call_for_dataframe(url, self.session).sort_values(by="index").reset_index(drop=True)

        if not display_all_columns:
            ds_attr_df = ds_attr_df[
                ds_attr_df.columns.intersection(
                    [
                        "identifier",
                        "title",
                        "dataType",
                        "isDatasetKey",
                        "description",
                        "source",
                    ]
                )
            ]

        if output:
            pass

        return ds_attr_df

    def list_datasetmembers(
        self,
        dataset: str,
        catalog: Optional[str] = None,
        output: bool = False,
        max_results: int = -1,
    ) -> pd.DataFrame:
        """List the available members in the dataset series.

        Args:
            dataset (str): A dataset identifier
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.
            max_results (int, optional): Limit the number of rows returned in the dataframe.
                Defaults to -1 which returns all results.

        Returns:
            class:`pandas.DataFrame`: a dataframe with a row for each dataset member.
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries"
        ds_members_df = Fusion._call_for_dataframe(url, self.session)

        if max_results > -1:
            ds_members_df = ds_members_df[0:max_results]

        if output:
            pass

        return ds_members_df

    def datasetmember_resources(
        self, dataset: str, series: str, catalog: Optional[str] = None, output: bool = False
    ) -> pd.DataFrame:
        """List the available resources for a datasetseries member.

        Args:
            dataset (str): A dataset identifier
            series (str): The datasetseries identifier
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.

        Returns:
            class:`pandas.DataFrame`: A dataframe with a row for each datasetseries member resource.
                Currently, this will always be distributions.
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{series}"
        ds_mem_res_df = Fusion._call_for_dataframe(url, self.session)

        if output:
            pass

        return ds_mem_res_df

    def list_distributions(
        self, dataset: str, series: str, catalog: Optional[str] = None, output: bool = False
    ) -> pd.DataFrame:
        """List the available distributions (downloadable instances of the dataset with a format type).

        Args:
            dataset (str): A dataset identifier
            series (str): The datasetseries identifier
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            output (bool, optional): If True then print the dataframe. Defaults to False.

        Returns:
            class:`pandas.DataFrame`: A dataframe with a row for each distribution.
        """
        catalog = self._use_catalog(catalog)

        url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{series}/distributions"
        distros_df = Fusion._call_for_dataframe(url, self.session)

        if output:
            pass

        return distros_df

    def _resolve_distro_tuples(
        self,
        dataset: str,
        dt_str: str = "latest",
        dataset_format: str = "parquet",
        catalog: Optional[str] = None,
    ) -> list[tuple[str, str, str, str]]:
        """Resolve distribution tuples given specification params.

        A private utility function to generate a list of distribution tuples.
        Each tuple is a distribution, identified by catalog, dataset id,
        datasetseries member id, and the file format.

        Args:
            dataset (str): A dataset identifier
            dt_str (str, optional): Either a single date or a range identified by a start or end date,
                or both separated with a ":". Defaults to 'latest' which will return the most recent
                instance of the dataset.
            dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.

        Returns:
            list: a list of tuples, one for each distribution
        """
        catalog = self._use_catalog(catalog)

        datasetseries_list = self.list_datasetmembers(dataset, catalog)
        if len(datasetseries_list) == 0:
            raise AssertionError(f"There are no dataset members for dataset {dataset} in catalog {catalog}")

        if datasetseries_list.empty:
            raise APIResponseError(  # pragma: no cover
                f"No data available for dataset {dataset}. "
                f"Check that a valid dataset identifier and date/date range has been set."
            )

        if dt_str == "latest":
            dt_str = datasetseries_list.iloc[datasetseries_list["createdDate"].to_numpy().argmax()]["identifier"]

        parsed_dates = normalise_dt_param_str(dt_str)
        if len(parsed_dates) == 1:
            parsed_dates = (parsed_dates[0], parsed_dates[0])

        if parsed_dates[0]:
            datasetseries_list = datasetseries_list[
                pd.Series([pd.to_datetime(i) for i in datasetseries_list["identifier"]])
                >= pd.to_datetime(parsed_dates[0])
            ].reset_index()

        if parsed_dates[1]:
            datasetseries_list = datasetseries_list[
                pd.Series([pd.to_datetime(i) for i in datasetseries_list["identifier"]])
                <= pd.to_datetime(parsed_dates[1])
            ].reset_index()

        if len(datasetseries_list) == 0:
            raise APIResponseError(  # pragma: no cover
                f"No data available for dataset {dataset} in catalog {catalog}.\n"
                f"Check that a valid dataset identifier and date/date range has been set."
            )

        required_series = list(datasetseries_list["@id"])
        tups = [(catalog, dataset, series, dataset_format) for series in required_series]

        return tups
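
    # Accepted dt_str forms (illustrative dates), per the date-range regex used in download():
    #   "latest"                    -> the most recently created series member
    #   "20230101"                  -> a single date
    #   "20230101:20230131"         -> an inclusive date range
    #   "20230101:" or ":20230131"  -> a range open at one end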

    def download(  # noqa: PLR0912, PLR0913
        self,
        dataset: str,
        dt_str: str = "latest",
        dataset_format: str = "parquet",
        catalog: Optional[str] = None,
        n_par: Optional[int] = None,
        show_progress: bool = True,
        force_download: bool = False,
        download_folder: Optional[str] = None,
        return_paths: bool = False,
        partitioning: Optional[str] = None,
        preserve_original_name: bool = False,
    ) -> Optional[list[tuple[bool, str, Optional[str]]]]:
        """Downloads the requested distributions of a dataset to disk.

        Args:
            dataset (str): A dataset identifier
            dt_str (str, optional): Either a single date or a range identified by a start or end date,
                or both separated with a ":". Defaults to 'latest' which will return the most recent
                instance of the dataset.
            dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            n_par (int, optional): Specify how many distributions to download in parallel.
                Defaults to all cpus available.
            show_progress (bool, optional): Display a progress bar during data download. Defaults to True.
            force_download (bool, optional): If True then will always download a file even
                if it is already on disk. Defaults to False.
            download_folder (str, optional): The path, absolute or relative, where downloaded files are saved.
                Defaults to download_folder as set in __init__
            return_paths (bool, optional): Return paths and success statuses of the downloaded files.
            partitioning (str, optional): Partitioning specification.
            preserve_original_name (bool, optional): Preserve the original name of the file. Defaults to False.

        Returns:
            Optional[list[tuple[bool, str, Optional[str]]]]: If return_paths is True, a list of tuples
                containing the success status, file path and error message (if any) for each download;
                otherwise None.
        """
        catalog = self._use_catalog(catalog)

        valid_date_range = re.compile(r"^(\d{4}\d{2}\d{2})$|^((\d{4}\d{2}\d{2})?([:])(\d{4}\d{2}\d{2})?)$")

        if valid_date_range.match(dt_str) or dt_str == "latest":
            required_series = self._resolve_distro_tuples(dataset, dt_str, dataset_format, catalog)
        else:
            # sample data is limited to csv
            if dt_str == "sample":
                dataset_format = "csv"
            required_series = [(catalog, dataset, dt_str, dataset_format)]

        if dataset_format not in RECOGNIZED_FORMATS + ["raw"]:
            raise ValueError(f"Dataset format {dataset_format} is not supported")

        if not download_folder:
            download_folder = self.download_folder

        download_folders = [download_folder] * len(required_series)

        if partitioning == "hive":
            members = [series[2].strip("/") for series in required_series]
            download_folders = [
                f"{download_folders[i]}/{series[0]}/{series[1]}/{members[i]}"
                for i, series in enumerate(required_series)
            ]

        for d in download_folders:
            if not self.fs.exists(d):
                self.fs.mkdir(d, create_parents=True)

        n_par = cpu_count(n_par)
        download_spec = [
            {
                "lfs": self.fs,
                "rpath": distribution_to_url(
                    self.root_url,
                    series[1],
                    series[2],
                    series[3],
                    series[0],
                    is_download=True,
                ),
                "lpath": distribution_to_filename(
                    download_folders[i],
                    series[1],
                    series[2],
                    series[3],
                    series[0],
                    partitioning=partitioning,
                ),
                "overwrite": force_download,
                "preserve_original_name": preserve_original_name,
            }
            for i, series in enumerate(required_series)
        ]

        logger.log(
            VERBOSE_LVL,
            f"Beginning {len(download_spec)} downloads in batches of {n_par}",
        )
        if show_progress:
            with joblib_progress("Downloading", total=len(download_spec)):
                res = Parallel(n_jobs=n_par)(
                    delayed(self.get_fusion_filesystem().download)(**spec) for spec in download_spec
                )
        else:
            res = Parallel(n_jobs=n_par)(
                delayed(self.get_fusion_filesystem().download)(**spec) for spec in download_spec
            )

        if (len(res) > 0) and (not all(r[0] for r in res)):
            for r in res:
                if not r[0]:
                    warnings.warn(f"The download of {r[1]} was not successful", stacklevel=2)
        return res if return_paths else None
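
    # Usage sketch (illustrative; "EXAMPLE_DATASET" is a placeholder identifier):
    #   fusion.download(dataset="EXAMPLE_DATASET", dt_str="20230101:20230131",
    #                   dataset_format="csv", partitioning="hive", return_paths=True)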

    def to_df(  # noqa: PLR0913
        self,
        dataset: str,
        dt_str: str = "latest",
        dataset_format: str = "parquet",
        catalog: Optional[str] = None,
        n_par: Optional[int] = None,
        show_progress: bool = True,
        columns: Optional[list[str]] = None,
        filters: Optional[PyArrowFilterT] = None,
        force_download: bool = False,
        download_folder: Optional[str] = None,
        dataframe_type: str = "pandas",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Gets distributions for a specified date or date range and returns the data as a dataframe.

        Args:
            dataset (str): A dataset identifier
            dt_str (str, optional): Either a single date or a range identified by a start or end date,
                or both separated with a ":". Defaults to 'latest' which will return the most recent
                instance of the dataset.
            dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            n_par (int, optional): Specify how many distributions to download in parallel.
                Defaults to all cpus available.
            show_progress (bool, optional): Display a progress bar during data download. Defaults to True.
            columns (List, optional): A list of columns to return from a parquet file. Defaults to None
            filters (List, optional): List[Tuple] or List[List[Tuple]] or None (default)
                Rows which do not match the filter predicate will be removed from scanned data.
                Partition keys embedded in a nested directory structure will be exploited to avoid
                loading files at all if they contain no matching rows. If use_legacy_dataset is True,
                filters can only reference partition keys and only a hive-style directory structure
                is supported. When setting use_legacy_dataset to False, also within-file level filtering
                and different partitioning schemes are supported.
                More on https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
            force_download (bool, optional): If True then will always download a file even
                if it is already on disk. Defaults to False.
            download_folder (str, optional): The path, absolute or relative, where downloaded files are saved.
                Defaults to download_folder as set in __init__
            dataframe_type (str, optional): Type of dataframe to return, either "pandas" or "polars".
                Defaults to "pandas".

        Returns:
            class:`pandas.DataFrame`: a dataframe containing the requested data.
                If multiple dataset instances are retrieved then these are concatenated first.
        """
        catalog = self._use_catalog(catalog)

        # sample data is limited to csv
        if dt_str == "sample":
            dataset_format = "csv"

        if not download_folder:
            download_folder = self.download_folder
        download_res = self.download(
            dataset,
            dt_str,
            dataset_format,
            catalog,
            n_par,
            show_progress,
            force_download,
            download_folder,
            return_paths=True,
        )

        if not download_res:
            raise ValueError("Must specify 'return_paths=True' in download call to use this function")

        if not all(res[0] for res in download_res):
            failed_res = [res for res in download_res if not res[0]]
            raise Exception(
                f"Not all downloads were successfully completed. "
                f"Re-run to collect missing files. The following failed:\n{failed_res}"
            )

        files = [res[1] for res in download_res]

        pd_read_fn_map = {
            "csv": read_csv,
            "parquet": read_parquet,
            "parq": read_parquet,
            "json": read_json,
            "raw": read_csv,
        }

        pd_read_default_kwargs: dict[str, dict[str, object]] = {
            "csv": {
                "columns": columns,
                "filters": filters,
                "fs": self.fs,
                "dataframe_type": dataframe_type,
            },
            "parquet": {
                "columns": columns,
                "filters": filters,
                "fs": self.fs,
                "dataframe_type": dataframe_type,
            },
            "json": {
                "columns": columns,
                "filters": filters,
                "fs": self.fs,
                "dataframe_type": dataframe_type,
            },
            "raw": {
                "columns": columns,
                "filters": filters,
                "fs": self.fs,
                "dataframe_type": dataframe_type,
            },
        }

        pd_read_default_kwargs["parq"] = pd_read_default_kwargs["parquet"]

        pd_reader = pd_read_fn_map.get(dataset_format)
        pd_read_kwargs = pd_read_default_kwargs.get(dataset_format, {})
        if not pd_reader:
            raise Exception(f"No pandas function to read file in format {dataset_format}")

        pd_read_kwargs.update(kwargs)

        if len(files) == 0:
            raise APIResponseError(
                f"No series members for dataset: {dataset} "
                f"in date or date range: {dt_str} and format: {dataset_format}"
            )
        if dataset_format in ["parquet", "parq"]:
            data_df = pd_reader(files, **pd_read_kwargs)  # type: ignore
        elif dataset_format == "raw":
            dataframes = (
                pd.concat(
                    [pd_reader(ZipFile(f).open(p), **pd_read_kwargs) for p in ZipFile(f).namelist()],  # type: ignore
                    ignore_index=True,
                )
                for f in files
            )
            data_df = pd.concat(dataframes, ignore_index=True)
        else:
            dataframes = (pd_reader(f, **pd_read_kwargs) for f in files)  # type: ignore
            if dataframe_type == "pandas":
                data_df = pd.concat(dataframes, ignore_index=True)
            elif dataframe_type == "polars":
                import polars as pl

                data_df = pl.concat(dataframes, how="diagonal")  # type: ignore
            else:
                raise ValueError(f"Unknown dataframe type {dataframe_type}")

        return data_df
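
    # Usage sketch (illustrative; names are placeholders):
    #   df = fusion.to_df(dataset="EXAMPLE_DATASET", dt_str="latest",
    #                     columns=["date", "value"], dataframe_type="pandas")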

    def to_bytes(
        self,
        dataset: str,
        series_member: str,
        dataset_format: str = "parquet",
        catalog: Optional[str] = None,
    ) -> BytesIO:
        """Returns an instance of dataset (the distribution) as a bytes object.

        Args:
            dataset (str): A dataset identifier
            series_member (str): A dataset series member identifier
            dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.

        Returns:
            io.BytesIO: In-memory file content.
        """

        catalog = self._use_catalog(catalog)

        url = distribution_to_url(
            self.root_url,
            dataset,
            series_member,
            dataset_format,
            catalog,
        )

        return Fusion._call_for_bytes_object(url, self.session)
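
    # Usage sketch (illustrative): read a single distribution into memory.
    #   data = fusion.to_bytes(dataset="EXAMPLE_DATASET", series_member="20230101",
    #                          dataset_format="parquet")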

    def to_table(  # noqa: PLR0913
        self,
        dataset: str,
        dt_str: str = "latest",
        dataset_format: str = "parquet",
        catalog: Optional[str] = None,
        n_par: Optional[int] = None,
        show_progress: bool = True,
        columns: Optional[list[str]] = None,
        filters: Optional[PyArrowFilterT] = None,
        force_download: bool = False,
        download_folder: Optional[str] = None,
        **kwargs: Any,
    ) -> pa.Table:
        """Gets distributions for a specified date or date range and returns the data as an arrow table.

        Args:
            dataset (str): A dataset identifier
            dt_str (str, optional): Either a single date or a range identified by a start or end date,
                or both separated with a ":". Defaults to 'latest' which will return the most recent
                instance of the dataset.
            dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            n_par (int, optional): Specify how many distributions to download in parallel.
                Defaults to all cpus available.
            show_progress (bool, optional): Display a progress bar during data download. Defaults to True.
            columns (List, optional): A list of columns to return from a parquet file. Defaults to None
            filters (List, optional): List[Tuple] or List[List[Tuple]] or None (default)
                Rows which do not match the filter predicate will be removed from scanned data.
                Partition keys embedded in a nested directory structure will be exploited to avoid
                loading files at all if they contain no matching rows. If use_legacy_dataset is True,
                filters can only reference partition keys and only a hive-style directory structure
                is supported. When setting use_legacy_dataset to False, also within-file level filtering
                and different partitioning schemes are supported.
                More on https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
            force_download (bool, optional): If True then will always download a file even
                if it is already on disk. Defaults to False.
            download_folder (str, optional): The path, absolute or relative, where downloaded files are saved.
                Defaults to download_folder as set in __init__
        Returns:
            class:`pyarrow.Table`: a table containing the requested data.
                If multiple dataset instances are retrieved then these are concatenated first.
        """
        catalog = self._use_catalog(catalog)
        n_par = cpu_count(n_par)
        if not download_folder:
            download_folder = self.download_folder
        download_res = self.download(
            dataset,
            dt_str,
            dataset_format,
            catalog,
            n_par,
            show_progress,
            force_download,
            download_folder,
            return_paths=True,
        )

        if not download_res:
            raise ValueError("Must specify 'return_paths=True' in download call to use this function")

        if not all(res[0] for res in download_res):
            failed_res = [res for res in download_res if not res[0]]
            raise RuntimeError(
                f"Not all downloads were successfully completed. "
                f"Re-run to collect missing files. The following failed:\n{failed_res}"
            )

        files = [res[1] for res in download_res]

        read_fn_map = {
            "csv": csv_to_table,
            "parquet": parquet_to_table,
            "parq": parquet_to_table,
            "json": json_to_table,
            "raw": csv_to_table,
        }

        read_default_kwargs: dict[str, dict[str, object]] = {
            "csv": {"columns": columns, "filters": filters, "fs": self.fs},
            "parquet": {"columns": columns, "filters": filters, "fs": self.fs},
            "json": {"columns": columns, "filters": filters, "fs": self.fs},
            "raw": {"columns": columns, "filters": filters, "fs": self.fs},
        }

        read_default_kwargs["parq"] = read_default_kwargs["parquet"]

        reader = read_fn_map.get(dataset_format)
        read_kwargs = read_default_kwargs.get(dataset_format, {})
        if not reader:
            raise AssertionError(f"No function to read file in format {dataset_format}")

        read_kwargs.update(kwargs)

        if len(files) == 0:
            raise APIResponseError(
                f"No series members for dataset: {dataset} "
                f"in date or date range: {dt_str} and format: {dataset_format}"
            )
        if dataset_format in ["parquet", "parq"]:
            tbl = reader(files, **read_kwargs)  # type: ignore
        else:
            tbl = (reader(f, **read_kwargs) for f in files)  # type: ignore
            tbl = pa.concat_tables(tbl)

        return tbl
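
    # Usage sketch (illustrative): load distributions as a single arrow table,
    # with a pyarrow-style filter on a hypothetical "value" column.
    #   tbl = fusion.to_table(dataset="EXAMPLE_DATASET", filters=[("value", ">", 0)])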

    def upload(  # noqa: PLR0913
        self,
        path: str,
        dataset: Optional[str] = None,
        dt_str: str = "latest",
        catalog: Optional[str] = None,
        n_par: Optional[int] = None,
        show_progress: bool = True,
        return_paths: bool = False,
        multipart: bool = True,
        chunk_size: int = 5 * 2**20,
        from_date: Optional[str] = None,
        to_date: Optional[str] = None,
        preserve_original_name: Optional[bool] = False,
    ) -> Optional[list[tuple[bool, str, Optional[str]]]]:
        """Uploads the requested files/files to Fusion.

        Args:
            path (str): path to a file or a folder with files
            dataset (str, optional): Dataset name to which the file will be uplaoded (for single file only).
                                    If not provided the dataset will be implied from file's name.
            dt_str (str, optional): A file name. Can be any string but is usually a date.
                                    Defaults to 'latest' which will return the most recent.
                                    Relevant for a single file upload only. If not provided the dataset will
                                    be implied from file's name.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            n_par (int, optional): Specify how many distributions to download in parallel.
                Defaults to all cpus available.
            show_progress (bool, optional): Display a progress bar during data download Defaults to True.
            return_paths (bool, optional): Return paths and success statuses of the downloaded files.
            multipart (bool, optional): Is multipart upload.
            chunk_size (int, optional): Maximum chunk size.
            from_date (str, optional): start of the data date range contained in the distribution,
                defaults to upoad date
            to_date (str, optional): end of the data date range contained in the distribution,
                defaults to upload date.
            preserve_original_name (bool, optional): Preserve the original name of the file. Defaults to False.

        Returns:


        """
        catalog = self._use_catalog(catalog)

        if not self.fs.exists(path):
            raise RuntimeError("The provided path does not exist")

        fs_fusion = self.get_fusion_filesystem()
        if self.fs.info(path)["type"] == "directory":
            file_path_lst = self.fs.find(path)
            local_file_validation = validate_file_names(file_path_lst, fs_fusion)
            file_path_lst = [f for flag, f in zip(local_file_validation, file_path_lst) if flag]
            file_name = [f.split("/")[-1] for f in file_path_lst]
            is_raw_lst = is_dataset_raw(file_path_lst, fs_fusion)
            local_url_eqiv = [path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)]
        else:
            file_path_lst = [path]
            if not catalog or not dataset:
                local_file_validation = validate_file_names(file_path_lst, fs_fusion)
                file_path_lst = [f for flag, f in zip(local_file_validation, file_path_lst) if flag]
                is_raw_lst = is_dataset_raw(file_path_lst, fs_fusion)
                local_url_eqiv = [path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)]
                if preserve_original_name:
                    raise ValueError("preserve_original_name can only be used when catalog and dataset are provided.")
            else:
                date_identifier = re.compile(r"^(\d{4})(\d{2})(\d{2})$")
                if date_identifier.match(dt_str):
                    dt_str = dt_str if dt_str != "latest" else pd.Timestamp("today").date().strftime("%Y%m%d")
                    dt_str = pd.Timestamp(dt_str).date().strftime("%Y%m%d")

                if catalog not in fs_fusion.ls("") or dataset not in [
                    i.split("/")[-1] for i in fs_fusion.ls(f"{catalog}/datasets")
                ]:
                    msg = (
                        f"File has not been uploaded, one of the catalog: {catalog} "
                        f"or dataset: {dataset} does not exist."
                    )
                    warnings.warn(msg, stacklevel=2)
                    return [(False, path, msg)]
                file_format = path.split(".")[-1]
                file_name = [path.split("/")[-1]]
                file_format = "raw" if file_format not in RECOGNIZED_FORMATS else file_format

                local_url_eqiv = [
                    "/".join(distribution_to_url("", dataset, dt_str, file_format, catalog, False).split("/")[1:])
                ]

        if not preserve_original_name:
            data_map_df = pd.DataFrame([file_path_lst, local_url_eqiv]).T
            data_map_df.columns = pd.Index(["path", "url"])
        else:
            data_map_df = pd.DataFrame([file_path_lst, local_url_eqiv, file_name]).T
            data_map_df.columns = pd.Index(["path", "url", "file_name"])

        n_par = cpu_count(n_par)
        parallel = len(data_map_df) > 1
        res = upload_files(
            fs_fusion,
            self.fs,
            data_map_df,
            parallel=parallel,
            n_par=n_par,
            multipart=multipart,
            chunk_size=chunk_size,
            show_progress=show_progress,
            from_date=from_date,
            to_date=to_date,
        )

        if not all(r[0] for r in res):
            failed_res = [r for r in res if not r[0]]
            msg = f"Not all uploads were successfully completed. The following failed:\n{failed_res}"
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)

        return res if return_paths else None
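
    # Usage sketch (illustrative; paths and identifiers are placeholders):
    #   fusion.upload(path="data/EXAMPLE_DATASET__common__20230101.csv")   # inferred from the name
    #   fusion.upload(path="data/prices.csv", dataset="EXAMPLE_DATASET", dt_str="20230101")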

    def from_bytes(  # noqa: PLR0913
        self,
        data: BytesIO,
        dataset: Optional[str] = None,
        series_member: str = "latest",
        catalog: Optional[str] = None,
        distribution: str = "parquet",
        show_progress: bool = True,
        return_paths: bool = False,
        chunk_size: int = 5 * 2**20,
        from_date: Optional[str] = None,
        to_date: Optional[str] = None,
        file_name: Optional[str] = None,
        **kwargs: Any,  # noqa: ARG002
    ) -> Optional[list[tuple[bool, str, Optional[str]]]]:
        """Uploads data from an object in memory.

        Args:
            data (str): an object in memory to upload
            dataset (str, optional): Dataset name to which the file will be uploaded (for single file only).
                                    If not provided the dataset will be implied from file's name.
            series_member (str, optional): A single date or label. Defaults to 'latest' which will return
                the most recent.
            catalog (str, optional): A catalog identifier. Defaults to 'common'.
            distribution (str, optional): A distribution type, e.g. a file format or raw
            show_progress (bool, optional): Display a progress bar during data download Defaults to True.
            return_paths (bool, optional): Return paths and success statuses of the downloaded files.
            chunk_size (int, optional): Maximum chunk size.
            from_date (str, optional): start of the data date range contained in the distribution,
                defaults to upload date
            to_date (str, optional): end of the data date range contained in the distribution, defaults to upload date.
            file_name (str, optional): file name to be used for the uploaded file. Defaults to Fusion standard naming.

        Returns:


        """
        catalog = self._use_catalog(catalog)

        fs_fusion = self.get_fusion_filesystem()
        if distribution not in RECOGNIZED_FORMATS + ["raw"]:
            raise ValueError(f"Dataset format {distribution} is not supported")

        is_raw = js.loads(fs_fusion.cat(f"{catalog}/datasets/{dataset}"))["isRawData"]
        local_url_eqiv = path_to_url(f"{dataset}__{catalog}__{series_member}.{distribution}", is_raw)

        data_map_df = pd.DataFrame(["", local_url_eqiv, file_name]).T
        data_map_df.columns = ["path", "url", "file_name"]  # type: ignore

        res = upload_files(
            fs_fusion,
            data,
            data_map_df,
            parallel=False,
            n_par=1,
            multipart=False,
            chunk_size=chunk_size,
            show_progress=show_progress,
            from_date=from_date,
            to_date=to_date,
        )

        if not all(r[0] for r in res):
            failed_res = [r for r in res if not r[0]]
            msg = f"Not all uploads were successfully completed. The following failed:\n{failed_res}"
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)

        return res if return_paths else None
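
    # Usage sketch (illustrative; identifiers are placeholders):
    #   buf = BytesIO(b"col1,col2\n1,2\n")
    #   fusion.from_bytes(buf, dataset="EXAMPLE_DATASET", series_member="20230101",
    #                     distribution="csv")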

    def listen_to_events(
        self,
        last_event_id: Optional[str] = None,
        catalog: Optional[str] = None,
        url: str = "https://fusion.jpmorgan.com/api/v1/",
    ) -> Union[None, pd.DataFrame]:
        """Run server sent event listener in the background. Retrieve results by running get_events.

        Args:
            last_event_id (str): Last event ID (exclusive).
            catalog (str): catalog.
            url (str): subscription url.
        Returns:
            Union[None, class:`pandas.DataFrame`]: If in_background is True then the function returns no output.
                If in_background is set to False then pandas DataFrame is output upon keyboard termination.
        """

        catalog = self._use_catalog(catalog)
        import asyncio
        import json
        import threading

        from aiohttp_sse_client import client as sse_client

        from .utils import get_client

        kwargs: dict[str, Any] = {}
        if last_event_id:
            kwargs = {"headers": {"Last-Event-ID": last_event_id}}

        async def async_events() -> None:
            """Events sync function.

            Returns:
                None
            """
            timeout = 1e100
            session = await get_client(self.credentials, timeout=timeout)
            async with sse_client.EventSource(
                f"{url}catalogs/{catalog}/notifications/subscribe",
                session=session,
                **kwargs,
            ) as messages:
                try:
                    async for msg in messages:
                        event = json.loads(msg.data)
                        if self.events is None:
                            self.events = pd.DataFrame()
                        self.events = pd.concat([self.events, pd.DataFrame(event)], ignore_index=True)
                except TimeoutError as ex:
                    raise ex from None
                except BaseException:
                    raise

        _ = self.list_catalogs()  # refresh token
        if "headers" in kwargs:
            kwargs["headers"].update({"authorization": f"bearer {self.credentials.bearer_token}"})
        else:
            kwargs["headers"] = {
                "authorization": f"bearer {self.credentials.bearer_token}",
            }
        if "http" in self.credentials.proxies:
            kwargs["proxy"] = self.credentials.proxies["http"]
        elif "https" in self.credentials.proxies:
            kwargs["proxy"] = self.credentials.proxies["https"]
        th = threading.Thread(target=asyncio.run, args=(async_events(),), daemon=True)
        th.start()
        return None

    def get_events(
        self,
        last_event_id: Optional[str] = None,
        catalog: Optional[str] = None,
        in_background: bool = True,
        url: str = "https://fusion.jpmorgan.com/api/v1/",
    ) -> Union[None, pd.DataFrame]:
        """Run server sent event listener and print out the new events. Keyboard terminate to stop.

        Args:
            last_event_id (str): id of the last event.
            catalog (str): catalog.
            in_background (bool): execute event monitoring in the background (default = True).
            url (str): subscription url.
        Returns:
            Union[None, class:`pandas.DataFrame`]: If in_background is True then the function returns no output.
                If in_background is set to False then pandas DataFrame is output upon keyboard termination.
        """

        catalog = self._use_catalog(catalog)
        if not in_background:
            from sseclient import SSEClient

            _ = self.list_catalogs()  # refresh token
            messages = SSEClient(
                session=self.session,
                url=f"{url}catalogs/{catalog}/notifications/subscribe",
                last_id=last_event_id,
                headers={
                    "authorization": f"bearer {self.credentials.bearer_token}",
                },
            )
            lst = []
            try:
                for msg in messages:
                    event = js.loads(msg.data)
                    if event["type"] != "HeartBeatNotification":
                        lst.append(event)
            except KeyboardInterrupt:
                # A keyboard interrupt ends the foreground listener; return the collected events.
                return pd.DataFrame(lst)
            return None
        else:
            return self.events

default_catalog: str property writable

Returns the default catalog.

Returns:

    str: The default catalog.
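
For example, the property can be read and reassigned directly (a minimal sketch; "my_catalog" is a placeholder identifier):

    fusion = Fusion()
    print(fusion.default_catalog)          # 'common' until changed
    fusion.default_catalog = "my_catalog"  # subsequent calls use this catalog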

__init__(credentials='config/client_credentials.json', root_url='https://fusion.jpmorgan.com/api/v1/', download_folder='downloads', log_level=logging.ERROR, fs=None, log_path='.')

Constructor to instantiate a new Fusion object.

Parameters:

    credentials (Union[str, FusionCredentials]): A path to a credentials file or a fully
        populated FusionCredentials object. (default: 'config/client_credentials.json')
    root_url (str): The API root URL. (default: 'https://fusion.jpmorgan.com/api/v1/')
    download_folder (str): The folder path where downloaded data files are saved.
        (default: 'downloads')
    log_level (int): Set the logging level. (default: logging.ERROR)
    fs (filesystem): Filesystem to use. (default: None)
    log_path (str): The folder path where the log is stored. (default: '.')
Source code in py_src/fusion/fusion.py
def __init__(
    self,
    credentials: Union[str, FusionCredentials] = "config/client_credentials.json",
    root_url: str = "https://fusion.jpmorgan.com/api/v1/",
    download_folder: str = "downloads",
    log_level: int = logging.ERROR,
    fs: fsspec.filesystem = None,
    log_path: str = ".",
) -> None:
    """Constructor to instantiate a new Fusion object.

    Args:
        credentials (Union[str, FusionCredentials]): A path to a credentials file or a fully populated
        FusionCredentials object. Defaults to 'config/client_credentials.json'.
        root_url (str, optional): The API root URL.
            Defaults to "https://fusion.jpmorgan.com/api/v1/".
        download_folder (str, optional): The folder path where downloaded data files
            are saved. Defaults to "downloads".
        log_level (int, optional): Set the logging level. Defaults to logging.ERROR.
        fs (fsspec.filesystem): filesystem.
        log_path (str, optional): The folder path where the log is stored.
    """
    self._default_catalog = "common"

    self.root_url = root_url
    self.download_folder = download_folder
    Path(download_folder).mkdir(parents=True, exist_ok=True)

    if logger.hasHandlers():
        logger.handlers.clear()
    file_handler = logging.FileHandler(filename=f"{log_path}/fusion_sdk.log")
    logging.addLevelName(VERBOSE_LVL, "VERBOSE")
    stdout_handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        "%(asctime)s.%(msecs)03d %(name)s:%(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)
    logger.addHandler(file_handler)
    logger.setLevel(log_level)

    if isinstance(credentials, FusionCredentials):
        self.credentials = credentials
    elif isinstance(credentials, str):
        self.credentials = FusionCredentials.from_file(Path(credentials))
    else:
        raise ValueError(
            "credentials must be a path to a credentials file or a FusionCredentials object"
        )

    self.session = get_session(self.credentials, self.root_url)
    self.fs = fs if fs else get_default_fs()
    self.events: Optional[pd.DataFrame] = None
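
A minimal construction sketch, assuming the package is importable as fusion and a credentials file exists at the default path:

    from fusion import Fusion

    fusion = Fusion(
        credentials="config/client_credentials.json",
        download_folder="downloads",
    )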

__repr__()

Object representation to list all available methods.

Source code in py_src/fusion/fusion.py
def __repr__(self) -> str:
    """Object representation to list all available methods."""
    return "Fusion object \nAvailable methods:\n" + tabulate(
        pd.DataFrame(  # type: ignore
            [
                [
                    method_name
                    for method_name in dir(Fusion)
                    if callable(getattr(Fusion, method_name)) and not method_name.startswith("_")
                ]
                + [p for p in dir(Fusion) if isinstance(getattr(Fusion, p), property)],
                [
                    getattr(Fusion, method_name).__doc__.split("\n")[0]
                    for method_name in dir(Fusion)
                    if callable(getattr(Fusion, method_name)) and not method_name.startswith("_")
                ]
                + [
                    getattr(Fusion, p).__doc__.split("\n")[0]
                    for p in dir(Fusion)
                    if isinstance(getattr(Fusion, p), property)
                ],
            ]
        ).T.set_index(0),
        tablefmt="psql",
    )

catalog_resources(catalog=None, output=False)

List the resources contained within the catalog, for example products and datasets.

Parameters:

    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)

Returns:

    pandas.DataFrame: A dataframe with a row for each resource within the catalog.

Source code in py_src/fusion/fusion.py
def catalog_resources(self, catalog: Optional[str] = None, output: bool = False) -> pd.DataFrame:
    """List the resources contained within the catalog, for example products and datasets.

    Args:
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.

    Returns:
       class:`pandas.DataFrame`: A dataframe with a row for each resource within the catalog
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}"
    cat_df = Fusion._call_for_dataframe(url, self.session)

    if output:
        pass

    return cat_df
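
A usage sketch, assuming a Fusion instance named fusion:

    resources_df = fusion.catalog_resources()  # defaults to the 'common' catalog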

dataset_resources(dataset, catalog=None, output=False)

List the resources available for a dataset, currently this will always be a datasetseries.

Parameters:

    dataset (str): A dataset identifier. (required)
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)

Returns:

    pandas.DataFrame: A dataframe with a row for each resource.

Source code in py_src/fusion/fusion.py
def dataset_resources(self, dataset: str, catalog: Optional[str] = None, output: bool = False) -> pd.DataFrame:
    """List the resources available for a dataset, currently this will always be a datasetseries.

    Args:
        dataset (str): A dataset identifier
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.

    Returns:
        class:`pandas.DataFrame`: A dataframe with a row for each resource
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}"
    ds_res_df = Fusion._call_for_dataframe(url, self.session)

    if output:
        pass

    return ds_res_df
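
A usage sketch ("MY_DATASET" is a placeholder dataset identifier):

    ds_res_df = fusion.dataset_resources("MY_DATASET")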

datasetmember_resources(dataset, series, catalog=None, output=False)

List the available resources for a datasetseries member.

Parameters:

    dataset (str): A dataset identifier. (required)
    series (str): The datasetseries identifier. (required)
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)

Returns:

    pandas.DataFrame: A dataframe with a row for each datasetseries member resource.
        Currently, this will always be distributions.

Source code in py_src/fusion/fusion.py
def datasetmember_resources(
    self, dataset: str, series: str, catalog: Optional[str] = None, output: bool = False
) -> pd.DataFrame:
    """List the available resources for a datasetseries member.

    Args:
        dataset (str): A dataset identifier
        series (str): The datasetseries identifier
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.

    Returns:
        class:`pandas.DataFrame`: A dataframe with a row for each datasetseries member resource.
            Currently, this will always be distributions.
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{series}"
    ds_mem_res_df = Fusion._call_for_dataframe(url, self.session)

    if output:
        pass

    return ds_mem_res_df
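
A usage sketch (placeholder dataset and series identifiers):

    member_res_df = fusion.datasetmember_resources("MY_DATASET", "20240101")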

download(dataset, dt_str='latest', dataset_format='parquet', catalog=None, n_par=None, show_progress=True, force_download=False, download_folder=None, return_paths=False, partitioning=None, preserve_original_name=False)

Downloads the requested distributions of a dataset to disk.

Parameters:

    dataset (str): A dataset identifier. (required)
    dt_str (str): Either a single date or a range identified by a start or end date, or both
        separated with a ":". Defaults to 'latest', which will return the most recent instance
        of the dataset. (default: 'latest')
    dataset_format (str): The file format, e.g. CSV or Parquet. (default: 'parquet')
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    n_par (int): Specify how many distributions to download in parallel. Defaults to all
        CPUs available. (default: None)
    show_progress (bool): Display a progress bar during data download. (default: True)
    force_download (bool): If True then always download a file even if it is already on disk.
        (default: False)
    download_folder (str): The path, absolute or relative, where downloaded files are saved.
        Defaults to download_folder as set in __init__. (default: None)
    return_paths (bool): Return paths and success statuses of the downloaded files. (default: False)
    partitioning (str): Partitioning specification. (default: None)
    preserve_original_name (bool): Preserve the original name of the file. (default: False)
Source code in py_src/fusion/fusion.py
def download(  # noqa: PLR0912, PLR0913
    self,
    dataset: str,
    dt_str: str = "latest",
    dataset_format: str = "parquet",
    catalog: Optional[str] = None,
    n_par: Optional[int] = None,
    show_progress: bool = True,
    force_download: bool = False,
    download_folder: Optional[str] = None,
    return_paths: bool = False,
    partitioning: Optional[str] = None,
    preserve_original_name: bool = False,
) -> Optional[list[tuple[bool, str, Optional[str]]]]:
    """Downloads the requested distributions of a dataset to disk.

    Args:
        dataset (str): A dataset identifier
        dt_str (str, optional): Either a single date or a range identified by a start or end date,
            or both separated with a ":". Defaults to 'latest' which will return the most recent
            instance of the dataset.
        dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        n_par (int, optional): Specify how many distributions to download in parallel.
            Defaults to all cpus available.
        show_progress (bool, optional): Display a progress bar during data download. Defaults to True.
        force_download (bool, optional): If True then will always download a file even
            if it is already on disk. Defaults to False.
        download_folder (str, optional): The path, absolute or relative, where downloaded files are saved.
            Defaults to download_folder as set in __init__
        return_paths (bool, optional): Return paths and success statuses of the downloaded files.
        partitioning (str, optional): Partitioning specification.
        preserve_original_name (bool, optional): Preserve the original name of the file. Defaults to False.

    Returns:
        Optional[list[tuple[bool, str, Optional[str]]]]: A list of (success, file path, error)
            tuples, one per distribution, if return_paths is True; otherwise None.
    """
    catalog = self._use_catalog(catalog)

    valid_date_range = re.compile(r"^(\d{4}\d{2}\d{2})$|^((\d{4}\d{2}\d{2})?([:])(\d{4}\d{2}\d{2})?)$")

    if valid_date_range.match(dt_str) or dt_str == "latest":
        required_series = self._resolve_distro_tuples(dataset, dt_str, dataset_format, catalog)
    else:
        # sample data is limited to csv
        if dt_str == "sample":
            dataset_format = "csv"
        required_series = [(catalog, dataset, dt_str, dataset_format)]

    if dataset_format not in RECOGNIZED_FORMATS + ["raw"]:
        raise ValueError(f"Dataset format {dataset_format} is not supported")

    if not download_folder:
        download_folder = self.download_folder

    download_folders = [download_folder] * len(required_series)

    if partitioning == "hive":
        members = [series[2].strip("/") for series in required_series]
        download_folders = [
            f"{download_folders[i]}/{series[0]}/{series[1]}/{members[i]}"
            for i, series in enumerate(required_series)
        ]

    for d in download_folders:
        if not self.fs.exists(d):
            self.fs.mkdir(d, create_parents=True)

    n_par = cpu_count(n_par)
    download_spec = [
        {
            "lfs": self.fs,
            "rpath": distribution_to_url(
                self.root_url,
                series[1],
                series[2],
                series[3],
                series[0],
                is_download=True,
            ),
            "lpath": distribution_to_filename(
                download_folders[i],
                series[1],
                series[2],
                series[3],
                series[0],
                partitioning=partitioning,
            ),
            "overwrite": force_download,
            "preserve_original_name": preserve_original_name,
        }
        for i, series in enumerate(required_series)
    ]

    logger.log(
        VERBOSE_LVL,
        f"Beginning {len(download_spec)} downloads in batches of {n_par}",
    )
    if show_progress:
        with joblib_progress("Downloading", total=len(download_spec)):
            res = Parallel(n_jobs=n_par)(
                delayed(self.get_fusion_filesystem().download)(**spec) for spec in download_spec
            )
    else:
        res = Parallel(n_jobs=n_par)(
            delayed(self.get_fusion_filesystem().download)(**spec) for spec in download_spec
        )

    if (len(res) > 0) and (not all(r[0] for r in res)):
        for r in res:
            if not r[0]:
                warnings.warn(f"The download of {r[1]} was not successful", stacklevel=2)
    return res if return_paths else None
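
A usage sketch ("MY_DATASET" is a placeholder identifier; the date range syntax follows the dt_str description above):

    results = fusion.download(
        "MY_DATASET",
        dt_str="20240101:20240131",  # start:end range
        dataset_format="parquet",
        return_paths=True,           # returns (success, path, error) tuples
    )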

from_bytes(data, dataset=None, series_member='latest', catalog=None, distribution='parquet', show_progress=True, return_paths=False, chunk_size=5 * 2 ** 20, from_date=None, to_date=None, file_name=None, **kwargs)

Uploads data from an object in memory.

Parameters:

    data (BytesIO): An object in memory to upload. (required)
    dataset (str): Dataset name to which the file will be uploaded (for single file only).
        If not provided the dataset will be implied from the file's name. (default: None)
    series_member (str): A single date or label. Defaults to 'latest', which will return
        the most recent. (default: 'latest')
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    distribution (str): A distribution type, e.g. a file format or raw. (default: 'parquet')
    show_progress (bool): Display a progress bar during data upload. (default: True)
    return_paths (bool): Return paths and success statuses of the uploaded files. (default: False)
    chunk_size (int): Maximum chunk size. (default: 5 * 2 ** 20)
    from_date (str): Start of the data date range contained in the distribution; defaults
        to the upload date. (default: None)
    to_date (str): End of the data date range contained in the distribution; defaults to
        the upload date. (default: None)
    file_name (str): File name to be used for the uploaded file. Defaults to Fusion
        standard naming. (default: None)
Source code in py_src/fusion/fusion.py
def from_bytes(  # noqa: PLR0913
    self,
    data: BytesIO,
    dataset: Optional[str] = None,
    series_member: str = "latest",
    catalog: Optional[str] = None,
    distribution: str = "parquet",
    show_progress: bool = True,
    return_paths: bool = False,
    chunk_size: int = 5 * 2**20,
    from_date: Optional[str] = None,
    to_date: Optional[str] = None,
    file_name: Optional[str] = None,
    **kwargs: Any,  # noqa: ARG002
) -> Optional[list[tuple[bool, str, Optional[str]]]]:
    """Uploads data from an object in memory.

    Args:
        data (BytesIO): an object in memory to upload
        dataset (str, optional): Dataset name to which the file will be uploaded (for single file only).
                                If not provided the dataset will be implied from file's name.
        series_member (str, optional): A single date or label. Defaults to 'latest' which will return
            the most recent.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        distribution (str, optional): A distribution type, e.g. a file format or raw
        show_progress (bool, optional): Display a progress bar during data upload. Defaults to True.
        return_paths (bool, optional): Return paths and success statuses of the uploaded files.
        chunk_size (int, optional): Maximum chunk size.
        from_date (str, optional): start of the data date range contained in the distribution,
            defaults to upload date
        to_date (str, optional): end of the data date range contained in the distribution, defaults to upload date.
        file_name (str, optional): file name to be used for the uploaded file. Defaults to Fusion standard naming.

    Returns:
        Optional[list[tuple[bool, str, Optional[str]]]]: A list of (success, path, error)
            tuples if return_paths is True; otherwise None.
    """
    catalog = self._use_catalog(catalog)

    fs_fusion = self.get_fusion_filesystem()
    if distribution not in RECOGNIZED_FORMATS + ["raw"]:
        raise ValueError(f"Dataset format {distribution} is not supported")

    is_raw = js.loads(fs_fusion.cat(f"{catalog}/datasets/{dataset}"))["isRawData"]
    local_url_eqiv = path_to_url(f"{dataset}__{catalog}__{series_member}.{distribution}", is_raw)

    data_map_df = pd.DataFrame(["", local_url_eqiv, file_name]).T
    data_map_df.columns = ["path", "url", "file_name"]  # type: ignore

    res = upload_files(
        fs_fusion,
        data,
        data_map_df,
        parallel=False,
        n_par=1,
        multipart=False,
        chunk_size=chunk_size,
        show_progress=show_progress,
        from_date=from_date,
        to_date=to_date,
    )

    if not all(r[0] for r in res):
        failed_res = [r for r in res if not r[0]]
        msg = f"Not all uploads were successfully completed. The following failed:\n{failed_res}"
        logger.warning(msg)
        warnings.warn(msg, stacklevel=2)

    return res if return_paths else None
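
A usage sketch uploading a small in-memory CSV (placeholder dataset identifier and contents):

    from io import BytesIO

    payload = BytesIO(b"col1,col2\n1,2\n")
    fusion.from_bytes(
        payload,
        dataset="MY_DATASET",
        series_member="20240101",
        distribution="csv",
    )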

get_events(last_event_id=None, catalog=None, in_background=True, url='https://fusion.jpmorgan.com/api/v1/')

Run the server-sent event listener and collect new events. Use a keyboard interrupt to stop.

Parameters:

    last_event_id (str): ID of the last event. (default: None)
    catalog (str): A catalog identifier. (default: None)
    in_background (bool): Execute event monitoring in the background. (default: True)
    url (str): Subscription URL. (default: 'https://fusion.jpmorgan.com/api/v1/')
Source code in py_src/fusion/fusion.py
def get_events(
    self,
    last_event_id: Optional[str] = None,
    catalog: Optional[str] = None,
    in_background: bool = True,
    url: str = "https://fusion.jpmorgan.com/api/v1/",
) -> Union[None, pd.DataFrame]:
    """Run server sent event listener and print out the new events. Keyboard terminate to stop.

    Args:
        last_event_id (str): id of the last event.
        catalog (str): catalog.
        in_background (bool): execute event monitoring in the background (default = True).
        url (str): subscription url.
    Returns:
        Union[None, class:`pandas.DataFrame`]: If in_background is True then the function returns no output.
            If in_background is set to False then pandas DataFrame is output upon keyboard termination.
    """

    catalog = self._use_catalog(catalog)
    if not in_background:
        from sseclient import SSEClient

        _ = self.list_catalogs()  # refresh token
        messages = SSEClient(
            session=self.session,
            url=f"{url}catalogs/{catalog}/notifications/subscribe",
            last_id=last_event_id,
            headers={
                "authorization": f"bearer {self.credentials.bearer_token}",
            },
        )
        lst = []
        try:
            for msg in messages:
                event = js.loads(msg.data)
                if event["type"] != "HeartBeatNotification":
                    lst.append(event)
        except KeyboardInterrupt:
            # A keyboard interrupt ends the foreground listener; return the collected events.
            return pd.DataFrame(lst)
        return None
    else:
        return self.events
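
A usage sketch of the foreground mode, which blocks until a keyboard interrupt and then returns the collected events:

    events_df = fusion.get_events(in_background=False)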

get_fusion_filesystem()

Creates Fusion Filesystem.

Returns: Fusion Filesystem

Source code in py_src/fusion/fusion.py
def get_fusion_filesystem(self) -> FusionHTTPFileSystem:
    """Creates Fusion Filesystem.

    Returns: Fusion Filesystem

    """
    return FusionHTTPFileSystem(client_kwargs={"root_url": self.root_url, "credentials": self.credentials})
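
A usage sketch; the returned object supports fsspec-style operations (the SDK itself uses cat against catalog paths):

    fs = fusion.get_fusion_filesystem()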

list_catalogs(output=False)

Lists the catalogs available to the API account.

Parameters:

    output (bool): If True then print the dataframe. Defaults to False. (default: False)

Returns:

    pandas.DataFrame: A dataframe with a row for each catalog.

Source code in py_src/fusion/fusion.py
def list_catalogs(self, output: bool = False) -> pd.DataFrame:
    """Lists the catalogs available to the API account.

    Args:
        output (bool, optional): If True then print the dataframe. Defaults to False.

    Returns:
        class:`pandas.DataFrame`: A dataframe with a row for each catalog
    """
    url = f"{self.root_url}catalogs/"
    cat_df = Fusion._call_for_dataframe(url, self.session)

    if output:
        pass

    return cat_df
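
A usage sketch:

    catalogs_df = fusion.list_catalogs()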

list_dataset_attributes(dataset, catalog=None, output=False, display_all_columns=False)

Returns the list of attributes that are in the dataset.

Parameters:

    dataset (str): A dataset identifier. (required)
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)
    display_all_columns (bool): If True displays all columns returned by the API, otherwise
        only the key columns are displayed. (default: False)

Returns:

    pandas.DataFrame: A dataframe with a row for each attribute.

Source code in py_src/fusion/fusion.py
def list_dataset_attributes(
    self,
    dataset: str,
    catalog: Optional[str] = None,
    output: bool = False,
    display_all_columns: bool = False,
) -> pd.DataFrame:
    """Returns the list of attributes that are in the dataset.

    Args:
        dataset (str): A dataset identifier
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.
        display_all_columns (bool, optional): If True displays all columns returned by the API,
            otherwise only the key columns are displayed

    Returns:
        class:`pandas.DataFrame`: A dataframe with a row for each attribute
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/attributes"
    ds_attr_df = Fusion._call_for_dataframe(url, self.session).sort_values(by="index").reset_index(drop=True)

    if not display_all_columns:
        ds_attr_df = ds_attr_df[
            ds_attr_df.columns.intersection(
                [
                    "identifier",
                    "title",
                    "dataType",
                    "isDatasetKey",
                    "description",
                    "source",
                ]
            )
        ]

    if output:
        pass

    return ds_attr_df
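
A usage sketch (placeholder dataset identifier):

    attr_df = fusion.list_dataset_attributes("MY_DATASET", display_all_columns=True)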

list_datasetmembers(dataset, catalog=None, output=False, max_results=-1)

List the available members in the dataset series.

Parameters:

    dataset (str): A dataset identifier. (required)
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)
    max_results (int): Limit the number of rows returned in the dataframe. Defaults to -1,
        which returns all results. (default: -1)

Returns:

    pandas.DataFrame: A dataframe with a row for each dataset member.

Source code in py_src/fusion/fusion.py
def list_datasetmembers(
    self,
    dataset: str,
    catalog: Optional[str] = None,
    output: bool = False,
    max_results: int = -1,
) -> pd.DataFrame:
    """List the available members in the dataset series.

    Args:
        dataset (str): A dataset identifier
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.
        max_results (int, optional): Limit the number of rows returned in the dataframe.
            Defaults to -1 which returns all results.

    Returns:
        class:`pandas.DataFrame`: a dataframe with a row for each dataset member.
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries"
    ds_members_df = Fusion._call_for_dataframe(url, self.session)

    if max_results > -1:
        ds_members_df = ds_members_df[0:max_results]

    if output:
        pass

    return ds_members_df
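
A usage sketch (placeholder dataset identifier):

    members_df = fusion.list_datasetmembers("MY_DATASET", max_results=10)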

list_datasets(contains=None, id_contains=False, product=None, catalog=None, output=False, max_results=-1, display_all_columns=False, status=None)

Get the datasets contained in a catalog.

Parameters:

    contains (Union[str, list]): A string or a list of strings that are dataset identifiers
        to filter the datasets list. If a list is provided then it will return datasets whose
        identifier matches any of the strings. (default: None)
    id_contains (bool): Filter datasets only where the string(s) are contained in the
        identifier, ignoring description. (default: False)
    product (Union[str, list]): A string or a list of strings that are product identifiers
        to filter the datasets list. (default: None)
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)
    max_results (int): Limit the number of rows returned in the dataframe. Defaults to -1,
        which returns all results. (default: -1)
    display_all_columns (bool): If True displays all columns returned by the API, otherwise
        only the key columns are displayed. (default: False)
    status (str): Filter the datasets by status; default is to show all results. (default: None)

Returns:

    pandas.DataFrame: A dataframe with a row for each dataset.

Source code in py_src/fusion/fusion.py
def list_datasets(  # noqa: PLR0913
    self,
    contains: Optional[Union[str, list[str]]] = None,
    id_contains: bool = False,
    product: Optional[Union[str, list[str]]] = None,
    catalog: Optional[str] = None,
    output: bool = False,
    max_results: int = -1,
    display_all_columns: bool = False,
    status: Optional[str] = None,
) -> pd.DataFrame:
    """Get the datasets contained in a catalog.

    Args:
        contains (Union[str, list], optional): A string or a list of strings that are dataset
            identifiers to filter the datasets list. If a list is provided then it will return
            datasets whose identifier matches any of the strings. Defaults to None.
        id_contains (bool): Filter datasets only where the string(s) are contained in the identifier,
            ignoring description.
        product (Union[str, list], optional): A string or a list of strings that are product
            identifiers to filter the datasets list. Defaults to None.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.
        max_results (int, optional): Limit the number of rows returned in the dataframe.
            Defaults to -1 which returns all results.
        display_all_columns (bool, optional): If True displays all columns returned by the API,
            otherwise only the key columns are displayed
        status (str, optional): filter the datasets by status, default is to show all results.

    Returns:
        class:`pandas.DataFrame`: a dataframe with a row for each dataset.
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}/datasets"
    ds_df = Fusion._call_for_dataframe(url, self.session)

    if contains:
        if isinstance(contains, list):
            contains = "|".join(f"{s}" for s in contains)
        if id_contains:
            ds_df = ds_df[ds_df["identifier"].str.contains(contains, case=False)]
        else:
            ds_df = ds_df[
                ds_df["identifier"].str.contains(contains, case=False)
                | ds_df["description"].str.contains(contains, case=False)
            ]

    if product:
        url = f"{self.root_url}catalogs/{catalog}/productDatasets"
        prd_df = Fusion._call_for_dataframe(url, self.session)
        prd_df = (
            prd_df[prd_df["product"] == product]
            if isinstance(product, str)
            else prd_df[prd_df["product"].isin(product)]
        )
        ds_df = ds_df[ds_df["identifier"].isin(prd_df["dataset"])].reset_index(drop=True)

    if max_results > -1:
        ds_df = ds_df[0:max_results]

    ds_df["category"] = ds_df.category.str.join(", ")
    ds_df["region"] = ds_df.region.str.join(", ")
    if not display_all_columns:
        cols = [
            "identifier",
            "title",
            "containerType",
            "region",
            "category",
            "coverageStartDate",
            "coverageEndDate",
            "description",
            "status",
        ]
        cols = [c for c in cols if c in ds_df.columns]
        ds_df = ds_df[cols]

    if status is not None:
        ds_df = ds_df[ds_df["status"] == status]

    if output:
        pass

    return ds_df
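
A usage sketch; the contains filter matches identifiers and descriptions unless id_contains is set (search terms are placeholders):

    ds_df = fusion.list_datasets(contains=["rates", "fx"], max_results=20)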

list_distributions(dataset, series, catalog=None, output=False)

List the available distributions (downloadable instances of the dataset with a format type).

Parameters:

    dataset (str): A dataset identifier. (required)
    series (str): The datasetseries identifier. (required)
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)

Returns:

    pandas.DataFrame: A dataframe with a row for each distribution.

Source code in py_src/fusion/fusion.py
def list_distributions(
    self, dataset: str, series: str, catalog: Optional[str] = None, output: bool = False
) -> pd.DataFrame:
    """List the available distributions (downloadable instances of the dataset with a format type).

    Args:
        dataset (str): A dataset identifier
        series (str): The datasetseries identifier
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.

    Returns:
        class:`pandas.DataFrame`: A dataframe with a row for each distribution.
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{series}/distributions"
    distros_df = Fusion._call_for_dataframe(url, self.session)

    if output:
        pass

    return distros_df
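
A usage sketch (placeholder dataset and series identifiers):

    distros_df = fusion.list_distributions("MY_DATASET", "20240101")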

list_products(contains=None, id_contains=False, catalog=None, output=False, max_results=-1, display_all_columns=False)

Get the products contained in a catalog. A product is a grouping of datasets.

Parameters:

    contains (Union[str, list]): A string or a list of strings that are product identifiers
        to filter the products list. If a list is provided then it will return products whose
        identifier matches any of the strings. (default: None)
    id_contains (bool): Filter products only where the string(s) are contained in the
        identifier, ignoring description. (default: False)
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    output (bool): If True then print the dataframe. Defaults to False. (default: False)
    max_results (int): Limit the number of rows returned in the dataframe. Defaults to -1,
        which returns all results. (default: -1)
    display_all_columns (bool): If True displays all columns returned by the API, otherwise
        only the key columns are displayed. (default: False)

Returns:

    pandas.DataFrame: A dataframe with a row for each product.

Source code in py_src/fusion/fusion.py
def list_products(
    self,
    contains: Optional[Union[str, list[str]]] = None,
    id_contains: bool = False,
    catalog: Optional[str] = None,
    output: bool = False,
    max_results: int = -1,
    display_all_columns: bool = False,
) -> pd.DataFrame:
    """Get the products contained in a catalog. A product is a grouping of datasets.

    Args:
        contains (Union[str, list], optional): A string or a list of strings that are product
            identifiers to filter the products list. If a list is provided then it will return
            products whose identifier matches any of the strings. Defaults to None.
        id_contains (bool): Filter products only where the string(s) are contained in the identifier,
            ignoring description.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        output (bool, optional): If True then print the dataframe. Defaults to False.
        max_results (int, optional): Limit the number of rows returned in the dataframe.
            Defaults to -1 which returns all results.
        display_all_columns (bool, optional): If True displays all columns returned by the API,
            otherwise only the key columns are displayed

    Returns:
        class:`pandas.DataFrame`: a dataframe with a row for each product
    """
    catalog = self._use_catalog(catalog)

    url = f"{self.root_url}catalogs/{catalog}/products"
    full_prod_df: pd.DataFrame = Fusion._call_for_dataframe(url, self.session)

    if contains:
        if isinstance(contains, list):
            contains = "|".join(f"{s}" for s in contains)
        if id_contains:
            filtered_df = full_prod_df[full_prod_df["identifier"].str.contains(contains, case=False)]
        else:
            filtered_df = full_prod_df[
                full_prod_df["identifier"].str.contains(contains, case=False)
                | full_prod_df["description"].str.contains(contains, case=False)
            ]
    else:
        filtered_df = full_prod_df

    filtered_df["category"] = filtered_df.category.str.join(", ")
    filtered_df["region"] = filtered_df.region.str.join(", ")
    if not display_all_columns:
        filtered_df = filtered_df[
            filtered_df.columns.intersection(
                [
                    "identifier",
                    "title",
                    "region",
                    "category",
                    "status",
                    "description",
                ]
            )
        ]

    if max_results > -1:
        filtered_df = filtered_df[0:max_results]

    if output:
        pass

    return filtered_df
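
A usage sketch; the string filter is case-insensitive ("research" is a placeholder search term):

    products_df = fusion.list_products(contains="research", max_results=10)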

listen_to_events(last_event_id=None, catalog=None, url='https://fusion.jpmorgan.com/api/v1/')

Run server sent event listener in the background. Retrieve results by running get_events.

Parameters:

    last_event_id (str): Last event ID (exclusive). (default: None)
    catalog (str): A catalog identifier. (default: None)
    url (str): Subscription URL. (default: 'https://fusion.jpmorgan.com/api/v1/')
Source code in py_src/fusion/fusion.py
def listen_to_events(
    self,
    last_event_id: Optional[str] = None,
    catalog: Optional[str] = None,
    url: str = "https://fusion.jpmorgan.com/api/v1/",
) -> Union[None, pd.DataFrame]:
    """Run server sent event listener in the background. Retrieve results by running get_events.

    Args:
        last_event_id (str): Last event ID (exclusive).
        catalog (str): catalog.
        url (str): subscription url.
    Returns:
        None: The listener runs in a background daemon thread; retrieve accumulated
            events by calling get_events.
    """

    catalog = self._use_catalog(catalog)
    import asyncio
    import json
    import threading

    from aiohttp_sse_client import client as sse_client

    from .utils import get_client

    kwargs: dict[str, Any] = {}
    if last_event_id:
        kwargs = {"headers": {"Last-Event-ID": last_event_id}}

    async def async_events() -> None:
        """Events sync function.

        Returns:
            None
        """
        timeout = 1e100
        session = await get_client(self.credentials, timeout=timeout)
        async with sse_client.EventSource(
            f"{url}catalogs/{catalog}/notifications/subscribe",
            session=session,
            **kwargs,
        ) as messages:
            try:
                async for msg in messages:
                    event = json.loads(msg.data)
                    if self.events is None:
                        self.events = pd.DataFrame()
                    self.events = pd.concat([self.events, pd.DataFrame(event)], ignore_index=True)
            except TimeoutError as ex:
                raise ex from None
            except BaseException:
                raise

    _ = self.list_catalogs()  # refresh token
    if "headers" in kwargs:
        kwargs["headers"].update({"authorization": f"bearer {self.credentials.bearer_token}"})
    else:
        kwargs["headers"] = {
            "authorization": f"bearer {self.credentials.bearer_token}",
        }
    if "http" in self.credentials.proxies:
        kwargs["proxy"] = self.credentials.proxies["http"]
    elif "https" in self.credentials.proxies:
        kwargs["proxy"] = self.credentials.proxies["https"]
    th = threading.Thread(target=asyncio.run, args=(async_events(),), daemon=True)
    th.start()
    return None
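
A usage sketch; the call returns immediately and events accumulate on the instance:

    fusion.listen_to_events()        # starts a daemon listener thread
    # ... some time later ...
    events_df = fusion.get_events()  # events collected so far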

to_bytes(dataset, series_member, dataset_format='parquet', catalog=None)

Returns an instance of dataset (the distribution) as a bytes object.

Parameters:

    dataset (str): A dataset identifier. (required)
    series_member (str): A dataset series member identifier. (required)
    dataset_format (str): The file format, e.g. CSV or Parquet. (default: 'parquet')
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
Source code in py_src/fusion/fusion.py
def to_bytes(
    self,
    dataset: str,
    series_member: str,
    dataset_format: str = "parquet",
    catalog: Optional[str] = None,
) -> BytesIO:
    """Returns an instance of dataset (the distribution) as a bytes object.

    Args:
        dataset (str): A dataset identifier
        series_member (str): A dataset series member identifier
        dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
    """

    catalog = self._use_catalog(catalog)

    url = distribution_to_url(
        self.root_url,
        dataset,
        series_member,
        dataset_format,
        catalog,
    )

    return Fusion._call_for_bytes_object(url, self.session)
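
A usage sketch (placeholder identifiers); the result is a BytesIO object:

    buf = fusion.to_bytes("MY_DATASET", "20240101", dataset_format="parquet")
    raw = buf.read()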

to_df(dataset, dt_str='latest', dataset_format='parquet', catalog=None, n_par=None, show_progress=True, columns=None, filters=None, force_download=False, download_folder=None, dataframe_type='pandas', **kwargs)

Gets distributions for a specified date or date range and returns the data as a dataframe.

Parameters:

    dataset (str): A dataset identifier. (required)
    dt_str (str): Either a single date or a range identified by a start or end date, or both
        separated with a ":". Defaults to 'latest', which will return the most recent instance
        of the dataset. (default: 'latest')
    dataset_format (str): The file format, e.g. CSV or Parquet. (default: 'parquet')
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    n_par (int): Specify how many distributions to download in parallel. Defaults to all
        CPUs available. (default: None)
    show_progress (bool): Display a progress bar during data download. (default: True)
    columns (List): A list of columns to return from a parquet file. (default: None)
    filters (List): List[Tuple] or List[List[Tuple]] or None. Rows which do not match the
        filter predicate will be removed from scanned data. Partition keys embedded in a
        nested directory structure will be exploited to avoid loading files at all if they
        contain no matching rows. If use_legacy_dataset is True, filters can only reference
        partition keys and only a hive-style directory structure is supported. When setting
        use_legacy_dataset to False, also within-file level filtering and different
        partitioning schemes are supported. More on
        https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
        (default: None)
    force_download (bool): If True then will always download a file even if it is already
        on disk. (default: False)
    download_folder (str): The path, absolute or relative, where downloaded files are saved.
        Defaults to download_folder as set in __init__. (default: None)
    dataframe_type (str): Type of dataframe to return: 'pandas' (default) or 'polars'.
        (default: 'pandas')
Source code in py_src/fusion/fusion.py
def to_df(  # noqa: PLR0913
    self,
    dataset: str,
    dt_str: str = "latest",
    dataset_format: str = "parquet",
    catalog: Optional[str] = None,
    n_par: Optional[int] = None,
    show_progress: bool = True,
    columns: Optional[list[str]] = None,
    filters: Optional[PyArrowFilterT] = None,
    force_download: bool = False,
    download_folder: Optional[str] = None,
    dataframe_type: str = "pandas",
    **kwargs: Any,
) -> pd.DataFrame:
    """Gets distributions for a specified date or date range and returns the data as a dataframe.

    Args:
        dataset (str): A dataset identifier
        dt_str (str, optional): Either a single date or a range identified by a start or end date,
            or both separated with a ":". Defaults to 'latest' which will return the most recent
            instance of the dataset.
        dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        n_par (int, optional): Specify how many distributions to download in parallel.
            Defaults to all cpus available.
        show_progress (bool, optional): Display a progress bar during data download. Defaults to True.
        columns (List, optional): A list of columns to return from a parquet file. Defaults to None
        filters (List, optional): List[Tuple] or List[List[Tuple]] or None (default)
            Rows which do not match the filter predicate will be removed from scanned data.
            Partition keys embedded in a nested directory structure will be exploited to avoid
            loading files at all if they contain no matching rows. If use_legacy_dataset is True,
            filters can only reference partition keys and only a hive-style directory structure
            is supported. When setting use_legacy_dataset to False, also within-file level filtering
            and different partitioning schemes are supported.
            More on https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
        force_download (bool, optional): If True then will always download a file even
            if it is already on disk. Defaults to False.
        download_folder (str, optional): The path, absolute or relative, where downloaded files are saved.
            Defaults to download_folder as set in __init__
        dataframe_type (str, optional): Type of dataframe to return: 'pandas' (default) or 'polars'.
    Returns:
        class:`pandas.DataFrame`: a dataframe containing the requested data.
            If multiple dataset instances are retrieved then these are concatenated first.
    """
    catalog = self._use_catalog(catalog)

    # sample data is limited to csv
    if dt_str == "sample":
        dataset_format = "csv"

    if not download_folder:
        download_folder = self.download_folder
    download_res = self.download(
        dataset,
        dt_str,
        dataset_format,
        catalog,
        n_par,
        show_progress,
        force_download,
        download_folder,
        return_paths=True,
    )

    if not download_res:
        raise ValueError("Must specify 'return_paths=True' in download call to use this function")

    if not all(res[0] for res in download_res):
        failed_res = [res for res in download_res if not res[0]]
        raise Exception(
            f"Not all downloads were successfully completed. "
            f"Re-run to collect missing files. The following failed:\n{failed_res}"
        )

    files = [res[1] for res in download_res]

    pd_read_fn_map = {
        "csv": read_csv,
        "parquet": read_parquet,
        "parq": read_parquet,
        "json": read_json,
        "raw": read_csv,
    }

    pd_read_default_kwargs: dict[str, dict[str, object]] = {
        "csv": {
            "columns": columns,
            "filters": filters,
            "fs": self.fs,
            "dataframe_type": dataframe_type,
        },
        "parquet": {
            "columns": columns,
            "filters": filters,
            "fs": self.fs,
            "dataframe_type": dataframe_type,
        },
        "json": {
            "columns": columns,
            "filters": filters,
            "fs": self.fs,
            "dataframe_type": dataframe_type,
        },
        "raw": {
            "columns": columns,
            "filters": filters,
            "fs": self.fs,
            "dataframe_type": dataframe_type,
        },
    }

    pd_read_default_kwargs["parq"] = pd_read_default_kwargs["parquet"]

    pd_reader = pd_read_fn_map.get(dataset_format)
    pd_read_kwargs = pd_read_default_kwargs.get(dataset_format, {})
    if not pd_reader:
        raise Exception(f"No pandas function to read file in format {dataset_format}")

    pd_read_kwargs.update(kwargs)

    if len(files) == 0:
        raise APIResponseError(
            f"No series members for dataset: {dataset} "
            f"in date or date range: {dt_str} and format: {dataset_format}"
        )
    if dataset_format in ["parquet", "parq"]:
        data_df = pd_reader(files, **pd_read_kwargs)  # type: ignore
    elif dataset_format == "raw":
        dataframes = (
            pd.concat(
                [pd_reader(ZipFile(f).open(p), **pd_read_kwargs) for p in ZipFile(f).namelist()],  # type: ignore
                ignore_index=True,
            )
            for f in files
        )
        data_df = pd.concat(dataframes, ignore_index=True)
    else:
        dataframes = (pd_reader(f, **pd_read_kwargs) for f in files)  # type: ignore
        if dataframe_type == "pandas":
            data_df = pd.concat(dataframes, ignore_index=True)
        elif dataframe_type == "polars":
            import polars as pl

            data_df = pl.concat(dataframes, how="diagonal")  # type: ignore
        else:
            raise ValueError(f"Unknown dataframe_type {dataframe_type}")

    return data_df
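
A usage sketch (placeholder dataset identifier and column names); filters use pyarrow predicate syntax:

    df = fusion.to_df(
        "MY_DATASET",
        dt_str="latest",
        columns=["col1", "col2"],
        filters=[("col1", ">", 0)],
    )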

to_table(dataset, dt_str='latest', dataset_format='parquet', catalog=None, n_par=None, show_progress=True, columns=None, filters=None, force_download=False, download_folder=None, **kwargs)

Gets distributions for a specified date or date range and returns the data as an arrow table.

Parameters:

    dataset (str): A dataset identifier. (required)
    dt_str (str): Either a single date or a range identified by a start or end date, or both
        separated with a ":". Defaults to 'latest', which will return the most recent instance
        of the dataset. (default: 'latest')
    dataset_format (str): The file format, e.g. CSV or Parquet. (default: 'parquet')
    catalog (str): A catalog identifier. Defaults to 'common'. (default: None)
    n_par (int): Specify how many distributions to download in parallel. Defaults to all
        CPUs available. (default: None)
    show_progress (bool): Display a progress bar during data download. (default: True)
    columns (List): A list of columns to return from a parquet file. (default: None)
    filters (List): List[Tuple] or List[List[Tuple]] or None. Rows which do not match the
        filter predicate will be removed from scanned data. Partition keys embedded in a
        nested directory structure will be exploited to avoid loading files at all if they
        contain no matching rows. If use_legacy_dataset is True, filters can only reference
        partition keys and only a hive-style directory structure is supported. When setting
        use_legacy_dataset to False, also within-file level filtering and different
        partitioning schemes are supported. More on
        https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
        (default: None)
    force_download (bool): If True then will always download a file even if it is already
        on disk. (default: False)
    download_folder (str): The path, absolute or relative, where downloaded files are saved.
        Defaults to download_folder as set in __init__. (default: None)
Source code in py_src/fusion/fusion.py
def to_table(  # noqa: PLR0913
    self,
    dataset: str,
    dt_str: str = "latest",
    dataset_format: str = "parquet",
    catalog: Optional[str] = None,
    n_par: Optional[int] = None,
    show_progress: bool = True,
    columns: Optional[list[str]] = None,
    filters: Optional[PyArrowFilterT] = None,
    force_download: bool = False,
    download_folder: Optional[str] = None,
    **kwargs: Any,
) -> pa.Table:
    """Gets distributions for a specified date or date range and returns the data as an arrow table.

    Args:
        dataset (str): A dataset identifier
        dt_str (str, optional): Either a single date or a range identified by a start or end date,
            or both separated with a ":". Defaults to 'latest' which will return the most recent
            instance of the dataset.
        dataset_format (str, optional): The file format, e.g. CSV or Parquet. Defaults to 'parquet'.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        n_par (int, optional): Specify how many distributions to download in parallel.
            Defaults to all cpus available.
        show_progress (bool, optional): Display a progress bar during data download. Defaults to True.
        columns (List, optional): A list of columns to return from a parquet file. Defaults to None
        filters (List, optional): List[Tuple] or List[List[Tuple]] or None (default)
            Rows which do not match the filter predicate will be removed from scanned data.
            Partition keys embedded in a nested directory structure will be exploited to avoid
            loading files at all if they contain no matching rows. If use_legacy_dataset is True,
            filters can only reference partition keys and only a hive-style directory structure
            is supported. When setting use_legacy_dataset to False, also within-file level filtering
            and different partitioning schemes are supported.
            More on https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
        force_download (bool, optional): If True, always download a file even
            if it is already on disk. Defaults to False.
        download_folder (str, optional): The path, absolute or relative, where downloaded files are saved.
            Defaults to the download_folder set in __init__.

    Returns:
        class:`pyarrow.Table`: a table containing the requested data.
            If multiple dataset instances are retrieved then these are concatenated first.
    """
    catalog = self._use_catalog(catalog)
    n_par = cpu_count(n_par)
    if not download_folder:
        download_folder = self.download_folder
    download_res = self.download(
        dataset,
        dt_str,
        dataset_format,
        catalog,
        n_par,
        show_progress,
        force_download,
        download_folder,
        return_paths=True,
    )

    if not download_res:
        raise ValueError("Must specify 'return_paths=True' in download call to use this function")

    if not all(res[0] for res in download_res):
        failed_res = [res for res in download_res if not res[0]]
        raise RuntimeError(
            f"Not all downloads were successfully completed. "
            f"Re-run to collect missing files. The following failed:\n{failed_res}"
        )

    files = [res[1] for res in download_res]

    read_fn_map = {
        "csv": csv_to_table,
        "parquet": parquet_to_table,
        "parq": parquet_to_table,
        "json": json_to_table,
        "raw": csv_to_table,
    }

    read_default_kwargs: dict[str, dict[str, object]] = {
        "csv": {"columns": columns, "filters": filters, "fs": self.fs},
        "parquet": {"columns": columns, "filters": filters, "fs": self.fs},
        "json": {"columns": columns, "filters": filters, "fs": self.fs},
        "raw": {"columns": columns, "filters": filters, "fs": self.fs},
    }

    read_default_kwargs["parq"] = read_default_kwargs["parquet"]

    reader = read_fn_map.get(dataset_format)
    read_kwargs = read_default_kwargs.get(dataset_format, {})
    if not reader:
        raise AssertionError(f"No function to read file in format {dataset_format}")

    read_kwargs.update(kwargs)

    if len(files) == 0:
        raise APIResponseError(
            f"No series members for dataset: {dataset} "
            f"in date or date range: {dt_str} and format: {dataset_format}"
        )
    if dataset_format in ["parquet", "parq"]:
        tbl = reader(files, **read_kwargs)  # type: ignore
    else:
        tbl = (reader(f, **read_kwargs) for f in files)  # type: ignore
        tbl = pa.concat_tables(tbl)

    return tbl
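
A usage sketch (the dataset identifier, date range, column names, and filter below are hypothetical; Fusion() is assumed to be configured with valid credentials):

fusion = Fusion()
tbl = fusion.to_table(
    "MY_DATASET",                        # hypothetical dataset identifier
    dt_str="20230101:20230131",          # a date range, start:end
    columns=["date", "price"],           # hypothetical columns
    filters=[("currency", "=", "USD")],  # hypothetical filter predicate
)
df = tbl.to_pandas()  # a pyarrow.Table converts readily to pandas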

upload(path, dataset=None, dt_str='latest', catalog=None, n_par=None, show_progress=True, return_paths=False, multipart=True, chunk_size=5 * 2 ** 20, from_date=None, to_date=None, preserve_original_name=False)

Uploads the requested file/files to Fusion.

Parameters:

Name Type Description Default
path str

path to a file or a folder with files

required
dataset str

Dataset name to which the file will be uploaded (for single file only). If not provided, the dataset will be implied from the file's name.

None
dt_str str

A file name. Can be any string but is usually a date. Defaults to 'latest', which will return the most recent. Relevant for a single file upload only. If not provided, the dataset will be implied from the file's name.

'latest'
catalog str

A catalog identifier. Defaults to 'common'.

None
n_par int

Specify how many files to upload in parallel. Defaults to all available CPUs.

None
show_progress bool

Display a progress bar during data upload. Defaults to True.

True
return_paths bool

Return paths and success statuses of the uploaded files.

False
multipart bool

Whether to use multipart upload.

True
chunk_size int

Maximum chunk size for multipart upload, in bytes.

5 * 2 ** 20
from_date str

Start of the data date range contained in the distribution. Defaults to upload date.

None
to_date str

End of the data date range contained in the distribution. Defaults to upload date.

None
preserve_original_name bool

Preserve the original name of the file. Defaults to False.

False
Source code in py_src/fusion/fusion.py
def upload(  # noqa: PLR0913
    self,
    path: str,
    dataset: Optional[str] = None,
    dt_str: str = "latest",
    catalog: Optional[str] = None,
    n_par: Optional[int] = None,
    show_progress: bool = True,
    return_paths: bool = False,
    multipart: bool = True,
    chunk_size: int = 5 * 2**20,
    from_date: Optional[str] = None,
    to_date: Optional[str] = None,
    preserve_original_name: Optional[bool] = False,
) -> Optional[list[tuple[bool, str, Optional[str]]]]:
    """Uploads the requested files/files to Fusion.

    Args:
        path (str): path to a file or a folder with files
        dataset (str, optional): Dataset name to which the file will be uploaded (for single file only).
            If not provided, the dataset will be implied from the file's name.
        dt_str (str, optional): A file name. Can be any string but is usually a date.
            Defaults to 'latest', which will return the most recent.
            Relevant for a single file upload only. If not provided, the dataset will
            be implied from the file's name.
        catalog (str, optional): A catalog identifier. Defaults to 'common'.
        n_par (int, optional): Specify how many files to upload in parallel.
            Defaults to all available CPUs.
        show_progress (bool, optional): Display a progress bar during data upload. Defaults to True.
        return_paths (bool, optional): Return paths and success statuses of the uploaded files.
        multipart (bool, optional): Whether to use multipart upload.
        chunk_size (int, optional): Maximum chunk size for multipart upload, in bytes.
        from_date (str, optional): Start of the data date range contained in the distribution.
            Defaults to upload date.
        to_date (str, optional): End of the data date range contained in the distribution.
            Defaults to upload date.
        preserve_original_name (bool, optional): Preserve the original name of the file. Defaults to False.

    Returns:
        Optional[list[tuple[bool, str, Optional[str]]]]: a list of
            (success, path, error message) tuples if return_paths is True, otherwise None.
    """
    catalog = self._use_catalog(catalog)

    if not self.fs.exists(path):
        raise RuntimeError("The provided path does not exist")

    fs_fusion = self.get_fusion_filesystem()
    if self.fs.info(path)["type"] == "directory":
        file_path_lst = self.fs.find(path)
        local_file_validation = validate_file_names(file_path_lst, fs_fusion)
        file_path_lst = [f for flag, f in zip(local_file_validation, file_path_lst) if flag]
        file_name = [f.split("/")[-1] for f in file_path_lst]
        is_raw_lst = is_dataset_raw(file_path_lst, fs_fusion)
        local_url_eqiv = [path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)]
    else:
        file_path_lst = [path]
        if not catalog or not dataset:
            local_file_validation = validate_file_names(file_path_lst, fs_fusion)
            file_path_lst = [f for flag, f in zip(local_file_validation, file_path_lst) if flag]
            is_raw_lst = is_dataset_raw(file_path_lst, fs_fusion)
            local_url_eqiv = [path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)]
            if preserve_original_name:
                raise ValueError("preserve_original_name can only be used when catalog and dataset are provided.")
        else:
            # 'latest' can never match the YYYYMMDD pattern, so this branch only
            # normalizes an explicit date string.
            date_identifier = re.compile(r"^(\d{4})(\d{2})(\d{2})$")
            if date_identifier.match(dt_str):
                dt_str = pd.Timestamp(dt_str).date().strftime("%Y%m%d")

            if catalog not in fs_fusion.ls("") or dataset not in [
                i.split("/")[-1] for i in fs_fusion.ls(f"{catalog}/datasets")
            ]:
                msg = (
                    f"File has not been uploaded; either the catalog: {catalog} "
                    f"or the dataset: {dataset} does not exist."
                )
                warnings.warn(msg, stacklevel=2)
                return [(False, path, msg)]
            file_format = path.split(".")[-1]
            file_name = [path.split("/")[-1]]
            file_format = "raw" if file_format not in RECOGNIZED_FORMATS else file_format

            local_url_eqiv = [
                "/".join(distribution_to_url("", dataset, dt_str, file_format, catalog, False).split("/")[1:])
            ]

    if not preserve_original_name:
        data_map_df = pd.DataFrame([file_path_lst, local_url_eqiv]).T
        data_map_df.columns = pd.Index(["path", "url"])
    else:
        data_map_df = pd.DataFrame([file_path_lst, local_url_eqiv, file_name]).T
        data_map_df.columns = pd.Index(["path", "url", "file_name"])

    n_par = cpu_count(n_par)
    parallel = len(data_map_df) > 1
    res = upload_files(
        fs_fusion,
        self.fs,
        data_map_df,
        parallel=parallel,
        n_par=n_par,
        multipart=multipart,
        chunk_size=chunk_size,
        show_progress=show_progress,
        from_date=from_date,
        to_date=to_date,
    )

    if not all(r[0] for r in res):
        failed_res = [r for r in res if not r[0]]
        msg = f"Not all uploads were successfully completed. The following failed:\n{failed_res}"
        logger.warning(msg)
        warnings.warn(msg, stacklevel=2)

    return res if return_paths else None
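
A sketch of a single-file upload under assumed names (the path, dataset identifier, and catalog below are illustrative):

fusion = Fusion()
res = fusion.upload(
    "data/my_dataset__common__20230101.csv",  # hypothetical local file
    dataset="MY_DATASET",                     # hypothetical dataset identifier
    dt_str="20230101",
    catalog="common",
    chunk_size=5 * 2**20,  # 5 MiB parts for multipart upload
    return_paths=True,     # res holds (success, path, error message) tuples
)

When a directory is passed instead of a single file, the code above validates each file name against the catalog/dataset naming convention before uploading.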

fsync(fs_fusion, fs_local, products=None, datasets=None, catalog=None, direction='upload', flatten=False, dataset_format=None, n_par=None, show_progress=True, local_path='', log_level=logging.ERROR, log_path='.')

Synchronisation between the local filesystem and Fusion.

Parameters:

Name Type Description Default
fs_fusion filesystem

Fusion filesystem.

required
fs_local filesystem

Local filesystem.

required
products list

List of products.

None
datasets list

List of datasets.

None
catalog str

Fusion catalog.

None
direction str

Direction of synchronisation: upload/download.

'upload'
flatten bool

Flatten the folder structure.

False
dataset_format str

Dataset format for upload/download.

None
n_par int

Specify how many distributions to download in parallel. Defaults to all available CPUs.

None
show_progress bool

Display a progress bar during data download. Defaults to True.

True
local_path str

Path to files in the local filesystem, e.g., "s3a://my_bucket/".

''
log_level int

Logging level. Error level by default.

ERROR
log_path str

The folder path where the log is stored. Defaults to ".".

'.'
Source code in py_src/fusion/fs_sync.py
def fsync(  # noqa: PLR0913
    fs_fusion: fsspec.filesystem,
    fs_local: fsspec.filesystem,
    products: Optional[list[str]] = None,
    datasets: Optional[list[str]] = None,
    catalog: Optional[str] = None,
    direction: str = "upload",
    flatten: bool = False,
    dataset_format: Optional[str] = None,
    n_par: Optional[int] = None,
    show_progress: bool = True,
    local_path: str = "",
    log_level: int = logging.ERROR,
    log_path: str = ".",
) -> None:
    """Synchronisation between the local filesystem and Fusion.

    Args:
        fs_fusion (fsspec.filesystem): Fusion filesystem.
        fs_local (fsspec.filesystem): Local filesystem.
        products (list): List of products.
        datasets (list): List of datasets.
        catalog (str): Fusion catalog.
        direction (str): Direction of synchronisation: upload/download.
        flatten (bool): Flatten the folder structure.
        dataset_format (str): Dataset format for upload/download.
        n_par (int, optional): Specify how many distributions to download in parallel.
            Defaults to all available CPUs.
        show_progress (bool): Display a progress bar during data download. Defaults to True.
        local_path (str): Path to files in the local filesystem, e.g., "s3a://my_bucket/".
        log_level (int): Logging level. Error level by default.
        log_path (str): The folder path where the log is stored. Defaults to ".".

    Returns:
        None
    """

    if logger.hasHandlers():
        logger.handlers.clear()
    file_handler = logging.FileHandler(filename="{}/{}".format(log_path, "fusion_fsync.log"))
    logging.addLevelName(VERBOSE_LVL, "VERBOSE")
    stdout_handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        "%(asctime)s.%(msecs)03d %(name)s:%(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)
    logger.addHandler(file_handler)
    logger.setLevel(log_level)

    catalog = catalog if catalog else "common"
    datasets = datasets if datasets else []
    products = products if products else []

    assert len(products) > 0 or len(datasets) > 0, "At least one of the products or datasets lists must be non-empty."
    assert direction in [
        "upload",
        "download",
    ], "The direction must be either upload or download."

    if len(local_path) > 0 and local_path[-1] != "/":
        local_path += "/"

    for product in products:
        res = json.loads(fs_fusion.cat(f"{catalog}/products/{product}").decode())
        datasets += [r["identifier"] for r in res["resources"]]

    assert len(datasets) > 0, "The supplied products did not contain any datasets."

    local_state = pd.DataFrame()
    fusion_state = pd.DataFrame()
    while True:
        try:
            local_state_temp = _get_local_state(
                fs_local,
                fs_fusion,
                datasets,
                catalog,
                dataset_format,
                local_state,
                local_path,
            )
            fusion_state_temp = _get_fusion_df(fs_fusion, datasets, catalog, flatten, dataset_format)
            if not local_state_temp.equals(local_state) or not fusion_state_temp.equals(fusion_state):
                res = _synchronize(
                    fs_fusion,
                    fs_local,
                    local_state_temp,
                    fusion_state_temp,
                    direction,
                    n_par,
                    show_progress,
                    local_path,
                )
                if len(res) == 0 or all(i[0] for i in res):
                    local_state = local_state_temp
                    fusion_state = fusion_state_temp

                if not all(r[0] for r in res):
                    failed_res = [r for r in res if not r[0]]
                    msg = f"Not all {direction}s were successfully completed. The following failed:\n{failed_res}"
                    errs = [r for r in res if not r[2]]
                    logger.warning(msg)
                    logger.warning(errs)
                    warnings.warn(msg, stacklevel=2)

            else:
                logger.info("All synced, sleeping")
                time.sleep(10)

        except KeyboardInterrupt:  # noqa: PERF203
            if input("Type exit to exit: ") != "exit":
                continue
            break

        except Exception as _:
            logger.error("Exception thrown", exc_info=True)
            continue
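
A minimal invocation sketch, assuming an authenticated Fusion client for the Fusion filesystem and fsspec for the local one (the dataset identifier is hypothetical):

import fsspec

fusion = Fusion()
fs_fusion = fusion.get_fusion_filesystem()  # as used by upload() above
fs_local = fsspec.filesystem("file")
fsync(
    fs_fusion,
    fs_local,
    datasets=["MY_DATASET"],  # hypothetical dataset identifier
    catalog="common",
    direction="download",
    local_path="downloads/",
)

Note that fsync polls in a loop, sleeping between checks once everything is synced; interrupt with Ctrl-C and type "exit" at the prompt to stop.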