Akka flusso di Input (`A`) come Output ("Out")

0

Domanda

Sto cercando di scrivere un pezzo di codice che esegue seguenti:-

  1. Legge un grosso file csv da origine remota come s3.
  2. Processo il file di record per record.
  3. Inviare la notifica di utente
  4. Scrivere l'output in una posizione remota

Esempio di record in input csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Il mio case di input classe, che rappresenta un record in input csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Record di esempio in output in formato csv (che deve essere scritto):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Il mio caso di produzione di classe, che rappresenta un record in input csv:

case class OutputRecord(recordId: String, name: String, designation: String)

La lettura di un record utilizzando akka flusso csv (utilizza Alpakka reattiva s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Ora ho una funzione per elaborare i record:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Funzione per scrivere il OutputRecord come csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Funzione per l'invio di email di notifica:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Cuciture tutti insieme

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

Sulla Linea 15 e 16, ricevo un errore, io sia in grado di aggiungere la Riga 15 o Linea 16, ma non entrambi, dal momento che entrambi notify & writeOutput esigenze outputRecord. Una volta notify è chiamato ho sciolto il mio outputRecord.

C'è un modo che posso aggiungere notify e writeOutput per uno stesso grafico?

Non sto cercando di esecuzione parallela come voglio prima chiamata notify e poi solo writeOutput. Quindi questo non è utile: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

Il caso in questione sembra molto semplice per me, ma in qualche modo io non sono in grado di trovare una soluzione pulita.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Migliore risposta

1

L'output di notify è un PushResultma l'ingresso di writeOutput è ByteString. Una volta che si cambia, che verrà compilata. Nel caso In cui avete bisogno di ByteString, avere la stessa OutputRecord.

BTW, il codice di esempio che hai fornito, un errore simile esiste in readCSV e process.

2021-11-24 03:36:16

In altre lingue

Questa pagina è in altre lingue

Русский
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................