Sto cercando di scrivere un pezzo di codice che esegue seguenti:-
- Legge un grosso file csv da origine remota come s3.
- Processo il file di record per record.
- Inviare la notifica di utente
- Scrivere l'output in una posizione remota
Esempio di record in input csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Il mio case di input classe, che rappresenta un record in input csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Record di esempio in output in formato csv (che deve essere scritto):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Il mio caso di produzione di classe, che rappresenta un record in input csv:
case class OutputRecord(recordId: String, name: String, designation: String)
La lettura di un record utilizzando akka flusso csv (utilizza Alpakka reattiva s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Ora ho una funzione per elaborare i record:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Funzione per scrivere il OutputRecord come csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Funzione per l'invio di email di notifica:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Cuciture tutti insieme
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
Sulla Linea 15 e 16, ricevo un errore, io sia in grado di aggiungere la Riga 15 o Linea 16, ma non entrambi, dal momento che entrambi notify
& writeOutput
esigenze outputRecord
. Una volta notify è chiamato ho sciolto il mio outputRecord
.
C'è un modo che posso aggiungere notify
e writeOutput
per uno stesso grafico?
Non sto cercando di esecuzione parallela come voglio prima chiamata notify
e poi solo writeOutput
. Quindi questo non è utile: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
Il caso in questione sembra molto semplice per me, ma in qualche modo io non sono in grado di trovare una soluzione pulita.