Hi Chris,

Thanks so much for your help. The application worked after I made the changes 
you suggested to the properties file. For the reference of anyone else running 
into the same issue, here is what my properties file looks like now:

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=kafka-test

# YARN
yarn.package.path=file:///home/test/SAMZA/backup_hello_samza/hello-samza/samza-job-package/target/samza-job-package-0.7.0-dist.tar.gz

# Task
task.class=samza.examples.wikipedia.task.MyTask
task.inputs=kafka.testConsumer
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1

# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.metadata.broker.list=localhost:9092
systems.kafka.producer.producer.type=sync
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
systems.kafka.producer.batch.num.messages=1
systems.kafka.streams.metrics.samza.msg.serde=metrics
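
One of the problems in my earlier file was a serde name that was referenced but never registered. The following is only a rough sketch of how that could be checked before submitting the job. It is not part of Samza: the class name SerdeReferenceCheck and the method undefinedSerdes are made up for illustration, and it only looks at keys ending in ".samza.msg.serde".

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not a Samza API.
public class SerdeReferenceCheck {

    // Returns serde names used by "*.samza.msg.serde" keys that have no
    // matching "serializers.registry.<name>.class" entry.
    static List<String> undefinedSerdes(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            if (key.endsWith(".samza.msg.serde")) {
                String name = props.getProperty(key).trim();
                String registryKey = "serializers.registry." + name + ".class";
                if (props.getProperty(registryKey) == null && !missing.contains(name)) {
                    missing.add(name);
                }
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // A cut-down config in which the "metrics" serde is used but not registered.
        String config = String.join("\n",
            "serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory",
            "systems.kafka.samza.msg.serde=string",
            "systems.kafka.streams.metrics.samza.msg.serde=metrics");
        Properties props = new Properties();
        props.load(new StringReader(config));
        // prints: Undefined serdes: [metrics]
        System.out.println("Undefined serdes: " + undefinedSerdes(props));
    }
}
```

Run against the file above, this check should report nothing, since both "string" and "metrics" are registered.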

Regards,
Jayati
________________________________________
From: Chris Riccomini [[email protected]]
Sent: Sunday, November 10, 2013 10:47 AM
To: [email protected]
Subject: RE: Writing a Samza application to work as both Kafka Producer and 
Consumer

Hey Jayati,

Thanks for sending these logs.

Some observations:

1. The metrics snapshot serde is not defined in your job properties file. It 
should be set like this:

serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

You can see an example here:

https://github.com/linkedin/hello-samza/blob/master/samza-job-package/src/main/config/wikipedia-parser.properties

Without setting this, you'll see errors like this:

2013-11-08 13:37:24 Producer [INFO] Shutting down producer
2013-11-08 13:37:24 ProducerPool [INFO] Closing all sync producers
2013-11-08 13:37:24 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
2013-11-08 13:37:24 KafkaSystemProducer [WARN] Triggering a reconnect for kafka because connection failed: org.apache.samza.metrics.reporter.MetricsSnapshot cannot be cast to [B

2. Your String serde is improperly defined. You have it set like this:

serializers.registry.string.class=samza.serializers.StringSerdeFactory

This is an incorrect package space. It should be set like this:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

With this improper setting, your containers are failing. In stderr, you can see:

Exception in thread "main" java.lang.ClassNotFoundException: samza.serializers.StringSerdeFactory
...
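
A mistyped package like this can be caught before the containers start by trying to load every class named in the properties file. Here is a minimal sketch, assuming it is run with the job's jars on the classpath. FactoryClassCheck and unloadableClasses are hypothetical names, not Samza APIs, and the rule that class-valued keys end in ".class" or ".factory" is an assumption based on the keys that appear in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper, not a Samza API.
public class FactoryClassCheck {

    // Returns "key=className" for every class-valued property whose class
    // cannot be found on the current classpath.
    static List<String> unloadableClasses(Properties props) {
        List<String> bad = new ArrayList<>();
        // TreeSet gives a stable, sorted iteration order over the keys.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (key.endsWith(".class") || key.endsWith(".factory")) {
                String className = props.getProperty(key).trim();
                try {
                    // Look the class up without running its static initializers.
                    Class.forName(className, false, FactoryClassCheck.class.getClassLoader());
                } catch (ClassNotFoundException | LinkageError e) {
                    bad.add(key + "=" + className);
                }
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // The mistyped package from this thread.
        props.setProperty("serializers.registry.string.class",
                          "samza.serializers.StringSerdeFactory");
        // A JDK class stands in for a correctly named factory in this demo.
        props.setProperty("demo.present.class", "java.lang.String");
        System.out.println("Not loadable: " + unloadableClasses(props));
    }
}
```

With the Samza jars on the classpath, the corrected org.apache.samza.serializers.StringSerdeFactory entry should no longer be reported.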

Cheers,
Chris
________________________________________
From: Jayati Tiwari [[email protected]]
Sent: Friday, November 08, 2013 12:22 AM
To: [email protected]
Subject: RE: Writing a Samza application to work as both Kafka Producer and 
Consumer

Hello Chris,

Thanks so much for your prompt response.

Please find the properties file at the link below:

http://pastebin.com/SQjadXn2

Yes, I have verified that messages are reaching Kafka by running a console 
consumer with the following command:

deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testConsumer

My console producer is running with this command:

deploy/kafka/bin/kafka-console-producer.sh --topic testConsumer --broker-list localhost:9092

I am attaching the AM and container logs herewith.

Please help.

Regards,
Jayati

________________________________________
From: Chris Riccomini [[email protected]]
Sent: Thursday, November 07, 2013 9:32 PM
To: [email protected]
Subject: Re: Writing a Samza application to work as both Kafka Producer and 
Consumer

Hey Jayati,

It appears that Apache is stripping your properties file attachment. Try 
sending a link with:

  http://pastebin.com/

Also, could you please attach logs from the AM and container? I want to verify 
that there are no exceptions that are blocking the processing of messages.

Lastly, have you verified that messages are actually being produced to Kafka? 
This can be done using the kafka-console-consumer.sh script that comes with 
Kafka.

  deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic YOUR_TOPIC_HERE

If you get me the logs and properties files, I can probably help more.

Cheers,
Chris

From: Jayati Tiwari <[email protected]>
Reply-To: [email protected]
Date: Thursday, November 7, 2013 4:27 AM
To: [email protected]
Subject: RE: Writing a Samza application to work as both Kafka Producer and 
Consumer

Please find the attached properties file.

Regards,
Jayati
________________________________
From: Jayati Tiwari
Sent: Thursday, November 07, 2013 5:55 PM
To: [email protected]
Subject: Writing a Samza application to work as both Kafka Producer and Consumer

Hello,

I ran the Hello-Samza project, and it worked for me. Now I am trying to write 
a Samza application that consumes messages from a Kafka console producer, 
processes them, and emits the results so that a Kafka console consumer can 
read them.

I followed the thread "Writing a simple KafkaProducer in Samza", which pointed 
me to "WikipediaParserStreamTask.java" as an example of a Kafka consumer.

Along similar lines, I have written a Samza task in Java called MyTask.java, 
which is attached. I created the class in the package 
"samza.examples.wikipedia.task" of the samza-wikipedia project, and I build 
the project with the same instructions as for Hello-Samza.

The config/properties file I use is also attached. I set 
"systems.kafka.samza.msg.serde=string" because the messages I send from the 
Kafka console producer are strings, whereas the "WikipediaParserStreamTask" 
example uses "systems.kafka.samza.msg.serde=json".

Even after many changes to the properties file, the Kafka producer and the 
Samza application don't seem to connect to each other. However, I can see the 
SamzaAppMaster and SamzaContainer processes running on the machine.

My understanding is that the Samza application is not receiving messages from 
the Kafka producer running locally, so its process() method is never invoked.

Could you please help me get this working?

Regards,
Jayati

________________________________

NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.
