The code and the properties look fine to me. collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am curious whether you accidentally disabled auto.create.topics.enable ... Can you also try sending messages from the command line to "demo-duplicate" to see whether the topic receives anything?
Let me know if it works.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com> wrote:

> Hi, Shadi:
>
> Thanks a lot for your reply.
> 1. There is no error log in Kafka or Samza.
>
> 2. This line " logger.info("key="+key+": message="+message); " writes
> the log correctly, as below:
>
> [image: Inline image 1]
>
> These are my last two messages, with the right count.
>
> 3. I tried both ways below. Neither of them creates the topic, but I will
> try again.
>
> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>
> //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>
> 4. I wrote a topic called "http-demo" to Kafka as my input, and its content
> can be shown with the command line below, so Kafka should be OK.
> deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic http-demo
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina
>
> On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <snogh...@linkedin.com.invalid> wrote:
>
>> Selina,
>>
>> You should probably check a few things:
>> 1. Your log files, to see if you have any errors. Also, does your job fail
>> or continue running?
>> 2. Does this line " logger.info("key="+key+": message="+message); " write
>> any logs?
>> 3. This might not be the only reason, but you are sending messages of
>> type Map<String, String>. However, in your config file, you defined
>> "systems.kafka.samza.msg.serde=string", which expects the message to be a
>> String.
>>
>> Shadi
>>
>> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:
>>
>> > Hi, All
>> >
>> > I am trying to write my first StreamTask class. I have a topic in
>> > Kafka called "http-demo". I would like to read the topic and write it
>> > to another topic called "demo-duplicate".
>> >
>> > However, no topic is written to Kafka.
>> >
>> > My properties file and StreamTask are below. Can anyone tell me what
>> > the bug is?
>> > BTW, if I set checkpoint or metrics in the properties file, the
>> > checkpoint and metrics topics are written to Kafka, and the content of
>> > the input topic, http-demo, is shown correctly.
>> >
>> > Your help is highly appreciated.
>> >
>> > Sincerely,
>> > Selina
>> >
>> > - - -- - - - - -
>> > # Job
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=demo-parser
>> >
>> > # YARN
>> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>> >
>> > # Task
>> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>> > task.inputs=kafka.http-demo
>> >
>> > # Serializers
>> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >
>> > # Kafka System
>> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.msg.serde=string
>> > systems.kafka.samza.key.serde=string
>> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> > systems.kafka.consumer.auto.offset.reset=largest
>> > systems.kafka.producer.bootstrap.servers=localhost:9092
>> > - - -- - - - - -
>> >
>> > My StreamTask class is also simple:
>> >
>> > ---------
>> >
>> > /**
>> >  *
>> >  * Read data from http-demo topic and write it back to "demo-duplicate"
>> >  */
>> > public class HttpDemoParserStreamTask implements StreamTask {
>> >
>> >     private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-duplicate");
>> >     Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>> >
>> >     @SuppressWarnings("unchecked")
>> >     @Override
>> >     public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>> >
>> >         String key = (String) envelope.getKey();
>> >         String message = envelope.getMessage().toString();
>> >         logger.info("key="+key+": message="+message);
>> >
>> >         Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage());
>> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>> >     }
>> >
>> > }
>> >
>> > -------
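
Shadi's third point can be checked without a running cluster. The following is only a minimal sketch, assuming that with systems.kafka.samza.msg.serde=string the value returned by envelope.getMessage() is a plain java.lang.String. The class SerdeCastDemo and its methods are hypothetical stand-ins and are not part of Samza. Under that assumption, the cast to Map<String, String> in process() would throw a ClassCastException before collector.send(...) is reached, while toString() on the same value works.

```java
import java.util.HashMap;
import java.util.Map;

public class SerdeCastDemo {

    // Hypothetical stand-in for envelope.getMessage(): assumes the "string"
    // serde delivers the payload as a java.lang.String typed as Object.
    static Object incomingMessage() {
        return "{\"user\":\"selina\"}";
    }

    // Mirrors the cast used in process(); returns true only if the runtime
    // type of the message really is a Map.
    @SuppressWarnings("unchecked")
    static boolean castToMapSucceeds(Object message) {
        try {
            Map<String, String> outgoingMap = (Map<String, String>) message;
            return outgoingMap != null;
        } catch (ClassCastException e) {
            // A String cannot be cast to Map, so the cast fails at runtime.
            return false;
        }
    }

    public static void main(String[] args) {
        Object message = incomingMessage();
        System.out.println("cast to Map succeeds: " + castToMapSucceeds(message));
        System.out.println("cast of a real Map succeeds: "
                + castToMapSucceeds(new HashMap<String, String>()));
        // toString() is safe for any non-null message, which is why the
        // logger line in the task can still print the payload.
        System.out.println("message as String: " + message.toString());
    }
}
```

If this matches what the job sees, sending the String `message` (the commented-out line in the task) would keep the outgoing payload consistent with the configured string serde. Sending a Map would instead need a serde that can handle maps.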