Nella sua docstring, elasticsearch.helpers.async_bulk
si descrive come un
Di supporto per l' :meth:
~elasticsearch.AsyncElasticsearch.bulk
api che fornisce più umano e amichevole interfaccia - consuma un iteratore di azioni e li invia al elasticsearch in blocchi. fonte
Contesto
Sono stato con AsyncElasticsearch.bulk()
successo per inviare panda dataframes per alcuni ES istanza
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Problema
Tuttavia, quando si tratta di async_bulk
Io sono sempre index is missing
errori.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Provato a sintonizzare _rec_to_actions()
in diversi modi, senza molto effetto.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Credo che il problema principale è che io non sono abbastanza sicuro di sapere che cosa è un'azione, in un contesto di elasticsearch. Questa nozione è ovunque nella documentazione ma non hanno una chiara struttura di dati controparte in questa libreria di codice sorgente (nessuno che ho trovato, comunque)
Che cosa è esattamente un azione e come faccio a sintonizzare il mio generatore di inviare df dati self.index
?
ambiente
- python = "3.9.5"
- elasticsearch = "7.14.1"