Hi all, I have a topology with a Kafka spout, built with the TopologyBuilder shown below:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
    builder.setBolt("Parser", new ParserBolt()).globalGrouping("KafkaSpout");
    builder.setBolt("FileBolt", new PersistBolt()).globalGrouping("Parser");

    Config config = new Config();
    config.put(Config.TOPOLOGY_WORKERS, 4);
    config.setNumWorkers(2);
    config.setMaxSpoutPending(10);
    config.setMaxTaskParallelism(10);

I have two levels of bolts:

1) Parser - parses the data and emits an output tuple whose value is a serialized POJO.
2) Persist - persists the data forwarded by the previous bolt (Parser) after some computation.

For the last bolt, PersistBolt ("FileBolt"), I want a fields grouping on the Parser bolt's output, based on field values of the POJO that is emitted. To make it clearer, Parser emits the POJO like this:

    collector.emit(new Values(responseHandler));

where responseHandler is an instance of this POJO:

    import java.io.Serializable;

    public class ResponseHandler implements Serializable {
        private String host = null;
        private String user = null;
        private String msg = null;

        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }

        public String getUser() { return user; }
        public void setUser(String user) { this.user = user; }

        public String getMsg() { return msg; }
        public void setMsg(String msg) { this.msg = msg; }
    }

I am looking for a way to apply a fields grouping at the host and user level, so that tuples with the same host and user go to the same PersistBolt task. Any workaround is welcome.

Thanks!
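
To illustrate the routing I am after, here is a minimal, self-contained sketch in plain Java. It is not Storm code and not Storm's actual hashing: FieldGroupingSketch and taskFor are hypothetical names, and the task count of 4 is an assumption. The only point is that the target task is derived from host and user alone, so equal pairs always land on the same task. As far as I understand, fieldsGrouping works on named output fields, so Parser would probably have to emit host and user as separate values next to the POJO for this to work.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Storm: models the routing expected
// from a fields grouping on (host, user).
class FieldGroupingSketch {

    // Pick a target task from the values of the grouping fields only.
    // Equal (host, user) pairs always yield the same task index.
    static int taskFor(String host, String user, int numTasks) {
        List<String> groupingValues = Arrays.asList(host, user);
        // floorMod keeps the index in [0, numTasks) even for negative hashes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // assumed parallelism of the persist bolt
        String[][] tuples = {
            {"host-a", "alice"}, {"host-b", "bob"},
            {"host-a", "alice"}, {"host-a", "bob"}
        };
        for (String[] t : tuples) {
            System.out.println(t[0] + "/" + t[1] + " -> task "
                    + taskFor(t[0], t[1], numTasks));
        }
    }
}
```

With the current globalGrouping("Parser"), every tuple goes to a single PersistBolt task, so the behaviour sketched above only matters once PersistBolt runs with a parallelism greater than one.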