Hi All,

I am having a topology with Kafka Spout Implementation with the
topologyBuilder mentioned below,

        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
        builder.setBolt("Parser", new
ParserBolt()).globalGrouping("KafkaSpout");
        builder.setBolt("FileBolt", new
PersistBolt()).globalGrouping("Parser");

        Config config=new Config();
        config.put(Config.TOPOLOGY_WORKERS, 4);
        config.setNumWorkers(2);
        config.setMaxSpoutPending(10);
        config.setMaxTaskParallelism(10);

I am having two level of Bolts,

1) Parser - Parsing of data and emitting a output tuple value which is
containing POJO serialized object
2) Persist - Persisting of the forwarded data after some computation, which
is received through previous bolt(Parser).

Now I was looking out a way for the last PersistBolt("FileBolt") I want the
field grouping on the parser bolt based on the some field value(POJO) which
is being emitted.


To make it more clear,

Parser is emitting a POJO of the form,

collector.emit(new Values(responseHandler));

where responseHandler is a POJO,

public class ResponseHandler implements Serializable{

private String host = null;
private String user = null;
private String msg = null;
 public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getUser() {
return hostName;
}
public void setuser(String user) {
this.user = user;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
 }

Now I was looking out for a way to field group on the host and user level.

Actively looking for the way around!

Thanks!

Reply via email to