Re: Samza: can not produce new data to kafka
Hi, Yan:

Thanks for replying to my email in detail. All the files in the YARN logs are shown below. There is no exception under samza-Demo/deploy/yarn/logs. I guess the StreamTask was not called...

Part of the container log file (samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_01/stderr) is pasted below. In short, the log line from logger.info("key="+key+": message="+message); was not generated.

/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -server -Dsamza.container.name=samza-application-master -Dlog4j.configuration=file:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/log4j.xml -Dsamza.log.dir=/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_01 -Djava.io.tmpdir=/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_01/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /Users/selina/IdeaProjects/samza-Demo/deploy/yarn/etc/hadoop:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/activation-1.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/akka-actor_2.10-2.1.2.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/aopalliance-1.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/asm-3.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/avro-1.7.4.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/commons-beanutils-1.7.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/commons-beanutils-core-1.8.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/commons-cli-1.2

The file gc.log.0.current shows Allocation Failure and Full GC entries:

CommandLine flags: -XX:GCLogFileSize=10241024 -XX:InitialHeapSize=268435456 -XX:MaxHeapSize=805306368 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseGCLogFileRotation -XX:+UseParallelGC
2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure) 65536K->8449K(251392K), 0.0062314 secs]
2015-07-24T13:28:57.188+0800: 0.981: [GC (System.gc()) 39240K->6305K(251392K), 0.0047744 secs]
2015-07-24T13:28:57.193+0800: 0.986: [Full GC (System.gc()) 6305K->5940K(251392K), 0.0147206 secs]
2015-07-24T13:28:57.625+0800: 1.418: [GC (Allocation Failure) 71476K->12511K(251392K), 0.0030179 secs]
2015-07-24T13:28:59.889+0800: 3.682: [GC (Allocation Failure) 78047K->13859K(251392K), 0.0052610 secs]
2015-07-24T13:29:15.487+0800: 19.280: [GC (Metadata GC Threshold) 35659K->10106K(251392K), 0.0036350 secs]
2015-07-24T13:29:15.490+0800: 19.284: [Full GC (Metadata GC Threshold) 10106K->7318K(149504K), 0.0200118 secs]

[image: Inline image 1]

Your help is highly appreciated.

Sincerely,
Selina

On Fri, Jul 24, 2015 at 1:51 PM, Yan Fang wrote:
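To read the GC lines above, it may help to pull apart each entry. Below is a minimal sketch that assumes the standard HotSpot -XX:+PrintGC line format "[GC (cause) beforeK->afterK(capacityK), secs]". In that format, "Allocation Failure" marks an ordinary young-generation collection. The two Full GCs here were triggered by System.gc() and by the Metadata GC Threshold. The heap after each collection stays at roughly 6 to 14 MB, so these lines on their own do not look like memory trouble. The class GcLineSummary and its output format are hypothetical illustrations.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GcLineSummary {
    // Matches "[GC (cause) 65536K->8449K(251392K)" or "[Full GC (cause) ...".
    // The lazy (.*?) lets causes that contain parentheses, such as "System.gc()", match.
    private static final Pattern HEAP =
        Pattern.compile("\\[(Full GC|GC) \\((.*?)\\)\\s+(\\d+)K->(\\d+)K\\((\\d+)K\\)");

    /** Returns "kind|cause|beforeK|afterK|capacityK|freedK", or null if the line is not a GC entry. */
    static String summarize(String line) {
        Matcher m = HEAP.matcher(line);
        if (!m.find()) {
            return null;
        }
        long before = Long.parseLong(m.group(3));
        long after = Long.parseLong(m.group(4));
        long capacity = Long.parseLong(m.group(5));
        return m.group(1) + "|" + m.group(2) + "|" + before + "|" + after + "|"
            + capacity + "|" + (before - after);
    }

    public static void main(String[] args) {
        String[] lines = {
            "2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure) 65536K->8449K(251392K), 0.0062314 secs]",
            "2015-07-24T13:29:15.490+0800: 19.284: [Full GC (Metadata GC Threshold) 10106K->7318K(149504K), 0.0200118 secs]"
        };
        for (String line : lines) {
            System.out.println(summarize(line));
        }
    }
}
```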
Re: Samza: can not produce new data to kafka
{quote}
I did not set auto.create.topics.enable anywhere
{quote}

Fine. Then it defaults to true. No worries.

{quote}
My job is listed as below. However I am wondering how can I know if my method "public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)" was run or not.
{quote}

If you have logging enabled (from the code, you did), you can check the container's log to see whether it has the output. Assuming you are using the local YARN that hello-samza provides, you should be able to find the logs in deploy/yarn/userlogs/application_Id.

If you use System.out.println, you can see the result in the stdout file under deploy/yarn/userlogs/application_Id (if the StreamTask works).

If it does not work, you can also check the logs in deploy/yarn/userlogs/application_Id to see whether there are any exceptions.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu wrote:
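Yan's advice to check the container logs can be scripted. Below is a minimal sketch, roughly equivalent to a recursive grep. It searches every file under one YARN application directory for the "key=" prefix written by the task's logger.info call. The default path comes from the stderr path in Selina's reply. ContainerLogGrep and findLines are hypothetical names.

One detail in the pasted command line may matter. Container _01_01 was started with -Dsamza.container.name=samza-application-master, so it appears to be the Samza application master, not the container running the StreamTask. That task container would sit in a sibling directory, and searching the whole application directory covers both.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ContainerLogGrep {

    /** Returns "fileName: line" for every line under logDir (searched recursively) that contains marker. */
    static List<String> findLines(Path logDir, String marker) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(logDir)) {
            // Sorted so that results from stdout, stderr and *.log files come out in a stable order.
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<String> hits = new ArrayList<>();
        for (Path file : files) {
            // ISO-8859-1 maps every byte to a char, so unexpected bytes in a log never abort the scan.
            for (String line : Files.readAllLines(file, StandardCharsets.ISO_8859_1)) {
                if (line.contains(marker)) {
                    hits.add(file.getFileName() + ": " + line);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default: the application directory taken from the stderr path in the thread.
        Path appDir = Paths.get(args.length > 0 ? args[0]
                : "deploy/yarn/logs/userlogs/application_1437767867729_0001");
        // "key=" is the prefix produced by logger.info("key="+key+": message="+message).
        for (String hit : findLines(appDir, "key=")) {
            System.out.println(hit);
        }
    }
}
```

If no line matches anywhere under the application directory, that would support Selina's guess that process() was never reached.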
Re: Samza: can not produce new data to kafka
Hi, Yan and Shadi:

I made a mistake. Actually, there is no log at /tmp/kafka-logs created by logger.info("key="+key+": message="+message);. The log I provided is actually the log for the input topic "http-demo" at /tmp/kafka-logs/http-demo-0.

My job is listed below. However, I am wondering how I can know whether my method "public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)" was run or not.

I manually created the topic "demo-duplicate" from the command line; otherwise it would be created by the Samza code.

I checked, and I did not set auto.create.topics.enable anywhere. Attached is my properties file for Kafka.

Your help is highly appreciated.

Sincerely,
Selina

[image: Inline image 1]

On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang wrote:
Re: Samza: can not produce new data to kafka
The code and the properties look good to me. collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am curious whether you accidentally disabled auto.create.topics.enable... Can you also try sending messages from the command line to "demo-duplicate" to see whether it gets anything?

Let me know if it works.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu wrote:
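The data flow Yan suggests can be modeled without a running Samza job. In this model the message is forwarded as the String it already is, and there is no cast to Map. Below is a minimal sketch, assuming the input message arrives as a String because of systems.kafka.samza.msg.serde=string. Collector is a hypothetical stand-in for Samza's MessageCollector. In the real task, the equivalent call is collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message)).

```java
import java.util.ArrayList;
import java.util.List;

public class PassThroughSketch {
    // Hypothetical stand-in for Samza's MessageCollector: receives (stream, message) pairs.
    interface Collector {
        void send(String stream, Object message);
    }

    static final String OUTPUT_STREAM = "demo-duplicate";

    // Mirrors the body of process(): the incoming message is assumed to be a String
    // (string serde), so it is forwarded unchanged instead of being cast to a Map.
    static void process(Object incomingMessage, Collector collector) {
        String message = incomingMessage.toString();
        collector.send(OUTPUT_STREAM, message);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        Collector collector = (stream, msg) -> sent.add(stream + " <- " + msg);
        process("{\"user\":\"selina\"}", collector);
        System.out.println(sent);
    }
}
```

This only models what gets sent. It does not address whether the output topic is created on the broker.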
Re: Samza: can not produce new data to kafka
Hi, Shadi:

Thanks a lot for your reply.

1. There is no error log in Kafka or Samza.

2. This line, logger.info("key="+key+": message="+message);, writes the log correctly, as below:

[image: Inline image 1]

These are my last two messages, with the right count.

3. I tried both ways below; neither of them created the topic, but I will try again.

collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));

//collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

4. I wrote a topic called "http-demo" to Kafka as my input, and its content can be shown with the command line below, so Kafka should be OK.

deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic http-demo

Your help is highly appreciated.

Sincerely,
Selina

On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <snogh...@linkedin.com.invalid> wrote:
Re: Samza: can not produce new data to kafka
Selina,

You should probably check a few things:
1. Your log files, to see whether you have any errors. Also, does your job fail or keep running?
2. Does this line, logger.info("key="+key+": message="+message);, write any logs?
3. This might not be the only reason, but you are sending messages of type Map<String, String>. However, in your config file you defined "systems.kafka.samza.msg.serde=string", which expects the message to be a String.

Shadi

On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu wrote:
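Shadi's third point can be illustrated in plain Java. As a simplification, the sketch assumes that a string serde casts each outgoing message to String before encoding it. stringToBytes is a hypothetical stand-in, not Samza's StringSerde. Under that assumption, a String encodes fine and a Map<String, String> fails with a ClassCastException.

The same problem would be expected in the other direction. Because the serde is set for the whole kafka system, the incoming message from http-demo would also be a String. Casting it to Map, as the task does, should therefore throw as well.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SerdeMismatchDemo {
    // Hypothetical stand-in for a string serde's encode step: it assumes the message is a String.
    static byte[] stringToBytes(Object message) {
        return ((String) message).getBytes(StandardCharsets.UTF_8);
    }

    // True if the sketch serde above can encode the message, false on ClassCastException.
    static boolean encodes(Object message) {
        try {
            stringToBytes(message);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    // Mirrors the cast in process(): succeeds only if the incoming message really is a Map.
    static boolean castsToMap(Object incomingMessage) {
        try {
            Map<?, ?> asMap = (Map<?, ?>) incomingMessage;
            return asMap != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> outgoingMap = new HashMap<>();
        outgoingMap.put("key", "value");
        System.out.println(encodes("plain text"));     // true
        System.out.println(encodes(outgoingMap));      // false
        System.out.println(castsToMap("plain text"));  // false
    }
}
```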
Samza: can not produce new data to kafka
Hi, All

I am trying to write my first StreamTask class. I have a topic in Kafka called "http-demo". I would like to read the topic and write it to another topic called "demo-duplicate".

However, no "demo-duplicate" topic is written to Kafka.

My properties file and StreamTask are below. Can anyone tell me what the bug is?

BTW, if I set checkpoint or metrics in the properties file, the checkpoint and metrics topics are written to Kafka. And the content of the input topic, http-demo, is shown correctly.

Your help is highly appreciated.

Sincerely,
Selina

- - -- - - - - -
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-parser

# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz

# Task
task.class=samza.http.demo.task.HttpDemoParserStreamTask
task.inputs=kafka.http-demo

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092
- - -- - - - - -

My StreamTask class is also simple:

-

package samza.http.demo.task;

import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * Read data from http-demo topic and write it back to "demo-duplicate"
 */
public class HttpDemoParserStreamTask implements StreamTask {

    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-duplicate");
    Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {

        String key = (String) envelope.getKey();
        String message = envelope.getMessage().toString();
        logger.info("key="+key+": message="+message);

        Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage());
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
        //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
    }

}

---