Computation & data access

Data required to compute diagnostics are not read and diagnostics themselves not computed until one of the computation methods is called.
All the methods presented below are available to both NadirData and SwathData data containers.

Sequential computation

The compute() method allows to read data and compute diagnostics one after each other.
Read data and compute statistics. Limited to the provided list of
data if specified.

Parameters
----------
stats
    Name of the data to limit the computation to.
Diagnostics can be computed individually
ad.compute(stats=["Sigma 0"])
Or all at once
ad.compute()

Parallel computation

Parallel computation allows to speed-up computation time and solve memory issues when working with large amount of data.
This kind of computation is done using the compute_dask() and a Dask cluster’s client.
Read data and compute statistics. Limited to the provided list of
data if specified.

Parameters
----------
stats
    Name of the data to limit the computation to.
freq
    Minimal split frequency (day, pass, cycle or any pandas offset aliases [1]_)
    to respect.
jobs_number
    Number of jobs to create (Default to the maximum possible number of periods
    the data can be split into according to the provided frequency).
bar
    [Does not work on xarray datasets] Whether to display a progress bar or not.
    If None, will display if logging level <= INFO.
dask_client
    Dask client on which to submit jobs.

        - if a client, use this client
        - if a scheduler_file, connect a client to it
        - try to get an existing dask client in the environment
        - create a local cluster and connect to it
kwargs
    Additional parameters determine the time splitting characteristics.
    This parameter works with pandas.date_range [2]_ frequencies.

References
----------
.. [1] https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html        #timeseries-offset-aliases
.. [2] https://pandas.pydata.org/pandas-docs/stable/reference/api/        pandas.date_range.html
In order to parallelize, the data is split along the main dimension (usually time).
freq parameter
  • CLS Table

    • This parameter is required

  • xarray Datasets

    • This parameter is optional and the parallelization is done using the current chunking of the dataset (each chunk usually correspond to a single file)

    • Setting this parameter triggers a re-chunking of the data.

jobs_number parameter
  • is optional and default to the maximum possible number of periods the data can be split into according to the given frequency (or the existing number of chunks for xarray Datasets).

  • will split the data in a multiple of the provided minimal frequency if less than the possible maximum number of jobs.

These parameters and the cluster’s parameters depend on each other and must be set according to each other. There are no right or wrong parameters but some general guidelines can be applied to find suitable parameters
  • Creating a cluster with more workers than jobs_number will waste resources

  • Creating a cluster with more per core memory than the memory used by a single of your job will waste resources (and potentially delay the cluster creation)

  • Increasing jobs_number will speed-up CPU bounds computations

  • Increasing jobs_number will slow-down I/O bounds computations

  • Increasing jobs_number implies more I/O so more stress on the storage

  • Increasing jobs_number will increase parallelization overhead

  • Decreasing jobs_number will increase the memory usage of each job

Limitations

Using this computation mode introduce some constraints and limitations you have to take into consideration.
  • For now, some diagnostics are not computed when using this method:

    • Nadir crossovers statistics

    • Editings

    • Section analysis

  • Median statistics cannot be computed for the following diagnostics:

    • Geographical box statistics

    • Scatters

    • Binned statistics

  • Median can be computed for the along time diagnostics if the compute_dask() freq parameter is a multiple of the add_time_stat() freq parameter. Some examples of compute_dask() and add_time_stat() frequency multiples:

    • 1 day is a multiple of 1 day

    • 1 day is a multiple of 1 hour

    • 1 day is a multiple of 6 hours

    • 1 day is not a multiple of 7 hours

    • 1 cycle is a multiple of 1 pass

    • 1 pass is not a multiple of 3 cycles

Dask clusters

Warning

CLS Tables database are not thread safe.
Multi-threaded workers should be avoided when working with this kind of data source.
A single threaded cluster is a cluster having the same number of cores and processes.
Dask clusters can be directly created directly using the dask’s distributed or Dask-Jobqueue libraries or through the DaskCluster utility class which will automatically set some parameters specific to the CLS and CNES environments and offer an homogeneous interface between local and distributed clusters.
Used to create a dask cluster (mostly) independently of environment.

Parameters
----------
cluster_type
    Type of cluster wanted.
start_timeout
    Timeout (in seconds) used by the wait_for_workers method (default: 120).
    Setting it to None will wait indefinitely.
jobs
    Number of job to start (min, max).
walltime
    Maximum execution time for each worker (Unit is minutes, default to 30).
cores
    total number of cores per job
processes
    Cut the job into this many processes. Good for GIL workloads or
    for nodes with many cores.
memory
    total amount of memory per job. Unit is fractional GiB.
name
    name of dask workers
queue
    queue name
project
    project name
resource_spec
    resources needed, format may depend on batch system
silence_logs:
    Level of logs to print out to stdout. logging.CRITICAL by default.
    Use a falsey value like False or None for no change.
interface
    network interface for worker communications (like 'eth0' ou 'ib0')
death_timeout
    Seconds to wait for a scheduler before closing workers
job_extra_directives
    specification of batch system options. Depends on batch system.

    If it is a list, it is is used "as is" and passed to jobqueue.
    If it is a dictionary, the keys are the batch system name and
    the values, the list of options to pass.
worker_extra_args
    additional arguments to pass to dask-worker
job_script_prologue
    other commands to add to script before launching worker
env_vars
    Environment variables to pass to workers. If this is a list,
    this names of environment variables to take from os.environ
    (current process environment variables).

    If this is a dictionary, the key is the variable name, the
    value is the value to pass. If the value is None, the actual
    value is taken from os.environ.

    .. note::
       `env_vars` is used to simplify passing environment variables,
       in fact it enriches the `job_script_prologue` parameter.
env_vars_treatment
    What to do with environment variables given with `env_vars`
local_directory
    dask worker local directory for file spilling (used by LOCAL clusters)
log_directory
    directory to use for job scheduler logs
python
    python executable used to launch Dask workers.
shebang
    path to desired interpreter for your batch submission script.
jupyterhub
    Set this cluster to be usable in a jupyterhub context.
expand_environment_variables
    List of parameters for which to expand environment variables.
    If None, ['log_directory', 'worker_extra_args', 'job_script_prologue'] is taken.
    If Empty no variable is expanded.
kwargs
    Additional cluster parameters.
Dask is a powerful tool allowing you to book and access large amount of CPU and memory resources. These resources are shared among users and need to be freed when you’re done using them.
Freeing cluster’s resources is done by calling the .close() method.

Local cluster

Local clusters allow to use all or a subpart of the CPUs of the computer on which your program is running.
In the following example we create and use a local cluster with 4 single threaded workers each using 3 GB of memory.
Example
from octantng.core.dask import DaskCluster

dask_cluster = DaskCluster(
    cluster_type="local",
    jobs=4,
    memory=3,
    local_directory="./dask_stuff",
)
client = dask_cluster.client

ad.compute_dask(dask_client=client, jobs_number=4, freq="pass")

dask_cluster.close()

Slurm cluster

Note

If used at CLS, the account=XYZ parameter is optional.
If used at CNES, set the interface="ib0" parameter to improve network performances.
If used through CNES jupyterhub, you can provide the jupyterhub=True parameter in order to make the dashboard accessible.
Example 1
In the following example we create and use an adaptive Slurm cluster scaling between 5 and 10 single threaded workers each using 3 GB of memory and having a 10 minutes lifetime.

Using CLS resources

from octantng.core.dask import DaskCluster

dask_cluster = DaskCluster(
    cluster_type="slurm",
    jobs=(5, 10),
    memory=3,
    walltime=10,
    log_directory="./dask_stuff",
)
client = dask_cluster.client

ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")

# Remember to close your cluster and free its resources!
dask_cluster.close()

Using CNES jupyterhub resources

from octantng.core.dask import DaskCluster

dask_cluster = DaskCluster(
    cluster_type="slurm",
    account="XYZ",
    jobs=(5, 10),
    memory=3,
    walltime=10,
    interface="ib0",
    log_directory="./dask_stuff",
    # Do not set this if outside jupyterhub!
    jupyterhub=True,
)
client = dask_cluster.client

ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")

# Remember to close your cluster and free its resources!
dask_cluster.close()
Example 2
In the following example we create and use a Slurm cluster with 12 workers each using 3 GB of memory (grouped in jobs of 4 cores and 12 GB of memory).
These workers are not single threaded.

Using CLS resources

from octantng.core.dask import DaskCluster

dask_cluster = DaskCluster(
    cluster_type="slurm",
    jobs=(3, 3),
    cores=4,
    processes=2,
    memory=12,
    walltime=10,
    log_directory="./dask_stuff",
)
client = dask_cluster.client

ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")

# Remember to close your cluster and free its resources!
dask_cluster.close()

Using CNES jupyterhub resources

from octantng.core.dask import DaskCluster

dask_cluster = DaskCluster(
    cluster_type="slurm",
    account="XYZ",
    jobs=(3, 3),
    cores=4,
    processes=2,
    memory=12,
    walltime=10,
    interface="ib0",
    log_directory="./dask_stuff",
    # Do not set this if outside jupyterhub!
    jupyterhub=True,
)
client = dask_cluster.client

ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")

# Remember to close your cluster and free its resources!
dask_cluster.close()

Computed data access

Existing diagnostics can be listed using the list_diagnostics() method.
List the subset of diagnostics' respecting provided criteria.

Parameters
----------
dtype
    Limit diagnostics list to provided type.
    Data types are:

     - RAW
     - RAW_COMPARISON
     - EDITING
     - GEOBOX
     - TEMPORAL
     - BINNED
     - BINNED_2D
     - HISTOGRAM
     - SCATTER
     - RATIO
     - CROSSOVER
     - MISSING_POINTS
     - SECTION_ANALYSES
containing
    Limit to diagnostic names containing this element.
computed
    Limited diagnostics list based on their computation status:

     - [Default] None: All diagnostics
     - False: Non-computed diagnostics
     - True: Computed diagnostics
freq
    Limit to diagnostics compatible with a dask computation at the provided
    frequency.

Returns
-------
:
    List of diagnostic names.
ad.list_diagnostics(dtype="temporal", containing="sla")
['SLA pass', 'SLA (6h)']
Diagnostics are internally stored in class inheriting from Diagnostic containing its specification and results.
This objects can be accessed using the get_diagnostic() and get_diagnostics() methods.
Get the requested diagnostic data container.

If name is not an existing diagnostic, a new raw or raw_comparison diagnostic
will be generated with the provided parameters.

Parameters
----------
name
    Name of the diagnostic.
field
    Field for which to create a raw_data diagnostic.
x
    Field to use as x-axis (raw comparison diagnostic).
y
    Field to use as y-axis (raw comparison diagnostic).
z
    Field to use as z-axis (raw comparison 3d diagnostic).
kwargs
    Additional read_data parameters.

Returns
-------
:
    Diagnostic data container.
Computed data are contained in xarray datasets.
Data are internally stored as raw data or groups of compatible diagnostics.
For example, diagnostics of 2 time statistics diagnostics using the same frequency will be grouped, computed together and can be accessed in the same dataset.
Once computed, diagnostics data can be accessed in different manner depending on their nature:
  • using the data attribute for raw data

  • using the get_data() method for individual diagnostics data

  • using the get_data_group() method for a whole group of diagnostics.

Raw data access.

ad.data
<xarray.Dataset> Size: 33MB
Dimensions:         (time: 582789)
Coordinates:
  * time            (time) datetime64[ns] 5MB 2019-06-01T05:30:29.807497 ... ...
Data variables:
    LONGITUDE       (time) float64 5MB 17.01 17.14 17.27 ... -123.5 -123.3
    LATITUDE        (time) float64 5MB -66.15 -66.15 -66.15 ... 66.15 66.15
    wind            (time) float64 5MB nan nan nan nan nan ... nan nan nan nan
    SLA             (time) float64 5MB nan nan nan nan nan ... nan nan nan nan
    SIGMA0.ALTI     (time) float64 5MB nan nan nan nan nan ... nan nan nan nan
    EDITING.CUSTOM  (time) int64 5MB 1 1 1 1 1 1 2 1 1 1 ... 1 1 1 1 1 1 1 1 1 1

Individual diagnostic access.

ad.get_data(name="SLA pass", stat="mean")
<xarray.Dataset> Size: 3kB
Dimensions:  (time: 175)
Coordinates:
  * time     (time) datetime64[ns] 1kB 2019-06-01T05:58:35.692381 ... 2019-06...
Data variables:
    SLA      (time) float64 1kB -2.283 -0.2079 -2.382 ... 1.91 -2.641 -2.712

Some diagnostics (section analysis here) might have custom outputs.

ad.get_data(name="Analysis 1")
First position Last position
# Cycle Pass Date Longitude Latitude Date Longitude Latitude Length False %
112222 - 232019-06-02T02:04:5347.98-65.092019-06-02T02:11:5697.61-62.044161.92%
212242 - 432019-06-02T20:47:21112.56-62.902019-06-02T20:54:08161.09-64.804011.50%
312298 - 992019-06-05T01:17:2352.07-65.252019-06-05T01:24:0999.87-62.204000.00%
4122170 - 1712019-06-07T20:41:4191.96-61.052019-06-07T20:50:53155.09-63.295431.84%
5122174 - 1752019-06-08T00:29:2352.61-64.962019-06-08T00:36:33103.19-62.074240.00%

Group of diagnostics access.

ad.get_data_group(name="SLA pass", stat="mean")
<xarray.Dataset> Size: 4kB
Dimensions:      (x: 175)
Coordinates:
  * x            (x) datetime64[ns] 1kB 2019-06-01T05:58:35.692381 ... 2019-0...
Data variables:
    SIGMA0.ALTI  (x) float64 1kB 13.99 13.52 13.77 13.81 ... 13.93 13.69 14.82
    SLA          (x) float64 1kB -2.283 -0.2079 -2.382 ... 1.91 -2.641 -2.712
ad.get_data_group(name="Missing points 1", stat="value")
{MISSING_POINTS: <xarray.Dataset> Size: 19MB
 Dimensions:    (time: 586885)
 Coordinates:
   * time       (time) datetime64[ns] 5MB 2019-06-01T05:30:29.807497 ... 2019-...
 Data variables:
     LONGITUDE  (time) float64 5MB 17.01 17.14 17.27 17.39 ... 236.3 236.4 236.5
     LATITUDE   (time) float64 5MB -66.15 -66.15 -66.15 ... 66.15 66.15 66.15
     missing    (time) bool 587kB False False False False ... False False False
     mp_groups  (time) int64 5MB 1 1 1 1 1 1 1 1 1 1 1 ... 0 0 0 0 0 0 0 0 0 0 0,
 GEOBOX: {'GLOBAL': None, 'LAND': None, 'OCEAN': None},
 TEMPORAL: {'GLOBAL': {'PASS': None},
  'LAND': {'PASS': None},
  'OCEAN': {'PASS': None}},
 SECTION_ANALYSES: {}}

Editing computed data and adding new ones

Raw data can be accessed and edited, through the data container data property.
Accessing this property will return a copy of the data container raw data but assigning a new modified dataset to this property will make everything in it usable for the computation of new diagnostics.
More information can be found here about how to assign new variables to xarray datasets.
A complete example on how to manually edit data is available in this notebook.