# Source code for redis.commands.timeseries.commands

from typing import Dict, List, Optional, Tuple, Union

from redis.exceptions import DataError
from redis.typing import KeyT, Number


class TimeSeriesCommands:
    """RedisTimeSeries Commands.

    Every public method assembles the positional argument list for one
    time-series command and sends it through ``self.execute_command``.
    This class does not define ``execute_command`` itself; it is expected to
    be supplied by the class this one is combined with.
    """

    def create(
        self,
        key: KeyT,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
    ):
        """
        Create a new time-series.

        Args:

        key:
            time-series key
        retention_msecs:
            Maximum age for samples compared to highest reported timestamp
            (in milliseconds). If None or 0 is passed then the series is not
            trimmed at all.
        uncompressed:
            Changes data storage from compressed (by default) to uncompressed
        labels:
            Set of label-value pairs that represent metadata labels of the key.
        chunk_size:
            Memory size, in bytes, allocated for each data chunk.
            Must be a multiple of 8 in the range [128 .. 1048576].
        duplicate_policy:
            Policy for handling multiple samples with identical timestamps.
            Can be one of:
            - 'block': an error will occur for any out of order sample.
            - 'first': ignore the new value.
            - 'last': override with latest value.
            - 'min': only override if the value is lower than the existing value.
            - 'max': only override if the value is higher than the existing value.

        For more information: https://redis.io/commands/ts.create/
        """
        params = [key]
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        self._append_duplicate_policy(params, CREATE_CMD, duplicate_policy)
        # LABELS goes last: it consumes a variable number of arguments.
        self._append_labels(params, labels)

        return self.execute_command(CREATE_CMD, *params)

    def alter(
        self,
        key: KeyT,
        retention_msecs: Optional[int] = None,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
    ):
        """
        Update the retention, chunk size, duplicate policy, and labels of an
        existing time series.

        Args:

        key:
            time-series key
        retention_msecs:
            Maximum retention period, compared to maximal existing timestamp
            (in milliseconds). If None or 0 is passed then the series is not
            trimmed at all.
        labels:
            Set of label-value pairs that represent metadata labels of the key.
        chunk_size:
            Memory size, in bytes, allocated for each data chunk.
            Must be a multiple of 8 in the range [128 .. 1048576].
        duplicate_policy:
            Policy for handling multiple samples with identical timestamps.
            Can be one of:
            - 'block': an error will occur for any out of order sample.
            - 'first': ignore the new value.
            - 'last': override with latest value.
            - 'min': only override if the value is lower than the existing value.
            - 'max': only override if the value is higher than the existing value.

        For more information: https://redis.io/commands/ts.alter/
        """
        params = [key]
        self._append_retention(params, retention_msecs)
        self._append_chunk_size(params, chunk_size)
        self._append_duplicate_policy(params, ALTER_CMD, duplicate_policy)
        self._append_labels(params, labels)

        return self.execute_command(ALTER_CMD, *params)

    def add(
        self,
        key: KeyT,
        timestamp: Union[int, str],
        value: Number,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
    ):
        """
        Append (or create and append) a new sample to a time series.

        Args:

        key:
            time-series key
        timestamp:
            Timestamp of the sample. `*` can be used for automatic timestamp
            (using the system clock).
        value:
            Numeric data value of the sample
        retention_msecs:
            Maximum retention period, compared to maximal existing timestamp
            (in milliseconds). If None or 0 is passed then the series is not
            trimmed at all.
        uncompressed:
            Changes data storage from compressed (by default) to uncompressed
        labels:
            Set of label-value pairs that represent metadata labels of the key.
        chunk_size:
            Memory size, in bytes, allocated for each data chunk.
            Must be a multiple of 8 in the range [128 .. 1048576].
        duplicate_policy:
            Policy for handling multiple samples with identical timestamps.
            Can be one of:
            - 'block': an error will occur for any out of order sample.
            - 'first': ignore the new value.
            - 'last': override with latest value.
            - 'min': only override if the value is lower than the existing value.
            - 'max': only override if the value is higher than the existing value.

        For more information: https://redis.io/commands/ts.add/
        """
        params = [key, timestamp, value]
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        # For this command the helper emits ON_DUPLICATE instead of
        # DUPLICATE_POLICY (it compares the command name with "TS.ADD").
        self._append_duplicate_policy(params, ADD_CMD, duplicate_policy)
        self._append_labels(params, labels)

        return self.execute_command(ADD_CMD, *params)

    def madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], Number]]):
        """
        Append (or create and append) a new `value` to series
        `key` with `timestamp`.
        Expects a list of `tuples` as (`key`,`timestamp`, `value`).
        Return value is an array with timestamps of insertions.

        For more information: https://redis.io/commands/ts.madd/
        """
        # Flatten [(key, ts, value), ...] into key ts value key ts value ...
        params = []
        for ktv in ktv_tuples:
            params.extend(ktv)

        return self.execute_command(MADD_CMD, *params)

    def incrby(
        self,
        key: KeyT,
        value: Number,
        timestamp: Optional[Union[int, str]] = None,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
    ):
        """
        Increment (or create a time-series and increment) the latest sample
        of a series.
        This command can be used as a counter or gauge that automatically gets
        history as a time series.

        Args:

        key:
            time-series key
        value:
            Numeric data value of the sample
        timestamp:
            Timestamp of the sample. `*` can be used for automatic timestamp
            (using the system clock).
        retention_msecs:
            Maximum age for samples compared to last event time
            (in milliseconds). If None or 0 is passed then the series is not
            trimmed at all.
        uncompressed:
            Changes data storage from compressed (by default) to uncompressed
        labels:
            Set of label-value pairs that represent metadata labels of the key.
        chunk_size:
            Memory size, in bytes, allocated for each data chunk.

        For more information: https://redis.io/commands/ts.incrby/
        """
        params = [key, value]
        self._append_timestamp(params, timestamp)
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        self._append_labels(params, labels)

        return self.execute_command(INCRBY_CMD, *params)

    def decrby(
        self,
        key: KeyT,
        value: Number,
        timestamp: Optional[Union[int, str]] = None,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
    ):
        """
        Decrement (or create a time-series and decrement) the latest sample
        of a series.
        This command can be used as a counter or gauge that automatically gets
        history as a time series.

        Args:

        key:
            time-series key
        value:
            Numeric data value of the sample
        timestamp:
            Timestamp of the sample. `*` can be used for automatic timestamp
            (using the system clock).
        retention_msecs:
            Maximum age for samples compared to last event time
            (in milliseconds). If None or 0 is passed then the series is not
            trimmed at all.
        uncompressed:
            Changes data storage from compressed (by default) to uncompressed
        labels:
            Set of label-value pairs that represent metadata labels of the key.
        chunk_size:
            Memory size, in bytes, allocated for each data chunk.

        For more information: https://redis.io/commands/ts.decrby/
        """
        params = [key, value]
        self._append_timestamp(params, timestamp)
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        self._append_labels(params, labels)

        return self.execute_command(DECRBY_CMD, *params)

    def delete(self, key: KeyT, from_time: int, to_time: int):
        """
        Delete all samples between two timestamps for a given time series.

        Args:

        key:
            time-series key.
        from_time:
            Start timestamp for the range deletion.
        to_time:
            End timestamp for the range deletion.

        For more information: https://redis.io/commands/ts.del/
        """
        return self.execute_command(DEL_CMD, key, from_time, to_time)

    def createrule(
        self,
        source_key: KeyT,
        dest_key: KeyT,
        aggregation_type: str,
        bucket_size_msec: int,
        align_timestamp: Optional[int] = None,
    ):
        """
        Create a compaction rule from values added to `source_key` into `dest_key`.

        Args:

        source_key:
            Key name for source time series
        dest_key:
            Key name for destination (compacted) time series
        aggregation_type:
            Aggregation type: One of the following:
            [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
            `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
        bucket_size_msec:
            Duration of each bucket, in milliseconds
        align_timestamp:
            Assure that there is a bucket that starts at exactly
            align_timestamp and align all other buckets accordingly.

        For more information: https://redis.io/commands/ts.createrule/
        """
        params = [source_key, dest_key]
        self._append_aggregation(params, aggregation_type, bucket_size_msec)
        # The alignment timestamp is sent as a bare trailing argument.
        if align_timestamp is not None:
            params.append(align_timestamp)

        return self.execute_command(CREATERULE_CMD, *params)

    def deleterule(self, source_key: KeyT, dest_key: KeyT):
        """
        Delete a compaction rule from `source_key` to `dest_key`.

        For more information: https://redis.io/commands/ts.deleterule/
        """
        return self.execute_command(DELETERULE_CMD, source_key, dest_key)

    def __range_params(
        self,
        key: KeyT,
        from_time: Union[int, str],
        to_time: Union[int, str],
        count: Optional[int],
        aggregation_type: Optional[str],
        bucket_size_msec: Optional[int],
        filter_by_ts: Optional[List[int]],
        filter_by_min_value: Optional[int],
        filter_by_max_value: Optional[int],
        align: Optional[Union[int, str]],
        latest: Optional[bool],
        bucket_timestamp: Optional[str],
        empty: Optional[bool],
    ):
        """Create TS.RANGE and TS.REVRANGE arguments.

        Returns the argument list, starting with ``key``, ``from_time`` and
        ``to_time``, followed by whichever optional clauses were requested.
        """
        params = [key, from_time, to_time]
        self._append_latest(params, latest)
        self._append_filer_by_ts(params, filter_by_ts)
        self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value)
        self._append_count(params, count)
        self._append_align(params, align)
        self._append_aggregation(params, aggregation_type, bucket_size_msec)
        self._append_bucket_timestamp(params, bucket_timestamp)
        self._append_empty(params, empty)

        return params

    def range(
        self,
        key: KeyT,
        from_time: Union[int, str],
        to_time: Union[int, str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range in forward direction for a specific time-series.

        Args:

        key:
            Key name for timeseries.
        from_time:
            Start timestamp for the range query. `-` can be used to express
            the minimum possible timestamp (0).
        to_time:
            End timestamp for range query, `+` can be used to express the
            maximum possible timestamp.
        count:
            Limits the number of returned samples.
        aggregation_type:
            Optional aggregation type. Can be one of
            [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
            `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
        bucket_size_msec:
            Time bucket for aggregation in milliseconds.
        filter_by_ts:
            List of timestamps to filter the result by specific timestamps.
        filter_by_min_value:
            Filter result by minimum value (must mention also
            filter_by_max_value).
        filter_by_max_value:
            Filter result by maximum value (must mention also
            filter_by_min_value).
        align:
            Timestamp for alignment control for aggregation.
        latest:
            Used when a time series is a compaction, reports the compacted
            value of the latest possibly partial bucket
        bucket_timestamp:
            Controls how bucket timestamps are reported. Can be one of
            [`-`, `low`, `+`, `high`, `~`, `mid`].
        empty:
            Reports aggregations for empty buckets.

        For more information: https://redis.io/commands/ts.range/
        """
        params = self.__range_params(
            key,
            from_time,
            to_time,
            count,
            aggregation_type,
            bucket_size_msec,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            align,
            latest,
            bucket_timestamp,
            empty,
        )
        return self.execute_command(RANGE_CMD, *params)

    def revrange(
        self,
        key: KeyT,
        from_time: Union[int, str],
        to_time: Union[int, str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range in reverse direction for a specific time-series.

        **Note**: This command is only available since RedisTimeSeries >= v1.4

        Args:

        key:
            Key name for timeseries.
        from_time:
            Start timestamp for the range query. `-` can be used to express
            the minimum possible timestamp (0).
        to_time:
            End timestamp for range query, `+` can be used to express the
            maximum possible timestamp.
        count:
            Limits the number of returned samples.
        aggregation_type:
            Optional aggregation type. Can be one of
            [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
            `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
        bucket_size_msec:
            Time bucket for aggregation in milliseconds.
        filter_by_ts:
            List of timestamps to filter the result by specific timestamps.
        filter_by_min_value:
            Filter result by minimum value (must mention also
            filter_by_max_value).
        filter_by_max_value:
            Filter result by maximum value (must mention also
            filter_by_min_value).
        align:
            Timestamp for alignment control for aggregation.
        latest:
            Used when a time series is a compaction, reports the compacted
            value of the latest possibly partial bucket
        bucket_timestamp:
            Controls how bucket timestamps are reported. Can be one of
            [`-`, `low`, `+`, `high`, `~`, `mid`].
        empty:
            Reports aggregations for empty buckets.

        For more information: https://redis.io/commands/ts.revrange/
        """
        params = self.__range_params(
            key,
            from_time,
            to_time,
            count,
            aggregation_type,
            bucket_size_msec,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            align,
            latest,
            bucket_timestamp,
            empty,
        )
        return self.execute_command(REVRANGE_CMD, *params)

    def __mrange_params(
        self,
        aggregation_type: Optional[str],
        bucket_size_msec: Optional[int],
        count: Optional[int],
        filters: List[str],
        from_time: Union[int, str],
        to_time: Union[int, str],
        with_labels: Optional[bool],
        filter_by_ts: Optional[List[int]],
        filter_by_min_value: Optional[int],
        filter_by_max_value: Optional[int],
        groupby: Optional[str],
        reduce: Optional[str],
        select_labels: Optional[List[str]],
        align: Optional[Union[int, str]],
        latest: Optional[bool],
        bucket_timestamp: Optional[str],
        empty: Optional[bool],
    ):
        """Create TS.MRANGE and TS.MREVRANGE arguments.

        Returns the argument list, starting with ``from_time`` and ``to_time``.
        Raises ``DataError`` (via ``_append_with_labels``) when both
        ``with_labels`` and ``select_labels`` are given.
        """
        params = [from_time, to_time]
        self._append_latest(params, latest)
        self._append_filer_by_ts(params, filter_by_ts)
        self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value)
        self._append_with_labels(params, with_labels, select_labels)
        self._append_count(params, count)
        self._append_align(params, align)
        self._append_aggregation(params, aggregation_type, bucket_size_msec)
        self._append_bucket_timestamp(params, bucket_timestamp)
        self._append_empty(params, empty)
        # FILTER is always emitted, followed by the caller's filter expressions.
        params.append("FILTER")
        params += filters
        self._append_groupby_reduce(params, groupby, reduce)

        return params

    def mrange(
        self,
        from_time: Union[int, str],
        to_time: Union[int, str],
        filters: List[str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        with_labels: Optional[bool] = False,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        groupby: Optional[str] = None,
        reduce: Optional[str] = None,
        select_labels: Optional[List[str]] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range across multiple time-series by filters in forward direction.

        Args:

        from_time:
            Start timestamp for the range query. `-` can be used to express
            the minimum possible timestamp (0).
        to_time:
            End timestamp for range query, `+` can be used to express the
            maximum possible timestamp.
        filters:
            Filter to match the time-series labels.
        count:
            Limits the number of returned samples.
        aggregation_type:
            Optional aggregation type. Can be one of
            [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
            `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
        bucket_size_msec:
            Time bucket for aggregation in milliseconds.
        with_labels:
            Include in the reply all label-value pairs representing metadata
            labels of the time series.
        filter_by_ts:
            List of timestamps to filter the result by specific timestamps.
        filter_by_min_value:
            Filter result by minimum value (must mention also
            filter_by_max_value).
        filter_by_max_value:
            Filter result by maximum value (must mention also
            filter_by_min_value).
        groupby:
            Grouping by fields the results (must mention also reduce).
        reduce:
            Applying reducer functions on each group. Can be one of
            [`avg` `sum`, `min`, `max`, `range`, `count`, `std.p`, `std.s`,
            `var.p`, `var.s`].
        select_labels:
            Include in the reply only a subset of the key-value pair labels
            of a series.
        align:
            Timestamp for alignment control for aggregation.
        latest:
            Used when a time series is a compaction, reports the compacted
            value of the latest possibly partial bucket
        bucket_timestamp:
            Controls how bucket timestamps are reported. Can be one of
            [`-`, `low`, `+`, `high`, `~`, `mid`].
        empty:
            Reports aggregations for empty buckets.

        For more information: https://redis.io/commands/ts.mrange/
        """
        params = self.__mrange_params(
            aggregation_type,
            bucket_size_msec,
            count,
            filters,
            from_time,
            to_time,
            with_labels,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            groupby,
            reduce,
            select_labels,
            align,
            latest,
            bucket_timestamp,
            empty,
        )

        return self.execute_command(MRANGE_CMD, *params)

    def mrevrange(
        self,
        from_time: Union[int, str],
        to_time: Union[int, str],
        filters: List[str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        with_labels: Optional[bool] = False,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        groupby: Optional[str] = None,
        reduce: Optional[str] = None,
        select_labels: Optional[List[str]] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range across multiple time-series by filters in reverse direction.

        Args:

        from_time:
            Start timestamp for the range query. `-` can be used to express
            the minimum possible timestamp (0).
        to_time:
            End timestamp for range query, `+` can be used to express the
            maximum possible timestamp.
        filters:
            Filter to match the time-series labels.
        count:
            Limits the number of returned samples.
        aggregation_type:
            Optional aggregation type. Can be one of
            [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
            `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
        bucket_size_msec:
            Time bucket for aggregation in milliseconds.
        with_labels:
            Include in the reply all label-value pairs representing metadata
            labels of the time series.
        filter_by_ts:
            List of timestamps to filter the result by specific timestamps.
        filter_by_min_value:
            Filter result by minimum value (must mention also
            filter_by_max_value).
        filter_by_max_value:
            Filter result by maximum value (must mention also
            filter_by_min_value).
        groupby:
            Grouping by fields the results (must mention also reduce).
        reduce:
            Applying reducer functions on each group. Can be one of
            [`avg` `sum`, `min`, `max`, `range`, `count`, `std.p`, `std.s`,
            `var.p`, `var.s`].
        select_labels:
            Include in the reply only a subset of the key-value pair labels
            of a series.
        align:
            Timestamp for alignment control for aggregation.
        latest:
            Used when a time series is a compaction, reports the compacted
            value of the latest possibly partial bucket
        bucket_timestamp:
            Controls how bucket timestamps are reported. Can be one of
            [`-`, `low`, `+`, `high`, `~`, `mid`].
        empty:
            Reports aggregations for empty buckets.

        For more information: https://redis.io/commands/ts.mrevrange/
        """
        params = self.__mrange_params(
            aggregation_type,
            bucket_size_msec,
            count,
            filters,
            from_time,
            to_time,
            with_labels,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            groupby,
            reduce,
            select_labels,
            align,
            latest,
            bucket_timestamp,
            empty,
        )

        return self.execute_command(MREVRANGE_CMD, *params)

    def get(self, key: KeyT, latest: Optional[bool] = False):
        """
        Get the last sample of `key`.

        `latest` is used when a time series is a compaction; it reports the
        compacted value of the latest (possibly partial) bucket.

        For more information: https://redis.io/commands/ts.get/
        """
        params = [key]
        self._append_latest(params, latest)
        return self.execute_command(GET_CMD, *params)

    def mget(
        self,
        filters: List[str],
        with_labels: Optional[bool] = False,
        select_labels: Optional[List[str]] = None,
        latest: Optional[bool] = False,
    ):
        """
        Get the last samples matching the specific `filter`.

        Args:

        filters:
            Filter to match the time-series labels.
        with_labels:
            Include in the reply all label-value pairs representing metadata
            labels of the time series.
        select_labels:
            Include in the reply only a subset of the key-value pair labels
            of a series.
        latest:
            Used when a time series is a compaction, reports the compacted
            value of the latest possibly partial bucket

        For more information: https://redis.io/commands/ts.mget/
        """
        params = []
        self._append_latest(params, latest)
        self._append_with_labels(params, with_labels, select_labels)
        params.append("FILTER")
        params += filters
        return self.execute_command(MGET_CMD, *params)

    def info(self, key: KeyT):
        """
        Get information of `key`.

        For more information: https://redis.io/commands/ts.info/
        """
        return self.execute_command(INFO_CMD, key)

    def queryindex(self, filters: List[str]):
        """
        Get all time series keys matching the `filter` list.

        For more information: https://redis.io/commands/ts.queryindex/
        """
        return self.execute_command(QUERYINDEX_CMD, *filters)

    # ------------------------------------------------------------------
    # Argument-building helpers. Each one mutates ``params`` in place and
    # returns None; a clause is only appended when its value was supplied.
    # ------------------------------------------------------------------

    @staticmethod
    def _append_uncompressed(params: List[str], uncompressed: Optional[bool]) -> None:
        """Append UNCOMPRESSED tag to params."""
        if uncompressed:
            params.append("UNCOMPRESSED")

    @staticmethod
    def _append_with_labels(
        params: List[str],
        with_labels: Optional[bool],
        select_labels: Optional[List[str]],
    ) -> None:
        """Append labels behavior to params.

        Raises:
            DataError: if both `with_labels` and `select_labels` are truthy.
        """
        if with_labels and select_labels:
            raise DataError(
                "with_labels and select_labels cannot be provided together."
            )

        if with_labels:
            params.append("WITHLABELS")
        if select_labels:
            params.extend(["SELECTED_LABELS", *select_labels])

    @staticmethod
    def _append_groupby_reduce(
        params: List[str], groupby: Optional[str], reduce: Optional[str]
    ) -> None:
        """Append GROUPBY REDUCE property to params.

        Nothing is appended unless both `groupby` and `reduce` are given.
        The reducer name is upper-cased before being sent.
        """
        if groupby is not None and reduce is not None:
            params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()])

    @staticmethod
    def _append_retention(params: List[str], retention: Optional[int]) -> None:
        """Append RETENTION property to params."""
        # ``is not None`` so that an explicit 0 is still sent.
        if retention is not None:
            params.extend(["RETENTION", retention])

    @staticmethod
    def _append_labels(params: List[str], labels: Optional[Dict[str, str]]) -> None:
        """Append LABELS property to params.

        `labels` is a mapping; its pairs are flattened to ``name value ...``.
        An empty or missing mapping appends nothing.
        """
        if labels:
            params.append("LABELS")
            for k, v in labels.items():
                params.extend([k, v])

    @staticmethod
    def _append_count(params: List[str], count: Optional[int]) -> None:
        """Append COUNT property to params."""
        if count is not None:
            params.extend(["COUNT", count])

    @staticmethod
    def _append_timestamp(
        params: List[str], timestamp: Optional[Union[int, str]]
    ) -> None:
        """Append TIMESTAMP property to params."""
        if timestamp is not None:
            params.extend(["TIMESTAMP", timestamp])

    @staticmethod
    def _append_align(params: List[str], align: Optional[Union[int, str]]) -> None:
        """Append ALIGN property to params."""
        if align is not None:
            params.extend(["ALIGN", align])

    @staticmethod
    def _append_aggregation(
        params: List[str],
        aggregation_type: Optional[str],
        bucket_size_msec: Optional[int],
    ) -> None:
        """Append AGGREGATION property to params.

        Only `aggregation_type` is checked; `bucket_size_msec` is forwarded
        as given.
        """
        if aggregation_type is not None:
            params.extend(["AGGREGATION", aggregation_type, bucket_size_msec])

    @staticmethod
    def _append_chunk_size(params: List[str], chunk_size: Optional[int]) -> None:
        """Append CHUNK_SIZE property to params."""
        if chunk_size is not None:
            params.extend(["CHUNK_SIZE", chunk_size])

    @staticmethod
    def _append_duplicate_policy(
        params: List[str], command: Optional[str], duplicate_policy: Optional[str]
    ) -> None:
        """Append DUPLICATE_POLICY property to params on CREATE
        and ON_DUPLICATE on ADD.
        """
        if duplicate_policy is not None:
            if command == "TS.ADD":
                params.extend(["ON_DUPLICATE", duplicate_policy])
            else:
                params.extend(["DUPLICATE_POLICY", duplicate_policy])

    @staticmethod
    def _append_filer_by_ts(params: List[str], ts_list: Optional[List[int]]) -> None:
        """Append FILTER_BY_TS property to params."""
        if ts_list is not None:
            params.extend(["FILTER_BY_TS", *ts_list])

    @staticmethod
    def _append_filer_by_value(
        params: List[str], min_value: Optional[int], max_value: Optional[int]
    ) -> None:
        """Append FILTER_BY_VALUE property to params.

        Nothing is appended unless both bounds are given.
        """
        if min_value is not None and max_value is not None:
            params.extend(["FILTER_BY_VALUE", min_value, max_value])

    @staticmethod
    def _append_latest(params: List[str], latest: Optional[bool]) -> None:
        """Append LATEST property to params."""
        if latest:
            params.append("LATEST")

    @staticmethod
    def _append_bucket_timestamp(
        params: List[str], bucket_timestamp: Optional[str]
    ) -> None:
        """Append BUCKET_TIMESTAMP property to params."""
        if bucket_timestamp is not None:
            params.extend(["BUCKETTIMESTAMP", bucket_timestamp])

    @staticmethod
    def _append_empty(params: List[str], empty: Optional[bool]) -> None:
        """Append EMPTY property to params."""
        if empty:
            params.append("EMPTY")