Hi, All
I am trying to write my first StreamTask class. I have a topic at
Kafka called "http-demo". I like to read the topic and write it to another
topic called "demo-duplicate"
Howeven there is not topic written to Kafka.
My properties file and StreamTask are below. Can anyone told me what
is the bug?
BTW, if I set checkpoint or Metrics at properties file. the topic of
checkpoint and metrics could be written to Kafka. And the content of
input topic -- http-demo could be show correctly.
Your help is highly appreciated.
Sincerely,
Selina
- - -- - - - - -
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-parser
# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
# Task
task.class=samza.http.demo.task.HttpDemoParserStreamTask
task.inputs=kafka.http-demo
# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092
- - -- - - - - -
My StreamTask class is simple also
---------
/**
*
* Read data from http-demo topic and write it back to "demo-duplicate"
*/
public class HttpDemoParserStreamTask implements StreamTask {
private static final SystemStream OUTPUT_STREAM = new
SystemStream("kafka", "demo-duplicate");
Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
@SuppressWarnings("unchecked")
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector
collector, TaskCoordinator coordinator) throws Exception {
String key = (String) envelope.getKey();
String message = envelope.getMessage().toString();
logger.info("key="+key+": message="+message);
Map<String, String> outgoingMap = (Map<String, String>)
(envelope.getMessage());
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
outgoingMap));
//collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
message));
}
}
-------