The code and the property seem good to me. collector.send(new
OutgoingMessageEnvelope(OUTPUT_STREAM, message));should work. So I am
curious if you accidentally disabled auto.create.topics.enable  ...Can you
also try to send msgs from cmd line to "demo-duplicate" to see if it gets
anything.

Let me know if it works.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com>
wrote:

> Hi, Shadi:
>
>       Thans a lot for your reply.
> 1. There is no error log at Kafka and Samza
>
> 2.  this line "  logger.info("key="+key+": message="+message); " write
> log correctly as below:
>
> [image: Inline image 1]
>
> This are my last two message with right count
>
> 3. I tried both way below, none of them create topic, but I will try it
> again.
>
> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>
> //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>
> 4. I wrote a topic call "http-demo" to Kafka as my input, and the content
> can be show with command line below, so the Kafka should be OK.
> deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
> --from-beginning --topic http-demo
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina
>
>
>
>
> On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
> snogh...@linkedin.com.invalid> wrote:
>
>> Selina,
>>
>> You should probably check a few things
>> 1. Your log files to see if you have any errors. Also, does you job fail
>> or
>> continues running?
>> 2. Does this line "  logger.info("key="+key+": message="+message); "
>> write
>> any logs?
>> 3. This might not be the only reason, but you are sending messages of
>> type Map<String,
>> String>. However, in your config file, you defined "
>> systems.kafka.samza.msg.serde=string" which expects the message to be a
>> String.
>>
>>
>> Shadi
>>
>>
>> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com>
>> wrote:
>>
>> > Hi,  All
>> >
>> >      I am trying to write my first StreamTask class. I have a topic at
>> > Kafka called "http-demo". I like to read the topic and write it to
>> another
>> > topic called "demo-duplicate"
>> >
>> >     Howeven there is not topic written to Kafka.
>> >
>> >     My properties file and StreamTask are below.  Can anyone told me
>> what
>> > is the bug?
>> >     BTW, if I set checkpoint or Metrics at properties file. the topic of
>> > checkpoint and metrics could be written to Kafka.  And the content of
>> >  input topic -- http-demo could be show correctly.
>> >
>> > Your help is highly appreciated.
>> >
>> > Sincerely,
>> > Selina
>> >
>> >
>> > - - -- - - - - -
>> > # Job
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=demo-parser
>>
>> >
>> > # YARN
>> >
>> >
>> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>> >
>> > # Task
>> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>> > task.inputs=kafka.http-demo
>> >
>> > # Serializers
>> >
>> >
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >
>> > # Kafka System
>> >
>> >
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.msg.serde=string
>> > systems.kafka.samza.key.serde=string
>> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> > systems.kafka.consumer.auto.offset.reset=largest
>> > systems.kafka.producer.bootstrap.servers=localhost:9092
>> > - - -- - - - - -
>> >
>> > My StreamTask class is simple also
>> >
>> > ---------
>> >
>> > /**
>> >  *
>> >  * Read data from http-demo topic and write it back to "demo-duplicate"
>> >  */
>> > public class HttpDemoParserStreamTask implements StreamTask {
>> >
>> >     private static final SystemStream OUTPUT_STREAM = new
>> > SystemStream("kafka", "demo-duplicate");
>> >     Logger logger =
>> > LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>> >
>> >     @SuppressWarnings("unchecked")
>> >     @Override
>> >     public void process(IncomingMessageEnvelope envelope,
>> MessageCollector
>> > collector, TaskCoordinator coordinator) throws Exception {
>> >
>> >         String key = (String) envelope.getKey();
>> >         String message = envelope.getMessage().toString();
>> >         logger.info("key="+key+": message="+message);
>> >
>> >         Map<String, String> outgoingMap = (Map<String, String>)
>> > (envelope.getMessage());
>> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>> > outgoingMap));
>> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>> > message));
>> >     }
>> >
>> > }
>> >
>> > -------
>> >
>>
>
>

Reply via email to