Skip to content

fsync

Synchronisation between the local filesystem and Fusion.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `fs_fusion` | `filesystem` | Fusion filesystem. | *required* |
| `fs_local` | `filesystem` | Local filesystem. | *required* |
| `products` | `list` | List of products. | `None` |
| `datasets` | `list` | List of datasets. | `None` |
| `catalog` | `str` | Fusion catalog. | `None` |
| `direction` | `str` | Direction of synchronisation: `upload` or `download`. | `'upload'` |
| `flatten` | `bool` | Flatten the folder structure. | `False` |
| `dataset_format` | `str` | Dataset format for upload/download. | `None` |
| `n_par` | `int` | Specify how many distributions to download in parallel. Defaults to all. | `None` |
| `show_progress` | `bool` | Display a progress bar during data download. Defaults to True. | `True` |
| `local_path` | `str` | Path to files in the local filesystem, e.g., `"s3a://my_bucket/"`. | `''` |
| `log_level` | `int` | Logging level. Error level by default. | `ERROR` |
| `log_path` | `str` | The folder path where the log is stored. Defaults to `"."`. | `'.'` |
Source code in `py_src/fusion/fs_sync.py` (lines 315–405):
def fsync(  # noqa: PLR0912, PLR0913, PLR0915
    fs_fusion: fsspec.filesystem,
    fs_local: fsspec.filesystem,
    products: Optional[list[str]] = None,
    datasets: Optional[list[str]] = None,
    catalog: Optional[str] = None,
    direction: str = "upload",
    flatten: bool = False,
    dataset_format: Optional[str] = None,
    n_par: Optional[int] = None,
    show_progress: bool = True,
    local_path: str = "",
    log_level: int = logging.ERROR,
    log_path: str = ".",
) -> None:
    """Synchronisation between the local filesystem and Fusion.

    Runs a polling loop. On each pass the local and Fusion states are read; if either
    differs from the state remembered from the previous pass, ``_synchronize`` is called,
    otherwise the loop sleeps before polling again. Exceptions raised during a pass are
    logged and the pass is retried after the same sleep interval.

    Args:
        fs_fusion (fsspec.filesystem): Fusion filesystem.
        fs_local (fsspec.filesystem): Local filesystem.
        products (list): List of products.
        datasets (list): List of datasets.
        catalog (str): Fusion catalog.
        direction (str): Direction of synchronisation: upload/download.
        flatten (bool): Flatten the folder structure.
        dataset_format (str): Dataset format for upload/download.
        n_par (int, optional): Specify how many distributions to download in parallel. Defaults to all.
        show_progress (bool): Display a progress bar during data download. Defaults to True.
        local_path (str): path to files in the local filesystem, e.g., "s3a://my_bucket/"
        log_level (int): Logging level. Error level by default.
        log_path (str): The folder path where the log is stored. Defaults to ".".

    Returns:
        None. The loop only ends after a ``KeyboardInterrupt`` followed by the user
        typing ``exit`` at the prompt; any other input resumes synchronisation.
    """
    # Seconds to wait between polls when nothing changed, and before retrying after an error.
    poll_interval_s = 10

    _configure_fsync_logger(log_level, log_path)
    products, datasets, catalog, local_path = _normalize_fsync_inputs(
        products,
        datasets,
        catalog,
        direction,
        local_path,
    )
    datasets = _extend_datasets_from_products(fs_fusion, products, datasets, catalog)

    # States remembered from the previous pass; start empty so the first pass compares
    # against nothing.
    local_state = pd.DataFrame()
    fusion_state = pd.DataFrame()
    # Set when the previous pass raised, so the next pass waits before retrying.
    backoff_pending = False
    while True:
        try:
            if backoff_pending:
                # Without this wait a persistent failure would re-run the pass immediately,
                # spinning the loop and flooding the log. The sleep lives inside the ``try``
                # so a Ctrl+C during the wait still reaches the exit prompt below.
                backoff_pending = False
                time.sleep(poll_interval_s)

            local_state_temp = _get_local_state(
                fs_local,
                fs_fusion,
                datasets,
                catalog,
                dataset_format,
                local_state,
                local_path,
            )
            fusion_state_temp = _get_fusion_df(fs_fusion, datasets, catalog, flatten, dataset_format)
            if not local_state_temp.equals(local_state) or not fusion_state_temp.equals(fusion_state):
                res = _synchronize(
                    fs_fusion,
                    fs_local,
                    local_state_temp,
                    fusion_state_temp,
                    direction,
                    n_par,
                    show_progress,
                    local_path,
                )
                # _handle_fsync_result picks which states become the baseline for the next
                # comparison, based on the sync result (see that helper for the rules).
                local_state, fusion_state = _handle_fsync_result(
                    res,
                    direction,
                    local_state,
                    fusion_state,
                    local_state_temp,
                    fusion_state_temp,
                )
            else:
                logger.info("All synced, sleeping")
                time.sleep(poll_interval_s)

        except KeyboardInterrupt:  # noqa: PERF203
            # Ctrl+C pauses rather than stops: only an explicit "exit" ends the loop.
            if input("Type exit to exit: ") != "exit":
                continue
            break

        except Exception:
            # Keep the sync daemon alive on unexpected errors, but log the traceback and
            # back off before the next attempt.
            logger.exception("Exception thrown")
            backoff_pending = True