Che cosa succede se un Kafka consumatore gestisce un messaggio troppo lungo? Sarà Kafka rinominiamo questo partizione ad un altro utente e il messaggio sarà doppiamente trattati?

0

Domanda

Supponiamo Kafka, 1 partition, 2 consumers.(2 ° consumatore è inattivo)

Supponiamo che il 1 ° consumato un messaggio, va a gestire con altri 3 servizi e improvvisamente su uno di essi e di perdere il Kafka di timeout.

Sarà Kafka rinominiamo la partizione per il 2 ° dei consumatori e il messaggio sarà doppiamente trattati (supponiamo che il 1 ° uno eventualmente riuscire)?

1

Migliore risposta

1

Che cosa succede se un Kafka consumatore gestisce un messaggio troppo lungo? Sarà Kafka rinominiamo questo partizione ad un altro utente e il messaggio sarà doppiamente trattati?

Sì, questo è corretto. Se Kafka consumatore vuole troppo tempo per gestire un messaggio e successive poll() è in ritardo, Kafka si ri-nominare questa partizione ad un altro utente e il messaggio saranno trattati ancora (e ancora).

Per maggiore chiarezza, prima di tutto dobbiamo decidere " Quanto tempo è troppo lungo?'.

Questo è definito dalla proprietà max.poll.interval.ms. Dal docs,

Il ritardo massimo tra le chiamate di sondaggio() quando si utilizza consumo di gruppo di gestione. Questo pone un limite superiore alla quantità di tempo che il consumatore può essere inattivo prima di recuperare più record. Se poll() non viene chiamato prima della scadenza di questo timeout, quindi il consumatore è considerato fallito e il gruppo sarà riequilibrare al fine di riassegnare le partizioni di un altro membro.

Gruppo di consumatori è riequilibrato se non ci sono chiamate a poll() all'interno di questo tempo.

C'è una proprietà auto.commit.interval.ms. Il commit automatico offset di controllo verrà chiamato solo durante il sondaggio - controlla se il tempo trascorso è maggiore della configurato commit automatico intervallo di tempo e se il risultato è sì, l'offset è impegnata.

Se Kafka consumatore sta prendendo troppo tempo per elaborare i record, quindi la successiva poll() chiamata anche per ritardata e l'offset restituito l'ultimo sondaggio() non sono impegnati. Se riequilibrare accade in questo momento, il nuovo consumatore cliente assegnato a questa partizione avviare l'elaborazione dei messaggi.

Gruppo di consumatori, a riequilibrare e la conseguente partizione di riassegnazione può essere evitato aumentando questo valore. Ciò consentirà di aumentare l'intervallo ammesso tra i sondaggi e dare più tempo ai consumatori di gestire i record restituiti da un sondaggio(). I consumatori potranno aderire al riequilibrio all'interno della chiamata al sondaggio, in modo crescente max intervallo di polling anche del gruppo di ritardo riequilibra.

C'è un problema più in aumento max intervallo di polling di un grande valore. Se il consumatore muore per qualche altro motivo, richiede più tempo rispetto configurato max.poll.interval.ms intervallo di individuare il guasto.

session.timeout.ms e heartbeat.interval.ms sono disponibili, in questo caso, per rilevare il fallimento totale, come in precedenza possibile.

Per ulteriori informazioni su questi parametri:

Si prega di notare che i valori configurati per session.timeout.ms deve essere nella gamma consentita come configurato il broker configurazione delle proprietà

  • gruppo.min.la sessione.timeout.ms
  • gruppo.max.la sessione.timeout.ms

In caso contrario, a seguito di eccezione viene generata durante l'avvio del cliente consumatore.

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

Update: Per evitare di maneggiare i messaggi di nuovo

C'è un altro metodo in KafkaConsumer classe commitAsync() per attivare il commit offset operazione.

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

Per ulteriori dettagli su commitSync() e commitAsync(), si prega di controllare questo thread

Commettere un offset manualmente è un'azione di dire che l'offset è stato elaborato in modo che Kafka non inviare il commesso record per la stessa partizione di nuovo. Quando offset sono impegnati manualmente, è importante notare che, se il consumatore muore prima del trattamento record per qualsiasi motivo, c'è una possibilità che questi record non essere trattati di nuovo.

2021-11-25 07:04:25

Grazie, è chiaro. Ci sono modi per evitare il secondo trattamento?
J.J. Beam

@J. J. Fascio aggiornato risposta con link e campione
arunkvelu

In altre lingue

Questa pagina è in altre lingue

Русский
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................