[
https://issues.apache.org/jira/browse/KAFKA-7874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Christopher S Lester updated KAFKA-7874:
----------------------------------------
Description:
I'm seeing an exception thrown when instantiating a KafkaStreams object in a
Clojure stream processor and am not sure that this is a config problem (would
be great if it is and someone can point out what config is missing).
Kafka deps:
{code:java}
[org.apache.kafka/kafka-streams "2.1.0"] ;;
https://search.maven.org/artifact/org.apache.kafka/kafka-streams/2.1.0/jar
[org.apache.kafka/kafka-streams-test-utils "2.1.0"] ;;
https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
[org.apache.kafka/kafka-clients "2.1.0"] ;;
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
[org.apache.kafka/kafka-clients "1.1.0" :classifier "test"]{code}
Configuration:
{code:java}
;; kafka configuration
:calais-response-processor.config/kafka-configuration {
:applicationid "open-calais-tagging-microservice"
:bootstrap-servers "localhost:9092"
:commit-interval 10000 ;; milliseconds
:input-topic "db.draft.created"
:output-topic "streaming.calais.response"
}
{code}
Fn:
{code:java}
(defn start->streams
[]
(log/info "[start->streams] enter")
(when (not (instance? KafkaStreams @streams))
(let [kafka-config (get-in (config CONFIG_PATH)
[:calais-response-processor.config/kafka-configuration])
stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG (get-in
kafka-config [:applicationid])
StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (get-in kafka-config
[:auto.commit.interval.ms])
StreamsConfig/BOOTSTRAP_SERVERS_CONFIG (get-in kafka-config
[:bootstrap-servers])
StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass
(Serdes/String)))
StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass
(Serdes/String)))
StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE}]
(try
(let [kafka-streams (KafkaStreams. (calais-processor-topology) (StreamsConfig.
stream-processing-props))]
(log/infof "[start->streams] created kafka stream with config: %s"
stream-processing-props)
(swap! streams kafka-streams))
(catch Exception e (log/error e)))))
(.start @streams))
{code}
Things seem to be "ok" if I don't catch the exception (i.e. it attempts to
connect and process) but since it's throwing an exception I'm unable to control
it's lifecycle right now unless I move this to a (def ..) vs a (defn..) ..
however the exception still happens so I need that understood before putting it
into production.
Result:
{code:java}
[{:type java.lang.NullPointerException
:message nil
:at [org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]}]
:trace
[[org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]
[org.apache.kafka.streams.processor.internals.StreamThread create
StreamThread.java 671]{code}
(full log of startup)
{code:java}
=> (start-stream-processing)
"[start-stream-processing] START: streams processing"
INFO calais-response-processor.core: [start->streams] enter
INFO calais-response-processor.config: loading config from resources/config.edn
INFO calais-response-processor.core: [calais-processor-topology] Open Calais
API Streaming Topology
INFO org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
application.id = open-calais-tagging-microservice
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = null
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
INFO org.apache.kafka.clients.admin.AdminClientConfig: AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread
[open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
Creating restore consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_committed
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread
[open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
Creating consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = open-calais-tagging-microservice
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_committed
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy =
[org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
WARN org.apache.kafka.clients.consumer.ConsumerConfig: The configuration
'admin.retries' was supplied but isn't a known config.
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
eec43959745f444f
ERROR calais-response-processor.core: #error {
:cause nil
:via
[{:type java.lang.NullPointerException
:message nil
:at [org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]}]
:trace
[[org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]
[org.apache.kafka.streams.processor.internals.StreamThread create
StreamThread.java 671]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 706]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 624]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 608]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 598]
[sun.reflect.NativeConstructorAccessorImpl newInstance0 nil -2]
[sun.reflect.NativeConstructorAccessorImpl newInstance nil -1]
[sun.reflect.DelegatingConstructorAccessorImpl newInstance nil -1]
[java.lang.reflect.Constructor newInstance nil -1]
[clojure.lang.Reflector invokeConstructor Reflector.java 180]
[calais_response_processor.core$start__GT_streams$fn__258 invoke core.clj 148]
[calais_response_processor.core$start__GT_streams invokeStatic core.clj 147]
[calais_response_processor.core$start__GT_streams invoke core.clj 136]
[calais_response_processor.core$start_stream_processing invokeStatic core.clj
165]
[calais_response_processor.core$start_stream_processing invoke core.clj 162]
[calais_response_processor.core$eval1539 invokeStatic
form-init8971608817606635487.clj 1]
[calais_response_processor.core$eval1539 invoke
form-init8971608817606635487.clj 1]
[clojure.lang.Compiler eval Compiler.java 7062]
[clojure.lang.Compiler eval Compiler.java 7025]
[clojure.core$eval invokeStatic core.clj 3206]
[clojure.core$eval invoke core.clj 3202]
[clojure.main$repl$read_eval_print__8572$fn__8575 invoke main.clj 243]
[clojure.main$repl$read_eval_print__8572 invoke main.clj 243]
[clojure.main$repl$fn__8581 invoke main.clj 261]
[clojure.main$repl invokeStatic main.clj 261]
[clojure.main$repl doInvoke main.clj 177]
[clojure.lang.RestFn invoke RestFn.java 1523]
[nrepl.middleware.interruptible_eval$evaluate$fn__912 invoke
interruptible_eval.clj 83]
[clojure.lang.AFn applyToHelper AFn.java 152]
[clojure.lang.AFn applyTo AFn.java 144]
[clojure.core$apply invokeStatic core.clj 657]
[clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
[clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
[clojure.lang.RestFn invoke RestFn.java 425]
[nrepl.middleware.interruptible_eval$evaluate invokeStatic
interruptible_eval.clj 81]
[nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 50]
[nrepl.middleware.interruptible_eval$interruptible_eval$fn__955$fn__958 invoke
interruptible_eval.clj 221]
[nrepl.middleware.interruptible_eval$run_next$fn__950 invoke
interruptible_eval.clj 189]
[clojure.lang.AFn run AFn.java 22]
[java.util.concurrent.ThreadPoolExecutor runWorker nil -1]
[java.util.concurrent.ThreadPoolExecutor$Worker run nil -1]
[java.lang.Thread run nil -1]]}
NullPointerException clojure.lang.Reflector.invokeNoArgInstanceMember
(Reflector.java:301)
{code}
was:
I'm seeing an exception thrown when instantiating a KafkaStreams object in a
Clojure stream processor and am not sure that this is a config problem (would
be great if it is and someone can point out what config is missing).
Kafka deps:
{code:java}
[org.apache.kafka/kafka-streams "2.1.0"] ;;
https://search.maven.org/artifact/org.apache.kafka/kafka-streams/2.1.0/jar
[org.apache.kafka/kafka-streams-test-utils "2.1.0"] ;;
https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
[org.apache.kafka/kafka-clients "2.1.0"] ;;
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
[org.apache.kafka/kafka-clients "1.1.0" :classifier "test"]{code}
Configuration:
{code:java}
;; kafka configuration
:calais-response-processor.config/kafka-configuration {
:applicationid "open-calais-tagging-microservice"
:bootstrap-servers "localhost:9092"
:commit-interval 10000 ;; milliseconds
:input-topic "db.draft.created"
:output-topic "streaming.calais.response"
}
{code}
Fn:
{code:java}
(defn start->streams
[]
(log/info "[start->streams] enter")
(when (not (instance? KafkaStreams @streams))
(let [kafka-config (get-in (config CONFIG_PATH)
[:calais-response-processor.config/kafka-configuration])
stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG (get-in
kafka-config [:applicationid])
StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (get-in kafka-config
[:auto.commit.interval.ms])
StreamsConfig/BOOTSTRAP_SERVERS_CONFIG (get-in kafka-config
[:bootstrap-servers])
StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass
(Serdes/String)))
StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass
(Serdes/String)))
StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE}]
(try
(let [kafka-streams (KafkaStreams. (calais-processor-topology) (StreamsConfig.
stream-processing-props))]
(log/infof "[start->streams] created kafka stream with config: %s"
stream-processing-props)
(swap! streams kafka-streams))
(catch Exception e (log/error e)))))
(.start @streams))
{code}
Things seem to be "ok" if I don't catch the exception (i.e. it attempts to
connect and process) but since it's throwing an exception I'm unable to control
it's lifecycle right now unless I move this to a (def ..) vs a (defn..) ..
however the exception still happens so I need that understood before putting it
into production.
Result:
[\{:type java.lang.NullPointerException
:message nil
:at [org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]}]
:trace
[[org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]
[org.apache.kafka.streams.processor.internals.StreamThread create
StreamThread.java 671]
(full log of startup)
{code:java}
=> (start-stream-processing)
"[start-stream-processing] START: streams processing"
INFO calais-response-processor.core: [start->streams] enter
INFO calais-response-processor.config: loading config from resources/config.edn
INFO calais-response-processor.core: [calais-processor-topology] Open Calais
API Streaming Topology
INFO org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
application.id = open-calais-tagging-microservice
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = null
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
INFO org.apache.kafka.clients.admin.AdminClientConfig: AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread
[open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
Creating restore consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_committed
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread
[open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
Creating consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = open-calais-tagging-microservice
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_committed
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy =
[org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
WARN org.apache.kafka.clients.consumer.ConsumerConfig: The configuration
'admin.retries' was supplied but isn't a known config.
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
eec43959745f444f
ERROR calais-response-processor.core: #error {
:cause nil
:via
[{:type java.lang.NullPointerException
:message nil
:at [org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]}]
:trace
[[org.apache.kafka.streams.processor.internals.StreamThread <init>
StreamThread.java 719]
[org.apache.kafka.streams.processor.internals.StreamThread create
StreamThread.java 671]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 706]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 624]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 608]
[org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 598]
[sun.reflect.NativeConstructorAccessorImpl newInstance0 nil -2]
[sun.reflect.NativeConstructorAccessorImpl newInstance nil -1]
[sun.reflect.DelegatingConstructorAccessorImpl newInstance nil -1]
[java.lang.reflect.Constructor newInstance nil -1]
[clojure.lang.Reflector invokeConstructor Reflector.java 180]
[calais_response_processor.core$start__GT_streams$fn__258 invoke core.clj 148]
[calais_response_processor.core$start__GT_streams invokeStatic core.clj 147]
[calais_response_processor.core$start__GT_streams invoke core.clj 136]
[calais_response_processor.core$start_stream_processing invokeStatic core.clj
165]
[calais_response_processor.core$start_stream_processing invoke core.clj 162]
[calais_response_processor.core$eval1539 invokeStatic
form-init8971608817606635487.clj 1]
[calais_response_processor.core$eval1539 invoke
form-init8971608817606635487.clj 1]
[clojure.lang.Compiler eval Compiler.java 7062]
[clojure.lang.Compiler eval Compiler.java 7025]
[clojure.core$eval invokeStatic core.clj 3206]
[clojure.core$eval invoke core.clj 3202]
[clojure.main$repl$read_eval_print__8572$fn__8575 invoke main.clj 243]
[clojure.main$repl$read_eval_print__8572 invoke main.clj 243]
[clojure.main$repl$fn__8581 invoke main.clj 261]
[clojure.main$repl invokeStatic main.clj 261]
[clojure.main$repl doInvoke main.clj 177]
[clojure.lang.RestFn invoke RestFn.java 1523]
[nrepl.middleware.interruptible_eval$evaluate$fn__912 invoke
interruptible_eval.clj 83]
[clojure.lang.AFn applyToHelper AFn.java 152]
[clojure.lang.AFn applyTo AFn.java 144]
[clojure.core$apply invokeStatic core.clj 657]
[clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
[clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
[clojure.lang.RestFn invoke RestFn.java 425]
[nrepl.middleware.interruptible_eval$evaluate invokeStatic
interruptible_eval.clj 81]
[nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 50]
[nrepl.middleware.interruptible_eval$interruptible_eval$fn__955$fn__958 invoke
interruptible_eval.clj 221]
[nrepl.middleware.interruptible_eval$run_next$fn__950 invoke
interruptible_eval.clj 189]
[clojure.lang.AFn run AFn.java 22]
[java.util.concurrent.ThreadPoolExecutor runWorker nil -1]
[java.util.concurrent.ThreadPoolExecutor$Worker run nil -1]
[java.lang.Thread run nil -1]]}
NullPointerException clojure.lang.Reflector.invokeNoArgInstanceMember
(Reflector.java:301)
{code}
> NPE thrown while instantiating a KafkaStreams. object
> -----------------------------------------------------
>
> Key: KAFKA-7874
> URL: https://issues.apache.org/jira/browse/KAFKA-7874
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.1.0
> Environment: Windows 10 Pro bld 1803
> Ryzen 7 2700 32GM ram
> Clojure 1.10
> Reporter: Christopher S Lester
> Priority: Major
>
> I'm seeing an exception thrown when instantiating a KafkaStreams object in a
> Clojure stream processor and am not sure that this is a config problem (would
> be great if it is and someone can point out what config is missing).
> Kafka deps:
> {code:java}
> [org.apache.kafka/kafka-streams "2.1.0"] ;;
> https://search.maven.org/artifact/org.apache.kafka/kafka-streams/2.1.0/jar
> [org.apache.kafka/kafka-streams-test-utils "2.1.0"] ;;
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
> [org.apache.kafka/kafka-clients "2.1.0"] ;;
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> [org.apache.kafka/kafka-clients "1.1.0" :classifier "test"]{code}
>
> Configuration:
> {code:java}
> ;; kafka configuration
> :calais-response-processor.config/kafka-configuration {
> :applicationid "open-calais-tagging-microservice"
> :bootstrap-servers "localhost:9092"
> :commit-interval 10000 ;; milliseconds
> :input-topic "db.draft.created"
> :output-topic "streaming.calais.response"
> }
> {code}
>
> Fn:
> {code:java}
> (defn start->streams
> []
> (log/info "[start->streams] enter")
> (when (not (instance? KafkaStreams @streams))
> (let [kafka-config (get-in (config CONFIG_PATH)
> [:calais-response-processor.config/kafka-configuration])
> stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG (get-in
> kafka-config [:applicationid])
> StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (get-in kafka-config
> [:auto.commit.interval.ms])
> StreamsConfig/BOOTSTRAP_SERVERS_CONFIG (get-in kafka-config
> [:bootstrap-servers])
> StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass
> (Serdes/String)))
> StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass
> (Serdes/String)))
> StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE}]
> (try
> (let [kafka-streams (KafkaStreams. (calais-processor-topology)
> (StreamsConfig. stream-processing-props))]
> (log/infof "[start->streams] created kafka stream with config: %s"
> stream-processing-props)
> (swap! streams kafka-streams))
> (catch Exception e (log/error e)))))
> (.start @streams))
> {code}
> Things seem to be "ok" if I don't catch the exception (i.e. it attempts to
> connect and process) but since it's throwing an exception I'm unable to
> control it's lifecycle right now unless I move this to a (def ..) vs a
> (defn..) .. however the exception still happens so I need that understood
> before putting it into production.
>
> Result:
> {code:java}
> [{:type java.lang.NullPointerException
> :message nil
> :at [org.apache.kafka.streams.processor.internals.StreamThread <init>
> StreamThread.java 719]}]
> :trace
> [[org.apache.kafka.streams.processor.internals.StreamThread <init>
> StreamThread.java 719]
> [org.apache.kafka.streams.processor.internals.StreamThread create
> StreamThread.java 671]{code}
>
> (full log of startup)
> {code:java}
> => (start-stream-processing)
> "[start-stream-processing] START: streams processing"
> INFO calais-response-processor.core: [start->streams] enter
> INFO calais-response-processor.config: loading config from
> resources/config.edn
> INFO calais-response-processor.core: [calais-processor-topology] Open Calais
> API Streaming Topology
> INFO org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
> application.id = open-calais-tagging-microservice
> application.server =
> bootstrap.servers = [localhost:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 10485760
> client.id =
> commit.interval.ms = null
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class
> org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class
> org.apache.kafka.common.serialization.Serdes$StringSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 1
> partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = exactly_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /tmp/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 86400000
> INFO org.apache.kafka.clients.admin.AdminClientConfig: AdminClientConfig
> values:
> bootstrap.servers = [localhost:9092]
> client.dns.lookup = default
> client.id =
> open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-admin
> connections.max.idle.ms = 300000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 120000
> retries = 5
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
> eec43959745f444f
> INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread
> [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
> Creating restore consumer client
> INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [localhost:9092]
> check.crcs = true
> client.dns.lookup = default
> client.id =
> open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-restore-consumer
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id =
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = false
> isolation.level = read_committed
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
> eec43959745f444f
> INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread
> [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
> Creating consumer client
> INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [localhost:9092]
> check.crcs = true
> client.dns.lookup = default
> client.id =
> open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-consumer
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = open-calais-tagging-microservice
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = false
> isolation.level = read_committed
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy =
> [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> WARN org.apache.kafka.clients.consumer.ConsumerConfig: The configuration
> 'admin.retries' was supplied but isn't a known config.
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId :
> eec43959745f444f
> ERROR calais-response-processor.core: #error {
> :cause nil
> :via
> [{:type java.lang.NullPointerException
> :message nil
> :at [org.apache.kafka.streams.processor.internals.StreamThread <init>
> StreamThread.java 719]}]
> :trace
> [[org.apache.kafka.streams.processor.internals.StreamThread <init>
> StreamThread.java 719]
> [org.apache.kafka.streams.processor.internals.StreamThread create
> StreamThread.java 671]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 706]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 624]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 608]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 598]
> [sun.reflect.NativeConstructorAccessorImpl newInstance0 nil -2]
> [sun.reflect.NativeConstructorAccessorImpl newInstance nil -1]
> [sun.reflect.DelegatingConstructorAccessorImpl newInstance nil -1]
> [java.lang.reflect.Constructor newInstance nil -1]
> [clojure.lang.Reflector invokeConstructor Reflector.java 180]
> [calais_response_processor.core$start__GT_streams$fn__258 invoke core.clj
> 148]
> [calais_response_processor.core$start__GT_streams invokeStatic core.clj 147]
> [calais_response_processor.core$start__GT_streams invoke core.clj 136]
> [calais_response_processor.core$start_stream_processing invokeStatic
> core.clj 165]
> [calais_response_processor.core$start_stream_processing invoke core.clj 162]
> [calais_response_processor.core$eval1539 invokeStatic
> form-init8971608817606635487.clj 1]
> [calais_response_processor.core$eval1539 invoke
> form-init8971608817606635487.clj 1]
> [clojure.lang.Compiler eval Compiler.java 7062]
> [clojure.lang.Compiler eval Compiler.java 7025]
> [clojure.core$eval invokeStatic core.clj 3206]
> [clojure.core$eval invoke core.clj 3202]
> [clojure.main$repl$read_eval_print__8572$fn__8575 invoke main.clj 243]
> [clojure.main$repl$read_eval_print__8572 invoke main.clj 243]
> [clojure.main$repl$fn__8581 invoke main.clj 261]
> [clojure.main$repl invokeStatic main.clj 261]
> [clojure.main$repl doInvoke main.clj 177]
> [clojure.lang.RestFn invoke RestFn.java 1523]
> [nrepl.middleware.interruptible_eval$evaluate$fn__912 invoke
> interruptible_eval.clj 83]
> [clojure.lang.AFn applyToHelper AFn.java 152]
> [clojure.lang.AFn applyTo AFn.java 144]
> [clojure.core$apply invokeStatic core.clj 657]
> [clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
> [clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
> [clojure.lang.RestFn invoke RestFn.java 425]
> [nrepl.middleware.interruptible_eval$evaluate invokeStatic
> interruptible_eval.clj 81]
> [nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj
> 50]
> [nrepl.middleware.interruptible_eval$interruptible_eval$fn__955$fn__958
> invoke interruptible_eval.clj 221]
> [nrepl.middleware.interruptible_eval$run_next$fn__950 invoke
> interruptible_eval.clj 189]
> [clojure.lang.AFn run AFn.java 22]
> [java.util.concurrent.ThreadPoolExecutor runWorker nil -1]
> [java.util.concurrent.ThreadPoolExecutor$Worker run nil -1]
> [java.lang.Thread run nil -1]]}
> NullPointerException clojure.lang.Reflector.invokeNoArgInstanceMember
> (Reflector.java:301)
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)