Selina,
You should probably check a few things:
1. Check your log files for errors. Also, does your job fail, or does it
continue running?
2. Does this line " logger.info("key="+key+": message="+message); " write
any logs?
3. This might not be the only reason, but you are sending messages of
type Map<String, String>. However, in your config file you defined
"systems.kafka.samza.msg.serde=string", which expects the message to be a
String.
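
A minimal sketch of point 3, in plain Java with no Samza dependency. The
class SerdeCastDemo and both of its methods are hypothetical names made up
for illustration; incomingMessage() stands in for envelope.getMessage(),
assuming the string serde hands the task a java.lang.String. It shows that
the cast to Map<String, String> compiles (the unchecked warning is
suppressed) but fails at runtime:

```java
import java.util.Map;

// Hypothetical demo class; not part of Samza or of the original task.
class SerdeCastDemo {

    // Stand-in for envelope.getMessage(), which is declared to return Object.
    // Assumption: with msg.serde=string the runtime value is a String.
    static Object incomingMessage() {
        return "{\"user\":\"selina\"}";
    }

    // Mirrors the cast in process(): it compiles, but the JVM checks the
    // real type at runtime and rejects a String where a Map is expected.
    @SuppressWarnings("unchecked")
    static String tryCast(Object message) {
        try {
            Map<String, String> asMap = (Map<String, String>) message;
            return "cast ok, size=" + asMap.size();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // The String payload cannot be cast to a Map.
        System.out.println(tryCast(incomingMessage()));
    }
}
```

If that is what happens in your task, sending "message" (the String), as in
your commented-out line, would match the configured string serde. Another
option may be to register a JSON serde
(org.apache.samza.serializers.JsonSerdeFactory) for the message and keep
working with a Map, provided the input really is JSON.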
Shadi
On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <[email protected]>
wrote:
> Hi, All
>
> I am trying to write my first StreamTask class. I have a Kafka topic
> called "http-demo". I would like to read that topic and write it to another
> topic called "demo-duplicate".
>
> However, no topic is written to Kafka.
>
> My properties file and StreamTask are below. Can anyone tell me what
> the bug is?
> BTW, if I set checkpoint or metrics in the properties file, the checkpoint
> and metrics topics are written to Kafka. And the content of the
> input topic, http-demo, can be shown correctly.
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina
>
>
> - - -- - - - - -
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=demo-parser
>
> # YARN
>
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>
> # Task
> task.class=samza.http.demo.task.HttpDemoParserStreamTask
> task.inputs=kafka.http-demo
>
> # Serializers
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Kafka System
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=string
> systems.kafka.samza.key.serde=string
> systems.kafka.consumer.zookeeper.connect=localhost:2181/
> systems.kafka.consumer.auto.offset.reset=largest
> systems.kafka.producer.bootstrap.servers=localhost:9092
> - - -- - - - - -
>
> My StreamTask class is also simple:
>
> ---------
>
> /**
>  * Read data from http-demo topic and write it back to "demo-duplicate"
>  */
> public class HttpDemoParserStreamTask implements StreamTask {
>
>     private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-duplicate");
>     Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>             TaskCoordinator coordinator) throws Exception {
>
>         String key = (String) envelope.getKey();
>         String message = envelope.getMessage().toString();
>         logger.info("key="+key+": message="+message);
>
>         Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage());
>         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>     }
>
> }
>
> -------
>