import itertools
import time
from typing import Dict, List, Optional, Union
from redis.client import Pipeline
from redis.utils import deprecated_function
from ..helpers import get_protocol_version, parse_to_dict
from ._util import to_string
from .aggregation import AggregateRequest, AggregateResult, Cursor
from .document import Document
from .query import Query
from .result import Result
from .suggestion import SuggestionParser
NUMERIC = "NUMERIC"
CREATE_CMD = "FT.CREATE"
ALTER_CMD = "FT.ALTER"
SEARCH_CMD = "FT.SEARCH"
ADD_CMD = "FT.ADD"
ADDHASH_CMD = "FT.ADDHASH"
DROP_CMD = "FT.DROP"
DROPINDEX_CMD = "FT.DROPINDEX"
EXPLAIN_CMD = "FT.EXPLAIN"
EXPLAINCLI_CMD = "FT.EXPLAINCLI"
DEL_CMD = "FT.DEL"
AGGREGATE_CMD = "FT.AGGREGATE"
PROFILE_CMD = "FT.PROFILE"
CURSOR_CMD = "FT.CURSOR"
SPELLCHECK_CMD = "FT.SPELLCHECK"
DICT_ADD_CMD = "FT.DICTADD"
DICT_DEL_CMD = "FT.DICTDEL"
DICT_DUMP_CMD = "FT.DICTDUMP"
GET_CMD = "FT.GET"
MGET_CMD = "FT.MGET"
CONFIG_CMD = "FT.CONFIG"
TAGVALS_CMD = "FT.TAGVALS"
ALIAS_ADD_CMD = "FT.ALIASADD"
ALIAS_UPDATE_CMD = "FT.ALIASUPDATE"
ALIAS_DEL_CMD = "FT.ALIASDEL"
INFO_CMD = "FT.INFO"
SUGADD_COMMAND = "FT.SUGADD"
SUGDEL_COMMAND = "FT.SUGDEL"
SUGLEN_COMMAND = "FT.SUGLEN"
SUGGET_COMMAND = "FT.SUGGET"
SYNUPDATE_CMD = "FT.SYNUPDATE"
SYNDUMP_CMD = "FT.SYNDUMP"
NOOFFSETS = "NOOFFSETS"
NOFIELDS = "NOFIELDS"
NOHL = "NOHL"
NOFREQS = "NOFREQS"
MAXTEXTFIELDS = "MAXTEXTFIELDS"
TEMPORARY = "TEMPORARY"
STOPWORDS = "STOPWORDS"
SKIPINITIALSCAN = "SKIPINITIALSCAN"
WITHSCORES = "WITHSCORES"
FUZZY = "FUZZY"
WITHPAYLOADS = "WITHPAYLOADS"
[docs]class SearchCommands:
"""Search commands."""
def _parse_results(self, cmd, res, **kwargs):
if get_protocol_version(self.client) in ["3", 3]:
return res
else:
return self._RESP2_MODULE_CALLBACKS[cmd](res, **kwargs)
def _parse_info(self, res, **kwargs):
it = map(to_string, res)
return dict(zip(it, it))
def _parse_search(self, res, **kwargs):
return Result(
res,
not kwargs["query"]._no_content,
duration=kwargs["duration"],
has_payload=kwargs["query"]._with_payloads,
with_scores=kwargs["query"]._with_scores,
)
def _parse_aggregate(self, res, **kwargs):
return self._get_aggregate_result(res, kwargs["query"], kwargs["has_cursor"])
def _parse_profile(self, res, **kwargs):
query = kwargs["query"]
if isinstance(query, AggregateRequest):
result = self._get_aggregate_result(res[0], query, query._cursor)
else:
result = Result(
res[0],
not query._no_content,
duration=kwargs["duration"],
has_payload=query._with_payloads,
with_scores=query._with_scores,
)
return result, parse_to_dict(res[1])
def _parse_spellcheck(self, res, **kwargs):
corrections = {}
if res == 0:
return corrections
for _correction in res:
if isinstance(_correction, int) and _correction == 0:
continue
if len(_correction) != 3:
continue
if not _correction[2]:
continue
if not _correction[2][0]:
continue
# For spellcheck output
# 1) 1) "TERM"
# 2) "{term1}"
# 3) 1) 1) "{score1}"
# 2) "{suggestion1}"
# 2) 1) "{score2}"
# 2) "{suggestion2}"
#
# Following dictionary will be made
# corrections = {
# '{term1}': [
# {'score': '{score1}', 'suggestion': '{suggestion1}'},
# {'score': '{score2}', 'suggestion': '{suggestion2}'}
# ]
# }
corrections[_correction[1]] = [
{"score": _item[0], "suggestion": _item[1]} for _item in _correction[2]
]
return corrections
def _parse_config_get(self, res, **kwargs):
return {kvs[0]: kvs[1] for kvs in res} if res else {}
def _parse_syndump(self, res, **kwargs):
return {res[i]: res[i + 1] for i in range(0, len(res), 2)}
[docs] def batch_indexer(self, chunk_size=100):
"""
Create a new batch indexer from the client with a given chunk size
"""
return self.BatchIndexer(self, chunk_size=chunk_size)
[docs] def create_index(
self,
fields,
no_term_offsets=False,
no_field_flags=False,
stopwords=None,
definition=None,
max_text_fields=False,
temporary=None,
no_highlight=False,
no_term_frequencies=False,
skip_initial_scan=False,
):
"""
Create the search index. The index must not already exist.
### Parameters:
- **fields**: a list of TextField or NumericField objects
- **no_term_offsets**: If true, we will not save term offsets in
the index
- **no_field_flags**: If true, we will not save field flags that
allow searching in specific fields
- **stopwords**: If not None, we create the index with this custom
stopword list. The list can be empty
- **max_text_fields**: If true, we will encode indexes as if there
were more than 32 text fields which allows you to add additional
fields (beyond 32).
- **temporary**: Create a lightweight temporary index which will
expire after the specified period of inactivity (in seconds). The
internal idle timer is reset whenever the index is searched or added to.
- **no_highlight**: If true, disabling highlighting support.
Also implied by no_term_offsets.
- **no_term_frequencies**: If true, we avoid saving the term frequencies
in the index.
- **skip_initial_scan**: If true, we do not scan and index.
For more information see `FT.CREATE <https://redis.io/commands/ft.create>`_.
""" # noqa
args = [CREATE_CMD, self.index_name]
if definition is not None:
args += definition.args
if max_text_fields:
args.append(MAXTEXTFIELDS)
if temporary is not None and isinstance(temporary, int):
args.append(TEMPORARY)
args.append(temporary)
if no_term_offsets:
args.append(NOOFFSETS)
if no_highlight:
args.append(NOHL)
if no_field_flags:
args.append(NOFIELDS)
if no_term_frequencies:
args.append(NOFREQS)
if skip_initial_scan:
args.append(SKIPINITIALSCAN)
if stopwords is not None and isinstance(stopwords, (list, tuple, set)):
args += [STOPWORDS, len(stopwords)]
if len(stopwords) > 0:
args += list(stopwords)
args.append("SCHEMA")
try:
args += list(itertools.chain(*(f.redis_args() for f in fields)))
except TypeError:
args += fields.redis_args()
return self.execute_command(*args)
[docs] def alter_schema_add(self, fields: List[str]):
"""
Alter the existing search index by adding new fields. The index
must already exist.
### Parameters:
- **fields**: a list of Field objects to add for the index
For more information see `FT.ALTER <https://redis.io/commands/ft.alter>`_.
""" # noqa
args = [ALTER_CMD, self.index_name, "SCHEMA", "ADD"]
try:
args += list(itertools.chain(*(f.redis_args() for f in fields)))
except TypeError:
args += fields.redis_args()
return self.execute_command(*args)
[docs] def dropindex(self, delete_documents: bool = False):
"""
Drop the index if it exists.
Replaced `drop_index` in RediSearch 2.0.
Default behavior was changed to not delete the indexed documents.
### Parameters:
- **delete_documents**: If `True`, all documents will be deleted.
For more information see `FT.DROPINDEX <https://redis.io/commands/ft.dropindex>`_.
""" # noqa
delete_str = "DD" if delete_documents else ""
return self.execute_command(DROPINDEX_CMD, self.index_name, delete_str)
def _add_document(
self,
doc_id,
conn=None,
nosave=False,
score=1.0,
payload=None,
replace=False,
partial=False,
language=None,
no_create=False,
**fields,
):
"""
Internal add_document used for both batch and single doc indexing
"""
if partial or no_create:
replace = True
args = [ADD_CMD, self.index_name, doc_id, score]
if nosave:
args.append("NOSAVE")
if payload is not None:
args.append("PAYLOAD")
args.append(payload)
if replace:
args.append("REPLACE")
if partial:
args.append("PARTIAL")
if no_create:
args.append("NOCREATE")
if language:
args += ["LANGUAGE", language]
args.append("FIELDS")
args += list(itertools.chain(*fields.items()))
if conn is not None:
return conn.execute_command(*args)
return self.execute_command(*args)
def _add_document_hash(
self, doc_id, conn=None, score=1.0, language=None, replace=False
):
"""
Internal add_document_hash used for both batch and single doc indexing
"""
args = [ADDHASH_CMD, self.index_name, doc_id, score]
if replace:
args.append("REPLACE")
if language:
args += ["LANGUAGE", language]
if conn is not None:
return conn.execute_command(*args)
return self.execute_command(*args)
[docs] @deprecated_function(
version="2.0.0", reason="deprecated since redisearch 2.0, call hset instead"
)
def add_document(
self,
doc_id: str,
nosave: bool = False,
score: float = 1.0,
payload: bool = None,
replace: bool = False,
partial: bool = False,
language: Optional[str] = None,
no_create: str = False,
**fields: List[str],
):
"""
Add a single document to the index.
### Parameters
- **doc_id**: the id of the saved document.
- **nosave**: if set to true, we just index the document, and don't
save a copy of it. This means that searches will just
return ids.
- **score**: the document ranking, between 0.0 and 1.0
- **payload**: optional inner-index payload we can save for fast
i access in scoring functions
- **replace**: if True, and the document already is in the index,
we perform an update and reindex the document
- **partial**: if True, the fields specified will be added to the
existing document.
This has the added benefit that any fields specified
with `no_index`
will not be reindexed again. Implies `replace`
- **language**: Specify the language used for document tokenization.
- **no_create**: if True, the document is only updated and reindexed
if it already exists.
If the document does not exist, an error will be
returned. Implies `replace`
- **fields** kwargs dictionary of the document fields to be saved
and/or indexed.
NOTE: Geo points shoule be encoded as strings of "lon,lat"
""" # noqa
return self._add_document(
doc_id,
conn=None,
nosave=nosave,
score=score,
payload=payload,
replace=replace,
partial=partial,
language=language,
no_create=no_create,
**fields,
)
[docs] @deprecated_function(
version="2.0.0", reason="deprecated since redisearch 2.0, call hset instead"
)
def add_document_hash(self, doc_id, score=1.0, language=None, replace=False):
"""
Add a hash document to the index.
### Parameters
- **doc_id**: the document's id. This has to be an existing HASH key
in Redis that will hold the fields the index needs.
- **score**: the document ranking, between 0.0 and 1.0
- **replace**: if True, and the document already is in the index, we
perform an update and reindex the document
- **language**: Specify the language used for document tokenization.
""" # noqa
return self._add_document_hash(
doc_id, conn=None, score=score, language=language, replace=replace
)
[docs] def delete_document(self, doc_id, conn=None, delete_actual_document=False):
"""
Delete a document from index
Returns 1 if the document was deleted, 0 if not
### Parameters
- **delete_actual_document**: if set to True, RediSearch also delete
the actual document if it is in the index
""" # noqa
args = [DEL_CMD, self.index_name, doc_id]
if delete_actual_document:
args.append("DD")
if conn is not None:
return conn.execute_command(*args)
return self.execute_command(*args)
[docs] def load_document(self, id):
"""
Load a single document by id
"""
fields = self.client.hgetall(id)
f2 = {to_string(k): to_string(v) for k, v in fields.items()}
fields = f2
try:
del fields["id"]
except KeyError:
pass
return Document(id=id, **fields)
[docs] def get(self, *ids):
"""
Returns the full contents of multiple documents.
### Parameters
- **ids**: the ids of the saved documents.
"""
return self.execute_command(MGET_CMD, self.index_name, *ids)
[docs] def info(self):
"""
Get info an stats about the the current index, including the number of
documents, memory consumption, etc
For more information see `FT.INFO <https://redis.io/commands/ft.info>`_.
"""
res = self.execute_command(INFO_CMD, self.index_name)
return self._parse_results(INFO_CMD, res)
def get_params_args(
self, query_params: Union[Dict[str, Union[str, int, float, bytes]], None]
):
if query_params is None:
return []
args = []
if len(query_params) > 0:
args.append("params")
args.append(len(query_params) * 2)
for key, value in query_params.items():
args.append(key)
args.append(value)
return args
def _mk_query_args(
self, query, query_params: Union[Dict[str, Union[str, int, float, bytes]], None]
):
args = [self.index_name]
if isinstance(query, str):
# convert the query from a text to a query object
query = Query(query)
if not isinstance(query, Query):
raise ValueError(f"Bad query type {type(query)}")
args += query.get_args()
args += self.get_params_args(query_params)
return args, query
[docs] def search(
self,
query: Union[str, Query],
query_params: Union[Dict[str, Union[str, int, float, bytes]], None] = None,
):
"""
Search the index for a given query, and return a result of documents
### Parameters
- **query**: the search query. Either a text for simple queries with
default parameters, or a Query object for complex queries.
See RediSearch's documentation on query format
For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_.
""" # noqa
args, query = self._mk_query_args(query, query_params=query_params)
st = time.time()
res = self.execute_command(SEARCH_CMD, *args)
if isinstance(res, Pipeline):
return res
return self._parse_results(
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0
)
[docs] def explain(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""Returns the execution plan for a complex query.
For more information see `FT.EXPLAIN <https://redis.io/commands/ft.explain>`_.
""" # noqa
args, query_text = self._mk_query_args(query, query_params=query_params)
return self.execute_command(EXPLAIN_CMD, *args)
def explain_cli(self, query: Union[str, Query]): # noqa
raise NotImplementedError("EXPLAINCLI will not be implemented.")
[docs] def aggregate(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Issue an aggregation query.
### Parameters
**query**: This can be either an `AggregateRequest`, or a `Cursor`
An `AggregateResult` object is returned. You can access the rows from
its `rows` property, which will always yield the rows of the result.
For more information see `FT.AGGREGATE <https://redis.io/commands/ft.aggregate>`_.
""" # noqa
if isinstance(query, AggregateRequest):
has_cursor = bool(query._cursor)
cmd = [AGGREGATE_CMD, self.index_name] + query.build_args()
elif isinstance(query, Cursor):
has_cursor = True
cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args()
else:
raise ValueError("Bad query", query)
cmd += self.get_params_args(query_params)
raw = self.execute_command(*cmd)
return self._parse_results(
AGGREGATE_CMD, raw, query=query, has_cursor=has_cursor
)
def _get_aggregate_result(
self, raw: List, query: Union[str, Query, AggregateRequest], has_cursor: bool
):
if has_cursor:
if isinstance(query, Cursor):
query.cid = raw[1]
cursor = query
else:
cursor = Cursor(raw[1])
raw = raw[0]
else:
cursor = None
if isinstance(query, AggregateRequest) and query._with_schema:
schema = raw[0]
rows = raw[2:]
else:
schema = None
rows = raw[1:]
return AggregateResult(rows, cursor, schema)
[docs] def profile(
self,
query: Union[str, Query, AggregateRequest],
limited: bool = False,
query_params: Optional[Dict[str, Union[str, int, float]]] = None,
):
"""
Performs a search or aggregate command and collects performance
information.
### Parameters
**query**: This can be either an `AggregateRequest`, `Query` or string.
**limited**: If set to True, removes details of reader iterator.
**query_params**: Define one or more value parameters.
Each parameter has a name and a value.
"""
st = time.time()
cmd = [PROFILE_CMD, self.index_name, ""]
if limited:
cmd.append("LIMITED")
cmd.append("QUERY")
if isinstance(query, AggregateRequest):
cmd[2] = "AGGREGATE"
cmd += query.build_args()
elif isinstance(query, Query):
cmd[2] = "SEARCH"
cmd += query.get_args()
cmd += self.get_params_args(query_params)
else:
raise ValueError("Must provide AggregateRequest object or Query object.")
res = self.execute_command(*cmd)
return self._parse_results(
PROFILE_CMD, res, query=query, duration=(time.time() - st) * 1000.0
)
[docs] def spellcheck(self, query, distance=None, include=None, exclude=None):
"""
Issue a spellcheck query
### Parameters
**query**: search query.
**distance***: the maximal Levenshtein distance for spelling
suggestions (default: 1, max: 4).
**include**: specifies an inclusion custom dictionary.
**exclude**: specifies an exclusion custom dictionary.
For more information see `FT.SPELLCHECK <https://redis.io/commands/ft.spellcheck>`_.
""" # noqa
cmd = [SPELLCHECK_CMD, self.index_name, query]
if distance:
cmd.extend(["DISTANCE", distance])
if include:
cmd.extend(["TERMS", "INCLUDE", include])
if exclude:
cmd.extend(["TERMS", "EXCLUDE", exclude])
res = self.execute_command(*cmd)
return self._parse_results(SPELLCHECK_CMD, res)
[docs] def dict_add(self, name: str, *terms: List[str]):
"""Adds terms to a dictionary.
### Parameters
- **name**: Dictionary name.
- **terms**: List of items for adding to the dictionary.
For more information see `FT.DICTADD <https://redis.io/commands/ft.dictadd>`_.
""" # noqa
cmd = [DICT_ADD_CMD, name]
cmd.extend(terms)
return self.execute_command(*cmd)
[docs] def dict_del(self, name: str, *terms: List[str]):
"""Deletes terms from a dictionary.
### Parameters
- **name**: Dictionary name.
- **terms**: List of items for removing from the dictionary.
For more information see `FT.DICTDEL <https://redis.io/commands/ft.dictdel>`_.
""" # noqa
cmd = [DICT_DEL_CMD, name]
cmd.extend(terms)
return self.execute_command(*cmd)
[docs] def dict_dump(self, name: str):
"""Dumps all terms in the given dictionary.
### Parameters
- **name**: Dictionary name.
For more information see `FT.DICTDUMP <https://redis.io/commands/ft.dictdump>`_.
""" # noqa
cmd = [DICT_DUMP_CMD, name]
return self.execute_command(*cmd)
[docs] def config_set(self, option: str, value: str) -> bool:
"""Set runtime configuration option.
### Parameters
- **option**: the name of the configuration option.
- **value**: a value for the configuration option.
For more information see `FT.CONFIG SET <https://redis.io/commands/ft.config-set>`_.
""" # noqa
cmd = [CONFIG_CMD, "SET", option, value]
raw = self.execute_command(*cmd)
return raw == "OK"
[docs] def config_get(self, option: str) -> str:
"""Get runtime configuration option value.
### Parameters
- **option**: the name of the configuration option.
For more information see `FT.CONFIG GET <https://redis.io/commands/ft.config-get>`_.
""" # noqa
cmd = [CONFIG_CMD, "GET", option]
res = self.execute_command(*cmd)
return self._parse_results(CONFIG_CMD, res)
[docs] def tagvals(self, tagfield: str):
"""
Return a list of all possible tag values
### Parameters
- **tagfield**: Tag field name
For more information see `FT.TAGVALS <https://redis.io/commands/ft.tagvals>`_.
""" # noqa
return self.execute_command(TAGVALS_CMD, self.index_name, tagfield)
[docs] def aliasadd(self, alias: str):
"""
Alias a search index - will fail if alias already exists
### Parameters
- **alias**: Name of the alias to create
For more information see `FT.ALIASADD <https://redis.io/commands/ft.aliasadd>`_.
""" # noqa
return self.execute_command(ALIAS_ADD_CMD, alias, self.index_name)
[docs] def aliasupdate(self, alias: str):
"""
Updates an alias - will fail if alias does not already exist
### Parameters
- **alias**: Name of the alias to create
For more information see `FT.ALIASUPDATE <https://redis.io/commands/ft.aliasupdate>`_.
""" # noqa
return self.execute_command(ALIAS_UPDATE_CMD, alias, self.index_name)
[docs] def aliasdel(self, alias: str):
"""
Removes an alias to a search index
### Parameters
- **alias**: Name of the alias to delete
For more information see `FT.ALIASDEL <https://redis.io/commands/ft.aliasdel>`_.
""" # noqa
return self.execute_command(ALIAS_DEL_CMD, alias)
[docs] def sugadd(self, key, *suggestions, **kwargs):
"""
Add suggestion terms to the AutoCompleter engine. Each suggestion has
a score and string.
If kwargs["increment"] is true and the terms are already in the
server's dictionary, we increment their scores.
For more information see `FT.SUGADD <https://redis.io/commands/ft.sugadd/>`_.
""" # noqa
# If Transaction is not False it will MULTI/EXEC which will error
pipe = self.pipeline(transaction=False)
for sug in suggestions:
args = [SUGADD_COMMAND, key, sug.string, sug.score]
if kwargs.get("increment"):
args.append("INCR")
if sug.payload:
args.append("PAYLOAD")
args.append(sug.payload)
pipe.execute_command(*args)
return pipe.execute()[-1]
[docs] def suglen(self, key: str) -> int:
"""
Return the number of entries in the AutoCompleter index.
For more information see `FT.SUGLEN <https://redis.io/commands/ft.suglen>`_.
""" # noqa
return self.execute_command(SUGLEN_COMMAND, key)
[docs] def sugdel(self, key: str, string: str) -> int:
"""
Delete a string from the AutoCompleter index.
Returns 1 if the string was found and deleted, 0 otherwise.
For more information see `FT.SUGDEL <https://redis.io/commands/ft.sugdel>`_.
""" # noqa
return self.execute_command(SUGDEL_COMMAND, key, string)
[docs] def sugget(
self,
key: str,
prefix: str,
fuzzy: bool = False,
num: int = 10,
with_scores: bool = False,
with_payloads: bool = False,
) -> List[SuggestionParser]:
"""
Get a list of suggestions from the AutoCompleter, for a given prefix.
Parameters:
prefix : str
The prefix we are searching. **Must be valid ascii or utf-8**
fuzzy : bool
If set to true, the prefix search is done in fuzzy mode.
**NOTE**: Running fuzzy searches on short (<3 letters) prefixes
can be very
slow, and even scan the entire index.
with_scores : bool
If set to true, we also return the (refactored) score of
each suggestion.
This is normally not needed, and is NOT the original score
inserted into the index.
with_payloads : bool
Return suggestion payloads
num : int
The maximum number of results we return. Note that we might
return less. The algorithm trims irrelevant suggestions.
Returns:
list:
A list of Suggestion objects. If with_scores was False, the
score of all suggestions is 1.
For more information see `FT.SUGGET <https://redis.io/commands/ft.sugget>`_.
""" # noqa
args = [SUGGET_COMMAND, key, prefix, "MAX", num]
if fuzzy:
args.append(FUZZY)
if with_scores:
args.append(WITHSCORES)
if with_payloads:
args.append(WITHPAYLOADS)
res = self.execute_command(*args)
results = []
if not res:
return results
parser = SuggestionParser(with_scores, with_payloads, res)
return [s for s in parser]
[docs] def synupdate(self, groupid: str, skipinitial: bool = False, *terms: List[str]):
"""
Updates a synonym group.
The command is used to create or update a synonym group with
additional terms.
Only documents which were indexed after the update will be affected.
Parameters:
groupid :
Synonym group id.
skipinitial : bool
If set to true, we do not scan and index.
terms :
The terms.
For more information see `FT.SYNUPDATE <https://redis.io/commands/ft.synupdate>`_.
""" # noqa
cmd = [SYNUPDATE_CMD, self.index_name, groupid]
if skipinitial:
cmd.extend(["SKIPINITIALSCAN"])
cmd.extend(terms)
return self.execute_command(*cmd)
[docs] def syndump(self):
"""
Dumps the contents of a synonym group.
The command is used to dump the synonyms data structure.
Returns a list of synonym terms and their synonym group ids.
For more information see `FT.SYNDUMP <https://redis.io/commands/ft.syndump>`_.
""" # noqa
res = self.execute_command(SYNDUMP_CMD, self.index_name)
return self._parse_results(SYNDUMP_CMD, res)
class AsyncSearchCommands(SearchCommands):
async def info(self):
"""
Get info an stats about the the current index, including the number of
documents, memory consumption, etc
For more information see `FT.INFO <https://redis.io/commands/ft.info>`_.
"""
res = await self.execute_command(INFO_CMD, self.index_name)
return self._parse_results(INFO_CMD, res)
async def search(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Search the index for a given query, and return a result of documents
### Parameters
- **query**: the search query. Either a text for simple queries with
default parameters, or a Query object for complex queries.
See RediSearch's documentation on query format
For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_.
""" # noqa
args, query = self._mk_query_args(query, query_params=query_params)
st = time.time()
res = await self.execute_command(SEARCH_CMD, *args)
if isinstance(res, Pipeline):
return res
return self._parse_results(
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0
)
async def aggregate(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Issue an aggregation query.
### Parameters
**query**: This can be either an `AggregateRequest`, or a `Cursor`
An `AggregateResult` object is returned. You can access the rows from
its `rows` property, which will always yield the rows of the result.
For more information see `FT.AGGREGATE <https://redis.io/commands/ft.aggregate>`_.
""" # noqa
if isinstance(query, AggregateRequest):
has_cursor = bool(query._cursor)
cmd = [AGGREGATE_CMD, self.index_name] + query.build_args()
elif isinstance(query, Cursor):
has_cursor = True
cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args()
else:
raise ValueError("Bad query", query)
cmd += self.get_params_args(query_params)
raw = await self.execute_command(*cmd)
return self._parse_results(
AGGREGATE_CMD, raw, query=query, has_cursor=has_cursor
)
async def spellcheck(self, query, distance=None, include=None, exclude=None):
"""
Issue a spellcheck query
### Parameters
**query**: search query.
**distance***: the maximal Levenshtein distance for spelling
suggestions (default: 1, max: 4).
**include**: specifies an inclusion custom dictionary.
**exclude**: specifies an exclusion custom dictionary.
For more information see `FT.SPELLCHECK <https://redis.io/commands/ft.spellcheck>`_.
""" # noqa
cmd = [SPELLCHECK_CMD, self.index_name, query]
if distance:
cmd.extend(["DISTANCE", distance])
if include:
cmd.extend(["TERMS", "INCLUDE", include])
if exclude:
cmd.extend(["TERMS", "EXCLUDE", exclude])
res = await self.execute_command(*cmd)
return self._parse_results(SPELLCHECK_CMD, res)
async def config_set(self, option: str, value: str) -> bool:
"""Set runtime configuration option.
### Parameters
- **option**: the name of the configuration option.
- **value**: a value for the configuration option.
For more information see `FT.CONFIG SET <https://redis.io/commands/ft.config-set>`_.
""" # noqa
cmd = [CONFIG_CMD, "SET", option, value]
raw = await self.execute_command(*cmd)
return raw == "OK"
async def config_get(self, option: str) -> str:
"""Get runtime configuration option value.
### Parameters
- **option**: the name of the configuration option.
For more information see `FT.CONFIG GET <https://redis.io/commands/ft.config-get>`_.
""" # noqa
cmd = [CONFIG_CMD, "GET", option]
res = {}
res = await self.execute_command(*cmd)
return self._parse_results(CONFIG_CMD, res)
async def load_document(self, id):
"""
Load a single document by id
"""
fields = await self.client.hgetall(id)
f2 = {to_string(k): to_string(v) for k, v in fields.items()}
fields = f2
try:
del fields["id"]
except KeyError:
pass
return Document(id=id, **fields)
async def sugadd(self, key, *suggestions, **kwargs):
"""
Add suggestion terms to the AutoCompleter engine. Each suggestion has
a score and string.
If kwargs["increment"] is true and the terms are already in the
server's dictionary, we increment their scores.
For more information see `FT.SUGADD <https://redis.io/commands/ft.sugadd>`_.
""" # noqa
# If Transaction is not False it will MULTI/EXEC which will error
pipe = self.pipeline(transaction=False)
for sug in suggestions:
args = [SUGADD_COMMAND, key, sug.string, sug.score]
if kwargs.get("increment"):
args.append("INCR")
if sug.payload:
args.append("PAYLOAD")
args.append(sug.payload)
pipe.execute_command(*args)
return (await pipe.execute())[-1]
async def sugget(
self,
key: str,
prefix: str,
fuzzy: bool = False,
num: int = 10,
with_scores: bool = False,
with_payloads: bool = False,
) -> List[SuggestionParser]:
"""
Get a list of suggestions from the AutoCompleter, for a given prefix.
Parameters:
prefix : str
The prefix we are searching. **Must be valid ascii or utf-8**
fuzzy : bool
If set to true, the prefix search is done in fuzzy mode.
**NOTE**: Running fuzzy searches on short (<3 letters) prefixes
can be very
slow, and even scan the entire index.
with_scores : bool
If set to true, we also return the (refactored) score of
each suggestion.
This is normally not needed, and is NOT the original score
inserted into the index.
with_payloads : bool
Return suggestion payloads
num : int
The maximum number of results we return. Note that we might
return less. The algorithm trims irrelevant suggestions.
Returns:
list:
A list of Suggestion objects. If with_scores was False, the
score of all suggestions is 1.
For more information see `FT.SUGGET <https://redis.io/commands/ft.sugget>`_.
""" # noqa
args = [SUGGET_COMMAND, key, prefix, "MAX", num]
if fuzzy:
args.append(FUZZY)
if with_scores:
args.append(WITHSCORES)
if with_payloads:
args.append(WITHPAYLOADS)
ret = await self.execute_command(*args)
results = []
if not ret:
return results
parser = SuggestionParser(with_scores, with_payloads, ret)
return [s for s in parser]