Hi,
Running environment:
Samza version: 2.12-1.1.0
Kafka cluster version: 2.12-1.1.0
Hadoop version: 3.1.0
With Kafka_2.12-1.1.0.jar, the Samza job fails to run while retrieving the
coordinator stream metadata from Kafka:
2019-06-03 17:26:11.176 [main] KafkaSystemAdmin [INFO] Fetching SystemStreamMetadata for topics [__samza_coordinator_canal-metrics-test_1] on system kafka
2019-06-03 17:26:11.179 [main] KafkaSystemAdmin [ERROR] Fetching system stream metadata for: [__samza_coordinator_canal-metrics-test_1] threw an exception.
java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:75)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
at kafka.utils.Logging.logger(Logging.scala:43)
at kafka.utils.Logging.debug(Logging.scala:62)
at kafka.utils.Logging.debug$(Logging.scala:62)
at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.debug(KafkaSystemAdminUtilsScala.scala:45)
at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.assembleMetadata(KafkaSystemAdminUtilsScala.scala:74)
at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala.assembleMetadata(KafkaSystemAdminUtilsScala.scala)
at org.apache.samza.system.kafka.KafkaSystemAdmin.fetchSystemStreamMetadata(KafkaSystemAdmin.java:461)
at org.apache.samza.system.kafka.KafkaSystemAdmin.access$400(KafkaSystemAdmin.java:76)
at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:340)
at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:337)
at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:374)
at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:292)
at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.register(CoordinatorStreamSystemConsumer.java:109)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:105)
at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Caused by: org.apache.samza.SamzaException: java.lang.NullPointerException
at org.apache.samza.system.kafka.KafkaSystemAdmin$5.apply(KafkaSystemAdmin.java:358)
at org.apache.samza.system.kafka.KafkaSystemAdmin$5.apply(KafkaSystemAdmin.java:347)
at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:97)
at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:374)
at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:292)
at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.register(CoordinatorStreamSystemConsumer.java:109)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:105)
at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
... 2 more
Caused by: java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:75)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
at kafka.utils.Logging.logger(Logging.scala:43)
at kafka.utils.Logging.debug(Logging.scala:62)
at kafka.utils.Logging.debug$(Logging.scala:62)
at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.debug(KafkaSystemAdminUtilsScala.scala:45)
at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.assembleMetadata(KafkaSystemAdminUtilsScala.scala:74)
at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala.assembleMetadata(KafkaSystemAdminUtilsScala.scala)
at org.apache.samza.system.kafka.KafkaSystemAdmin.fetchSystemStreamMetadata(KafkaSystemAdmin.java:461)
at org.apache.samza.system.kafka.KafkaSystemAdmin.access$400(KafkaSystemAdmin.java:76)
at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:340)
at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:337)
at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
... 9 more
With Kafka_2.12-0.11.0.2.jar, the coordinator stream metadata is retrieved
successfully.
So I would like to know which Kafka versions Samza supports. Thanks!
Qi Shu