Hi, Selina,

Your stack trace shows that the exception was thrown at line 50 of your
task code. Could you point out which line that is?

It would also help if you could add some logging that compares the message
you receive in process() with the message you read from the Kafka console
consumer.
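
As a rough sketch of what that logging could look like: the helper below
formats a key or message without dereferencing it when it is null (Kafka
allows messages without a key) and prints the byte contents, since calling
toString() on a byte[] only gives the type and identity hash. The names
EnvelopeLogging and describe are hypothetical, not part of Samza, and the
values in main() are stand-ins for envelope.getKey() and
envelope.getMessage().

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class EnvelopeLogging {
    /** Null-safe description of a key or message that may be a byte[]. */
    static String describe(Object payload) {
        if (payload == null) {
            return "null";
        }
        if (payload instanceof byte[]) {
            byte[] bytes = (byte[]) payload;
            // Arrays.toString prints the contents; byte[].toString() does not.
            return "byte[" + bytes.length + "] " + Arrays.toString(bytes);
        }
        // Anything else: show the runtime type so a serde mismatch is visible.
        return payload.getClass().getName() + ": " + payload;
    }

    public static void main(String[] args) {
        // Stand-ins for envelope.getKey() and envelope.getMessage() (assumed values).
        Object key = null;
        Object message = "hi".getBytes(StandardCharsets.UTF_8);
        System.out.println("key=" + describe(key) + " message=" + describe(message));
    }
}
```

Inside process(), the same call could be written as
describe(envelope.getKey()) and describe(envelope.getMessage()), logged
before anything else touches the key or the message.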

Thanks!

-Yi

On Wed, Oct 14, 2015 at 10:07 PM, Selina Tech <swucaree...@gmail.com> wrote:

> Dear All:
>     I tried to consume the Kafka topic "cnr-proto" in Java. It threw the
>  SamzaContainer NullPointerException shown below.
>     The messages are shown correctly by the command line
> "deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
> --from-beginning --topic cnr-proto"
>
>     The key and message of topic "cnr-proto" in Kafka are both byte[].
>
>     I run the job with:
>    deploy/samza/bin/run-job.sh
> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> --config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties
>
>     The properties file and Java code are also listed below.
>
>    Your help is highly appreciated.
>
> Sincerely,
> Selina
>
> -------------error in samza-container-0.log-------------
>
> 2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process loop.
> java.lang.NullPointerException
> at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
> at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
> at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
> at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
> at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
> at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
> 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer multiplexer.
> 2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
> 2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for 10.1.10.141:9092
> 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
> 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.1.10.141:9092
>
>
> ---------mct-aggregation.properties----------
>  # Job
>  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>  job.name=mct-aggregation
>
>  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>  task.checkpoint.system=kafka
>  # Normally, this would be 3, but we have only one broker.
>  task.checkpoint.replication.factor=1
>
>  # YARN
>  yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz
>
>  # Task
> # path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
>  task.class=samza.http.demo.task.MctAggregateTask
>  task.inputs=kafka.cnr-proto
>
>  # Serializers
>  serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
>  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>  serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>
>  # Kafka System
>
>  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>  systems.kafka.samza.key.serde=byte
>  systems.kafka.samza.msg.serde=byte
>
> # Use the "byte" serializer for messages in the "cnr-proto" topic
> systems.kafka.streams.cnr-proto.samza.key.serde=byte
> systems.kafka.streams.cnr-proto.samza.msg.serde=byte
>
>  systems.kafka.consumer.zookeeper.connect=localhost:2181/
>  systems.kafka.producer.bootstrap.servers=localhost:9092
>
>  # stream from beginning
>  #systems.kafka.consumer.auto.offset.reset=smallest
> #http-demo from the oldest
>  systems.kafka.cnr-proto.samza.offset.default=oldest
> # all stream from the oldest
>  systems.kafka.streams.cnr-proto.samza.offset.default=oldest
>  systems.kafka.streams.cnr-proto.samza.reset.offset=true
>
> -------------------MctAggregateTask.java----------
>
> public class MctAggregateTask implements StreamTask {
>
>   private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "cnr-proto-tmp");
>
>   @SuppressWarnings("unchecked")
>   @Override
>   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>
>     byte[] key = (byte[])envelope.getKey();
>     byte[] message = (byte[]) envelope.getMessage();
>
>     logger.info("key="+key.toString()+": message="+message.toString());
>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>   }
>
