Hi, I am using a Trident topology to process files and transform them from CSV, EDI, and XML into a common JSON format. I have a working prototype, but I want to make sure I am implementing this correctly. Here is the flow:
1. Read a message from Kafka; this message is metadata about the file's location on S3.
2. A function/bolt then streams and transforms the file from S3, emitting records one at a time.
3. The final step is a partitionPersist to Kafka.

Here's the topology:

TridentState kafkaState = topology
        .newStream("tracking-file-processor", FileNameSpout.opaqueKafkaSpout(zkHosts, topicName))
        // parallelism should be the number of partitions of the topic
        .parallelismHint(1)
        .each(new Fields("str"), new S3Reader(), new Fields("tracking_num", "json", "record_type"))
        .shuffle()
        .partitionPersist(stateFactory,
                new Fields("tracking_num", "json", "record_type"),
                new TridentKafkaUpdater(),
                new Fields())
        // .parallelismHint(10)
        ;

Questions

1. Is this the correct approach?
2. The files are of varying sizes and could be close to 500 MB. The S3Reader function emits one record (of the file) at a time, and Trident batches them before doing the partitionPersist, so basically the entire file would be in memory? While processing multiple files, the memory requirement will increase? Do I just parallelize and spread partitions over multiple workers, or is there a better way?
3. This also means that the batch being written to Kafka can vary in size and may be quite large. Is this acceptable?
4. If I do need to write to a data source other than Kafka, such as a regular database (it will most likely be Kafka, but I just want to gain some more knowledge), what would be the best way to do this?

For context, I have pasted a few simplified sketches (the spout wiring, the S3Reader idea, and a possible non-Kafka state) at the end of this message.

Hoping the community can help.

Thanks,
Sherwin
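Sketch 1 -- the spout wiring for step 1. This is a minimal version of the kind of opaque Trident Kafka spout setup I mean (package names assume the old storm-kafka module on Storm 1.x; the class name and the ZooKeeper connect string are placeholders, not my actual FileNameSpout):

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;

public class SpoutWiringSketch {

    // Builds an opaque Trident Kafka spout that emits each Kafka message
    // (the S3 file metadata) as one tuple under the field "str", which is
    // StringScheme's field name and what the .each() in the topology reads.
    public static OpaqueTridentKafkaSpout opaqueKafkaSpout(String zkConnect, String topicName) {
        BrokerHosts zkHosts = new ZkHosts(zkConnect);   // e.g. "zookeeper1:2181" (placeholder)
        TridentKafkaConfig spoutConfig = new TridentKafkaConfig(zkHosts, topicName);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // spoutConfig.fetchSizeBytes could be tuned here, but it only limits the
        // Kafka fetch itself, not what S3Reader later emits into the batch.
        return new OpaqueTridentKafkaSpout(spoutConfig);
    }
}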
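Sketch 2 -- the S3Reader idea for step 2. This is not my real class (the real one handles CSV/EDI/XML); it is a stripped-down version that streams the object and treats each line as one record, just to show where the emits happen. It assumes Storm 1.x package names, the AWS SDK v1 client, and that the Kafka message in "str" is simply "bucket/key".

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class S3ReaderSketch extends BaseFunction {

    // Built lazily so the function stays serializable when the topology is submitted.
    private transient AmazonS3 s3;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // Assumption for this sketch: the Kafka message ("str") is just "bucket/key".
        String[] location = tuple.getStringByField("str").split("/", 2);
        if (s3 == null) {
            s3 = AmazonS3ClientBuilder.defaultClient();
        }
        try (S3Object object = s3.getObject(location[0], location[1]);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(object.getObjectContent(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // The real transform parses CSV/EDI/XML; here one line == one record.
                collector.emit(new Values(trackingNum(line), toJson(line), "csv"));
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to stream " + tuple.getStringByField("str"), e);
        }
    }

    // Placeholders for the real per-record transformation.
    private String trackingNum(String line) { return line.split(",", 2)[0]; }
    private String toJson(String line)      { return "{\"raw\":\"" + line + "\"}"; }
}

Even though the function streams the object line by line, everything it emits still belongs to the single batch that the metadata tuple arrived in, which is what worries me in questions 2 and 3.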
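Sketch 3 -- for question 4, this is the kind of thing I was imagining for a non-Kafka sink: a custom State / StateFactory / StateUpdater trio used with partitionPersist. It is a rough, untested sketch assuming Storm 1.x package names, plain JDBC, and a made-up "records" table:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.IMetricsContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.tuple.TridentTuple;

// Minimal non-transactional State that writes each Trident batch to a JDBC table.
public class JdbcRecordState implements State {

    private final Connection connection;

    public JdbcRecordState(Connection connection) {
        this.connection = connection;
    }

    // Empty for a non-transactional state; a transactional/opaque state would
    // use the txid here to make the writes idempotent on replay.
    @Override public void beginCommit(Long txid) { }
    @Override public void commit(Long txid) { }

    public void writeBatch(List<TridentTuple> tuples) {
        String sql = "INSERT INTO records (tracking_num, json, record_type) VALUES (?, ?, ?)";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            for (TridentTuple t : tuples) {
                ps.setString(1, t.getStringByField("tracking_num"));
                ps.setString(2, t.getStringByField("json"));
                ps.setString(3, t.getStringByField("record_type"));
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (Exception e) {
            // Failing the batch makes Trident replay it.
            throw new FailedException(e);
        }
    }

    public static class Factory implements StateFactory {
        private final String jdbcUrl;

        public Factory(String jdbcUrl) {
            this.jdbcUrl = jdbcUrl;
        }

        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            try {
                // One connection per partition/task.
                return new JdbcRecordState(DriverManager.getConnection(jdbcUrl));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Updater extends BaseStateUpdater<JdbcRecordState> {
        @Override
        public void updateState(JdbcRecordState state, List<TridentTuple> tuples, TridentCollector collector) {
            state.writeBatch(tuples);
        }
    }
}

The topology would then use new JdbcRecordState.Factory(jdbcUrl) in place of stateFactory and new JdbcRecordState.Updater() in place of TridentKafkaUpdater. My understanding is that throwing FailedException makes Trident replay the batch, so this is at-least-once unless the writes are made idempotent with the txid -- corrections welcome.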