Dear All:
I tried to consumer kafka topic "cnr-proto" in Java. It got the
SamzaContainer NullPointerException as below.
The messages can be shown by command line correctly
"deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
--from-beginning --topic cnr-proto"
My Key and message of topic "cir-proto" at Kafka are both in byte[]
run
deploy/samza/bin/run-job.sh
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
--config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties
the properties file and java code are list below also.
Your help is highly appreciated.
Sincerely,
Selina
-------------error in samza-container-0.log-------------
2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process loop.
java.lang.NullPointerException
at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
at
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
at
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
at
org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
at
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer
multiplexer.
2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for
10.1.10.141:9092
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to
socket error: java.nio.channels.ClosedByInterruptException
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from
10.1.10.141:9092
---------mct-aggregation.properties----------
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=mct-aggregation
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz
# Task
# path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
task.class=samza.http.demo.task.MctAggregateTask
task.inputs=kafka.cnr-proto
# Serializers
serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=byte
systems.kafka.samza.msg.serde=byte
# Use the "byte" serializer for messages in the "cnr-proto" topic
systems.kafka.streams.cnr-proto.samza.key.serde=byte
systems.kafka.streams.cnr-proto.samza.msg.serde=byte
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
#stream from begining
#systems.kafka.consumer.auto.offset.reset=smallest
#http-demo from the oldest
systems.kafka.cnr-proto.samza.offset.default=oldest
# all stream from the oldest
systems.kafka.streams.cnr-proto.samza.offset.default=oldest
systems.kafka.streams.cnr-proto.samza.reset.offset=true
-------------------MctAggregateTask.java----------
public class MctAggregateTask implements StreamTask {
private static final SystemStream OUTPUT_STREAM = new
SystemStream("kafka", "cnr-proto-tmp");
@SuppressWarnings("unchecked")
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector
collector, TaskCoordinator coordinator) throws Exception {
byte[] key = (byte[])envelope.getKey();
byte[] message = (byte[]) envelope.getMessage();
logger.info("key="+key.toString()+": message="+message.toString());
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
}