Computation & data access
Sequential computation
The compute() method allows reading the data and computing the diagnostics one after the other.

Read data and compute statistics. Limited to the provided list of
data if specified.
Parameters
----------
stats
Name of the data to limit the computation to.
ad.compute(stats=["Sigma 0"])
ad.compute()
Parallel computation
The compute_dask() method reads data and computes statistics in parallel on a dask cluster. Computation is limited to the provided list of data if specified.
Parameters
----------
stats
Name of the data to limit the computation to.
freq
Minimal split frequency to respect (day, pass, cycle, or any pandas offset
alias [1]_).
jobs_number
Number of jobs to create (defaults to the maximum possible number of periods
the data can be split into according to the provided frequency).
bar
[Does not work on xarray datasets] Whether to display a progress bar or not.
If None, will display if logging level <= INFO.
dask_client
Dask client on which to submit jobs (see the sketch after the references):
- if a client is provided, use it
- if a scheduler file is provided, connect a client to it
- otherwise, try to get an existing dask client from the environment
- as a last resort, create a local cluster and connect to it
kwargs
Additional parameters determining the time splitting characteristics.
These parameters work with pandas.date_range [2]_ frequencies.
References
----------
.. [1] https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases
.. [2] https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.date_range.html
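A minimal sketch of the dask_client options (assuming ad is the diagnostics object used throughout this page and client a dask client as created in the cluster examples below; passing a scheduler file path as a string is an assumption based on the description above):

# Submit jobs on an explicitly provided client
ad.compute_dask(dask_client=client, freq="pass")
# Connect a client to a scheduler file (assumed form)
ad.compute_dask(dask_client="scheduler.json", freq="pass")
# Let compute_dask find an existing client or create a local cluster
ad.compute_dask(freq="pass")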
The freq parameter:

- CLS Tables: this parameter is required.
- xarray Datasets: this parameter is optional; the parallelization is done using the current chunking of the dataset (each chunk usually corresponds to a single file). Setting this parameter triggers a re-chunking of the data (see the sketch below).
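A short sketch contrasting the two cases (client as created in the cluster examples below; the "1D" frequency is illustrative):

# CLS Table: freq is required
ad.compute_dask(dask_client=client, freq="pass")
# xarray Dataset: omit freq to reuse the current chunking...
ad.compute_dask(dask_client=client)
# ...or set it, which triggers a re-chunking of the data
ad.compute_dask(dask_client=client, freq="1D")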
The jobs_number parameter:

- is optional and defaults to the maximum possible number of periods the data can be split into according to the given frequency (or the existing number of chunks for xarray Datasets);
- if smaller than that maximum, splits the data into a multiple of the provided minimal frequency.

Keep in mind that (a sizing sketch follows this list):

- creating a cluster with more workers than jobs_number will waste resources;
- creating a cluster with more per-core memory than a single job needs will waste resources (and potentially delay the cluster creation);
- increasing jobs_number speeds up CPU-bound computations;
- increasing jobs_number slows down I/O-bound computations;
- increasing jobs_number implies more I/O, hence more stress on the storage;
- increasing jobs_number increases the parallelization overhead;
- decreasing jobs_number increases the memory usage of each job.
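As an illustration, a sketch applying these guidelines (the numbers are illustrative; ad is the diagnostics object used throughout this page):

from octantng.core.dask import DaskCluster

# 4 workers for jobs_number=8: no idle worker, each worker processes 2 periods
dask_cluster = DaskCluster(cluster_type="local", jobs=4, memory=3)
ad.compute_dask(dask_client=dask_cluster.client, jobs_number=8, freq="day")
dask_cluster.close()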
Limitations
For now, some diagnostics are not computed when using this method:

- Nadir crossovers statistics
- Editings
- Section analysis

Median statistics cannot be computed for the following diagnostics:

- Geographical box statistics
- Scatters
- Binned statistics
Median can be computed for the along-time diagnostics if the compute_dask() freq parameter is a multiple of the add_time_stat() freq parameter. Some examples of compute_dask() and add_time_stat() frequency multiples (a sketch follows the list):

- 1 day is a multiple of 1 day
- 1 day is a multiple of 1 hour
- 1 day is a multiple of 6 hours
- 1 day is not a multiple of 7 hours
- 1 cycle is a multiple of 1 pass
- 1 pass is not a multiple of 3 cycles
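For instance, if an along-time diagnostic was declared with an add_time_stat() frequency of 6 hours (only the freq parameter of add_time_stat() is documented here), a daily split keeps its median computable:

# 1 day is a multiple of 6 hours, so the median of the 6-hourly
# along-time diagnostic can still be computed with this split
ad.compute_dask(dask_client=client, jobs_number=10, freq="1D")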
Dask clusters
Warning

Mind the cores and processes parameters: they control how each job is split (see their descriptions below).

octantng provides the DaskCluster utility class, which automatically sets some parameters specific to the CLS and CNES environments and offers a homogeneous interface between local and distributed clusters. It is used to create a dask cluster (mostly) independently of the environment.
Parameters
----------
cluster_type
Type of cluster wanted.
start_timeout
Timeout (in seconds) used by the wait_for_workers method (default: 120).
Setting it to None will wait indefinitely.
jobs
Number of jobs to start (min, max).
walltime
Maximum execution time for each worker (unit: minutes, defaults to 30).
cores
Total number of cores per job.
processes
Cut the job into this many processes. Good for GIL workloads or
for nodes with many cores.
memory
Total amount of memory per job (unit: GiB, fractional values allowed).
name
Name of the dask workers.
queue
Queue name.
project
Project name.
resource_spec
Resources needed; the format may depend on the batch system.
silence_logs
Level of logs to print out to stdout (logging.CRITICAL by default).
Use a falsy value like False or None for no change.
interface
Network interface for worker communications (like 'eth0' or 'ib0').
death_timeout
Seconds to wait for a scheduler before closing workers
job_extra_directives
Specification of batch system options; depends on the batch system.
If it is a list, it is used as-is and passed to jobqueue.
If it is a dictionary, the keys are the batch system names and
the values the lists of options to pass.
worker_extra_args
Additional arguments to pass to dask-worker.
job_script_prologue
Other commands to add to the script before launching the worker.
env_vars
Environment variables to pass to workers (a sketch follows this
parameter list). If this is a list, it contains the names of the
environment variables to take from os.environ
(the current process environment variables).
If this is a dictionary, the key is the variable name and the
value is the value to pass. If the value is None, the actual
value is taken from os.environ.
.. note::
`env_vars` simplifies passing environment variables; in fact it
enriches the `job_script_prologue` parameter.
env_vars_treatment
What to do with environment variables given with `env_vars`
local_directory
Dask worker local directory for file spilling (used by local clusters).
log_directory
Directory to use for job scheduler logs.
python
Python executable used to launch Dask workers.
shebang
Path to the desired interpreter for your batch submission script.
jupyterhub
Set this cluster to be usable in a jupyterhub context.
expand_environment_variables
List of parameters for which to expand environment variables.
If None, ['log_directory', 'worker_extra_args', 'job_script_prologue'] is used.
If empty, no variable is expanded.
kwargs
Additional cluster parameters.
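For instance, env_vars accepts both forms described above (a sketch; the variable name is illustrative):

from octantng.core.dask import DaskCluster

# List form: the values are taken from os.environ
dask_cluster = DaskCluster(
    cluster_type="local", jobs=2, memory=3, env_vars=["OMP_NUM_THREADS"]
)
dask_cluster.close()
# Dict form: explicit value (None would take it from os.environ)
dask_cluster = DaskCluster(
    cluster_type="local", jobs=2, memory=3, env_vars={"OMP_NUM_THREADS": "1"}
)
dask_cluster.close()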
When you are done, release the cluster resources with the .close() method.

Local cluster
Example
from octantng.core.dask import DaskCluster
dask_cluster = DaskCluster(
cluster_type="local",
jobs=4,
memory=3,
local_directory="./dask_stuff",
)
client = dask_cluster.client
ad.compute_dask(dask_client=client, jobs_number=4, freq="pass")
dask_cluster.close()
The same cluster created directly with distributed:

import distributed as dist
dask_cluster = dist.LocalCluster(
n_workers=4,
threads_per_worker=1,
memory_limit="3GiB",
local_directory="./dask_stuff",
)
client = dist.Client(dask_cluster)
ad.compute_dask(dask_client=client, jobs_number=4, freq="pass")
dask_cluster.close()
Slurm cluster
Note

- The account=XYZ parameter is optional.
- Use the interface="ib0" parameter to improve network performances.
- In a jupyterhub context, set the jupyterhub=True parameter in order to make the dashboard accessible.

Example 1

Using CLS resources
Using CLS resources
from octantng.core.dask import DaskCluster
dask_cluster = DaskCluster(
cluster_type="slurm",
jobs=(5, 10),
memory=3,
walltime=10,
log_directory="./dask_stuff",
)
client = dask_cluster.client
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
The same cluster created directly with dask_jobqueue:

import dask_jobqueue as djq
import distributed as dist
dask_cluster = djq.SLURMCluster(
cores=1,
processes=1,
memory="3GiB",
walltime="00:10:00",
local_directory="$TMPDIR",
log_directory="./dask_stuff",
)
dask_cluster.adapt(minimum_jobs=5, maximum_jobs=10)
client = dist.Client(dask_cluster)
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
Using CNES jupyterhub resources
from octantng.core.dask import DaskCluster
dask_cluster = DaskCluster(
cluster_type="slurm",
account="XYZ",
jobs=(5, 10),
memory=3,
walltime=10,
interface="ib0",
log_directory="./dask_stuff",
# Do not set this if outside jupyterhub!
jupyterhub=True,
)
client = dask_cluster.client
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
The same setup using dask and dask_jobqueue directly:

import dask
import dask_jobqueue as djq
import distributed as dist
# Do not set this if outside jupyterhub!
dashboard_link = "/user/{JUPYTERHUB_USER}/proxy/{port}/status"
_ = dask.config.set({"distributed.dashboard.link": dashboard_link})
dask_cluster = djq.SLURMCluster(
account="XYZ",
cores=1,
processes=1,
memory="3GiB",
walltime="00:10:00",
interface="ib0",
local_directory="$TMPDIR",
log_directory="./dask_stuff",
)
dask_cluster.adapt(minimum_jobs=5, maximum_jobs=10)
client = dist.Client(dask_cluster)
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
Example 2
Using CLS resources
from octantng.core.dask import DaskCluster
dask_cluster = DaskCluster(
cluster_type="slurm",
jobs=(3, 3),
cores=4,
processes=2,
memory=12,
walltime=10,
log_directory="./dask_stuff",
)
client = dask_cluster.client
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
The same cluster created directly with dask_jobqueue:

import dask_jobqueue as djq
import distributed as dist
dask_cluster = djq.SLURMCluster(
cores=4,
processes=2,
memory="12GiB",
walltime="00:10:00",
local_directory="$TMPDIR",
log_directory="./dask_stuff",
)
dask_cluster.adapt(minimum_jobs=3, maximum_jobs=3)
client = dist.Client(dask_cluster)
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
Using CNES jupyterhub resources
from octantng.core.dask import DaskCluster
dask_cluster = DaskCluster(
cluster_type="slurm",
account="XYZ",
jobs=(3, 3),
cores=4,
processes=2,
memory=12,
walltime=10,
interface="ib0",
log_directory="./dask_stuff",
# Do not set this if outside jupyterhub!
jupyterhub=True,
)
client = dask_cluster.client
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
The same setup using dask and dask_jobqueue directly:

import dask
import dask_jobqueue as djq
import distributed as dist
# Do not set this if outside jupyterhub!
dashboard_link = "/user/{JUPYTERHUB_USER}/proxy/{port}/status"
_ = dask.config.set({"distributed.dashboard.link": dashboard_link})
dask_cluster = djq.SLURMCluster(
account="XYZ",
cores=4,
processes=2,
memory="12GiB",
walltime="00:10:00",
interface="ib0",
local_directory="$TMPDIR",
log_directory="./dask_stuff",
)
dask_cluster.adapt(minimum_jobs=3, maximum_jobs=3)
client = dist.Client(dask_cluster)
ad.compute_dask(dask_client=client, jobs_number=100, freq="pass")
# Remember to close your cluster and free its resources!
dask_cluster.close()
Computed data access
Available diagnostics can be listed with the list_diagnostics() method.

List the subset of diagnostics respecting the provided criteria.
Parameters
----------
dtype
Limit the diagnostics list to the provided type.
Data types are:
- RAW
- RAW_COMPARISON
- EDITING
- GEOBOX
- TEMPORAL
- BINNED
- BINNED_2D
- HISTOGRAM
- SCATTER
- RATIO
- CROSSOVER
- MISSING_POINTS
- SECTION_ANALYSES
containing
Limit to diagnostic names containing this element.
computed
Limit the diagnostics list based on computation status:
- [Default] None: All diagnostics
- False: Non-computed diagnostics
- True: Computed diagnostics
freq
Limit to diagnostics compatible with a dask computation at the provided
frequency.
Returns
-------
:
List of diagnostic names.
ad.list_diagnostics(dtype="temporal", containing="sla")
['SLA pass', 'SLA (6h)']
Each diagnostic is stored in a Diagnostic container holding its specification and results; it can be retrieved with the get_diagnostic() and get_diagnostics() methods.

Get the requested diagnostic data container.
If name is not an existing diagnostic, a new raw or raw_comparison diagnostic
will be generated with the provided parameters.
Parameters
----------
name
Name of the diagnostic.
field
Field for which to create a raw_data diagnostic.
x
Field to use as x-axis (raw comparison diagnostic).
y
Field to use as y-axis (raw comparison diagnostic).
z
Field to use as z-axis (raw comparison 3d diagnostic).
kwargs
Additional read_data parameters.
Returns
-------
:
Diagnostic data container.
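A minimal sketch (diagnostic and field names follow the data shown below and are illustrative):

# Fetch an existing diagnostic by name
diag = ad.get_diagnostic(name="SLA pass")
# Or generate a raw comparison diagnostic on the fly
diag = ad.get_diagnostic(name="SLA vs wind", x="wind", y="SLA")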
Computed data can be accessed:

- using the data attribute for raw data
- using the get_data() method for an individual diagnostic's data
- using the get_data_group() method for a whole group of diagnostics
Raw data access.
ad.data
<xarray.Dataset> Size: 33MB
Dimensions:         (time: 582789)
Coordinates:
  * time            (time) datetime64[ns] 5MB 2019-06-01T05:30:29.807497 ... ...
Data variables:
    LONGITUDE       (time) float64 5MB 17.01 17.14 17.27 ... -123.5 -123.3
    LATITUDE        (time) float64 5MB -66.15 -66.15 -66.15 ... 66.15 66.15
    wind            (time) float64 5MB nan nan nan nan nan ... nan nan nan nan
    SLA             (time) float64 5MB nan nan nan nan nan ... nan nan nan nan
    SIGMA0.ALTI     (time) float64 5MB nan nan nan nan nan ... nan nan nan nan
    EDITING.CUSTOM  (time) int64 5MB 1 1 1 1 1 1 2 1 1 1 ... 1 1 1 1 1 1 1 1 1 1
Individual diagnostic access.
ad.get_data(name="SLA pass", stat="mean")
<xarray.Dataset> Size: 3kB
Dimensions:  (time: 175)
Coordinates:
  * time     (time) datetime64[ns] 1kB 2019-06-01T05:58:35.692381 ... 2019-06...
Data variables:
    SLA      (time) float64 1kB -2.283 -0.2079 -2.382 ... 1.91 -2.641 -2.712
Some diagnostics (section analysis here) might have custom outputs.
ad.get_data(name="Analysis 1")
| # | Cycle | Pass | Date (first) | Longitude (first) | Latitude (first) | Date (last) | Longitude (last) | Latitude (last) | Length | False % |
|---|-------|------|--------------|-------------------|------------------|-------------|------------------|-----------------|--------|---------|
| 1 | 122 | 22 - 23 | 2019-06-02T02:04:53 | 47.98 | -65.09 | 2019-06-02T02:11:56 | 97.61 | -62.04 | 416 | 1.92% |
| 2 | 122 | 42 - 43 | 2019-06-02T20:47:21 | 112.56 | -62.90 | 2019-06-02T20:54:08 | 161.09 | -64.80 | 401 | 1.50% |
| 3 | 122 | 98 - 99 | 2019-06-05T01:17:23 | 52.07 | -65.25 | 2019-06-05T01:24:09 | 99.87 | -62.20 | 400 | 0.00% |
| 4 | 122 | 170 - 171 | 2019-06-07T20:41:41 | 91.96 | -61.05 | 2019-06-07T20:50:53 | 155.09 | -63.29 | 543 | 1.84% |
| 5 | 122 | 174 - 175 | 2019-06-08T00:29:23 | 52.61 | -64.96 | 2019-06-08T00:36:33 | 103.19 | -62.07 | 424 | 0.00% |
Group of diagnostics access.
ad.get_data_group(name="SLA pass", stat="mean")
<xarray.Dataset> Size: 4kB
Dimensions:      (x: 175)
Coordinates:
  * x            (x) datetime64[ns] 1kB 2019-06-01T05:58:35.692381 ... 2019-0...
Data variables:
    SIGMA0.ALTI  (x) float64 1kB 13.99 13.52 13.77 13.81 ... 13.93 13.69 14.82
    SLA          (x) float64 1kB -2.283 -0.2079 -2.382 ... 1.91 -2.641 -2.712
ad.get_data_group(name="Missing points 1", stat="value")
{MISSING_POINTS: <xarray.Dataset> Size: 19MB
Dimensions: (time: 586885)
Coordinates:
* time (time) datetime64[ns] 5MB 2019-06-01T05:30:29.807497 ... 2019-...
Data variables:
LONGITUDE (time) float64 5MB 17.01 17.14 17.27 17.39 ... 236.3 236.4 236.5
LATITUDE (time) float64 5MB -66.15 -66.15 -66.15 ... 66.15 66.15 66.15
missing (time) bool 587kB False False False False ... False False False
mp_groups (time) int64 5MB 1 1 1 1 1 1 1 1 1 1 1 ... 0 0 0 0 0 0 0 0 0 0 0,
GEOBOX: {'GLOBAL': None, 'LAND': None, 'OCEAN': None},
TEMPORAL: {'GLOBAL': {'PASS': None},
'LAND': {'PASS': None},
'OCEAN': {'PASS': None}},
SECTION_ANALYSES: {}}
Editing computed data and adding new ones

Computed data can be edited, and new data added, through the data property.
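A hedged sketch (it assumes the data property returns the live, mutable xarray Dataset shown in the outputs above; the exact container behavior may differ):

diag = ad.get_diagnostic(name="SLA pass")
# Edit a computed variable and add a derived one in place
diag.data["SLA"] = diag.data["SLA"].round(2)
diag.data["SLA_cm"] = diag.data["SLA"] * 100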