Hi,

I am using a Trident topology to process files, transforming them from CSV, EDI, and 
XML into a common JSON format. I have a working prototype, but I wanted to make 
sure I am implementing this correctly. Here is the flow:

1. Read a message from Kafka; the message is metadata describing the file's location on S3.
2. A function (S3Reader) then streams the file from S3 and transforms it, emitting 
records one at a time (a simplified sketch of this function follows the topology below).
3. The final step is a partitionPersist to Kafka.

Here’s the topology:

TridentState kafkaState = topology
        .newStream("tracking-file-processor",
                   FileNameSpout.opaqueKafkaSpout(zkHosts, topicName))
        // parallelism should be the number of partitions of the topic
        .parallelismHint(1)
        .each(new Fields("str"),
              new S3Reader(),
              new Fields("tracking_num", "json", "record_type"))
        .shuffle()
        .partitionPersist(stateFactory,
                          new Fields("tracking_num", "json", "record_type"),
                          new TridentKafkaUpdater(),
                          new Fields())
        // .parallelismHint(10)
        ;
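
For context, here is a simplified sketch of the S3Reader function. It streams the S3 
object and emits one output tuple per record rather than loading the whole file at once. 
Storm 1.x packages and the AWS SDK v1 client are assumed, and the toJson / trackingNumOf / 
recordTypeOf methods are placeholders for the real CSV/EDI/XML parsing:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class S3Reader extends BaseFunction {

    private transient AmazonS3 s3;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // one S3 client per task
        s3 = AmazonS3ClientBuilder.defaultClient();
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // "str" is the Kafka message holding the S3 location of the file.
        // Simplified here to "bucket/key"; the real metadata parsing is richer.
        String[] location = tuple.getString(0).split("/", 2);
        try (S3Object object = s3.getObject(location[0], location[1]);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(object.getObjectContent(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // one record read -> one tuple emitted
                String json = toJson(line);
                collector.emit(new Values(trackingNumOf(json), json, recordTypeOf(json)));
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to stream file from S3", e);
        }
    }

    // placeholders for the real CSV/EDI/XML -> JSON conversion
    private String toJson(String record)      { return record; }
    private String trackingNumOf(String json) { return "unknown"; }
    private String recordTypeOf(String json)  { return "unknown"; }
}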

Questions

1. Is this the correct approach?
2. The files vary in size and can be close to 500 MB. The S3Reader function emits 
one record (of the file) at a time, and Trident will batch these records before 
doing the partitionPersist, so essentially the entire file ends up in memory, correct? 
And while processing multiple files concurrently, the memory requirement will grow 
accordingly? Do I just parallelize and spread the partitions over multiple workers, 
or is there a better way?
3. This also means that the batch being written to Kafka can vary in size and 
may be quite large. Is this acceptable?
4. If I need to write to a data source other than Kafka, such as a regular 
database (it will most likely be Kafka, but I want to gain some more knowledge), 
what would be the best way to do this? A rough sketch of my current understanding is below.
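
On question 4, my current understanding is that a non-Kafka sink means providing my own 
State, StateFactory, and StateUpdater in place of the Kafka ones. Here is a rough sketch 
of what I have in mind for a JDBC-style sink; the class names, SQL, and the openConnection 
helper are made up for illustration, and Storm 1.x packages are assumed:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.IMetricsContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.tuple.TridentTuple;

public class DatabaseState implements State {

    private final Connection connection;

    public DatabaseState(Connection connection) {
        this.connection = connection;
    }

    @Override
    public void beginCommit(Long txid) { /* open a transaction if needed */ }

    @Override
    public void commit(Long txid)      { /* commit the transaction */ }

    void write(List<TridentTuple> tuples) throws SQLException {
        // write the whole Trident batch in one JDBC batch
        try (PreparedStatement ps = connection.prepareStatement(
                "INSERT INTO records (tracking_num, json, record_type) VALUES (?, ?, ?)")) {
            for (TridentTuple t : tuples) {
                ps.setString(1, t.getStringByField("tracking_num"));
                ps.setString(2, t.getStringByField("json"));
                ps.setString(3, t.getStringByField("record_type"));
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    public static class Factory implements StateFactory {
        @Override
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            return new DatabaseState(openConnection(conf));  // openConnection is a made-up helper
        }
    }

    public static class Updater extends BaseStateUpdater<DatabaseState> {
        @Override
        public void updateState(DatabaseState state, List<TridentTuple> tuples,
                                TridentCollector collector) {
            try {
                state.write(tuples);
            } catch (SQLException e) {
                // failing the batch makes Trident replay it
                throw new FailedException(e);
            }
        }
    }
}

The last step of the topology would then become something like

        .partitionPersist(new DatabaseState.Factory(),
                          new Fields("tracking_num", "json", "record_type"),
                          new DatabaseState.Updater(),
                          new Fields());

Is that the right pattern?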

Hoping the community can help.

Thanks

Sherwin


