[ 
https://issues.apache.org/jira/browse/KAFKA-7874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher S Lester updated KAFKA-7874:
----------------------------------------
    Description: 
I'm seeing an exception thrown when instantiating a KafkaStreams object in a 
Clojure stream processor, and I'm not sure whether this is a configuration 
problem (it would be great if it is and someone can point out which config is 
missing). 

Kafka deps:
{code:java}
[org.apache.kafka/kafka-streams "2.1.0"]            ;; https://search.maven.org/artifact/org.apache.kafka/kafka-streams/2.1.0/jar
[org.apache.kafka/kafka-streams-test-utils "2.1.0"] ;; https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
[org.apache.kafka/kafka-clients "2.1.0"]            ;; https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
[org.apache.kafka/kafka-clients "1.1.0" :classifier "test"]
{code}
 
 Configuration:
{code:java}
;; kafka configuration
:calais-response-processor.config/kafka-configuration
{:applicationid     "open-calais-tagging-microservice"
 :bootstrap-servers "localhost:9092"
 :commit-interval   10000 ;; milliseconds
 :input-topic       "db.draft.created"
 :output-topic      "streaming.calais.response"}
{code}
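One detail worth flagging: the commit key here is {{:commit-interval}}, while the function below reads {{:auto.commit.interval.ms}}, so that lookup presumably returns nil and an explicit null ends up under {{commit.interval.ms}} in the Streams properties (consistent with {{commit.interval.ms = null}} in the startup log). A minimal Java sketch of that failure mode (the map keys mirror the report; nothing here is Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigKeyMismatch {
    public static void main(String[] args) {
        // Stand-in for the EDN configuration above: the key is "commit-interval".
        Map<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put("commit-interval", 10000);

        // The function below looks up a different key, so the lookup yields null...
        Object value = kafkaConfig.get("auto.commit.interval.ms");
        System.out.println(value); // prints: null

        // ...and that null is then stored under commit.interval.ms in the Streams
        // properties map, matching "commit.interval.ms = null" in the log.
        Map<String, Object> streamsProps = new HashMap<>();
        streamsProps.put("commit.interval.ms", value);
        System.out.println(streamsProps.containsKey("commit.interval.ms")); // prints: true
    }
}
```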
 
 Fn:
{code:java}
(defn start->streams
  []
  (log/info "[start->streams] enter")
  (when (not (instance? KafkaStreams @streams))
    (let [kafka-config (get-in (config CONFIG_PATH)
                               [:calais-response-processor.config/kafka-configuration])
          stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG        (get-in kafka-config [:applicationid])
                                   StreamsConfig/COMMIT_INTERVAL_MS_CONFIG    (get-in kafka-config [:auto.commit.interval.ms])
                                   StreamsConfig/BOOTSTRAP_SERVERS_CONFIG     (get-in kafka-config [:bootstrap-servers])
                                   StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG   (.getName (.getClass (Serdes/String)))
                                   StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
                                   StreamsConfig/PROCESSING_GUARANTEE_CONFIG  StreamsConfig/EXACTLY_ONCE}]
      (try
        (let [kafka-streams (KafkaStreams. (calais-processor-topology)
                                           (StreamsConfig. stream-processing-props))]
          (log/infof "[start->streams] created kafka stream with config: %s" stream-processing-props)
          (swap! streams kafka-streams))
        (catch Exception e (log/error e)))))
  (.start @streams))
{code}
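A side note on the atom update above: in Clojure, {{swap!}} applies its second argument as a function to the atom's current value, so {{(swap! streams kafka-streams)}} would attempt to call the KafkaStreams instance as a function; storing a value directly is {{reset!}}. A rough Java analogue using {{AtomicReference}} (the string values are placeholders, not Kafka objects):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

public class AtomUpdateDemo {
    public static void main(String[] args) {
        AtomicReference<String> streams = new AtomicReference<>(null);

        // reset!-style: store the new instance directly.
        streams.set("kafka-streams-instance");
        System.out.println(streams.get()); // prints: kafka-streams-instance

        // swap!-style: apply a FUNCTION to the current value. Passing a plain
        // instance where a function is expected (as the Fn above does) fails;
        // in Clojure the KafkaStreams object would be invoked as a function.
        UnaryOperator<String> f = old -> old + "-updated";
        streams.updateAndGet(f);
        System.out.println(streams.get()); // prints: kafka-streams-instance-updated
    }
}
```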
Things seem to be "ok" if I don't catch the exception (i.e. it attempts to 
connect and process), but since it's throwing an exception I'm unable to control 
its lifecycle right now unless I move this to a (def ..) instead of a (defn ..). 
However, the exception still happens either way, so I need to understand it 
before putting this into production. 

 

Result:
{code:java}
[{:type java.lang.NullPointerException
  :message nil
  :at [org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]}]
:trace
[[org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]
 [org.apache.kafka.streams.processor.internals.StreamThread create StreamThread.java 671]
{code}
 

Full log of startup:
{code:java}
=> (start-stream-processing)
"[start-stream-processing] START: streams processing"
INFO calais-response-processor.core: [start->streams] enter
INFO calais-response-processor.config: loading config from resources/config.edn
INFO calais-response-processor.core: [calais-processor-topology] Open Calais 
API Streaming Topology
INFO org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
 application.id = open-calais-tagging-microservice
 application.server =
 bootstrap.servers = [localhost:9092]
 buffered.records.per.partition = 1000
 cache.max.bytes.buffering = 10485760
 client.id =
 commit.interval.ms = null
 connections.max.idle.ms = 540000
 default.deserialization.exception.handler = class 
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
 default.key.serde = class 
org.apache.kafka.common.serialization.Serdes$StringSerde
 default.production.exception.handler = class 
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
 default.timestamp.extractor = class 
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
 default.value.serde = class 
org.apache.kafka.common.serialization.Serdes$StringSerde
 max.task.idle.ms = 0
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 num.standby.replicas = 0
 num.stream.threads = 1
 partition.grouper = class 
org.apache.kafka.streams.processor.DefaultPartitionGrouper
 poll.ms = 100
 processing.guarantee = exactly_once
 receive.buffer.bytes = 32768
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 replication.factor = 1
 request.timeout.ms = 40000
 retries = 0
 retry.backoff.ms = 100
 rocksdb.config.setter = null
 security.protocol = PLAINTEXT
 send.buffer.bytes = 131072
 state.cleanup.delay.ms = 600000
 state.dir = /tmp/kafka-streams
 topology.optimization = none
 upgrade.from = null
 windowstore.changelog.additional.retention.ms = 86400000
INFO org.apache.kafka.clients.admin.AdminClientConfig: AdminClientConfig values:
 bootstrap.servers = [localhost:9092]
 client.dns.lookup = default
 client.id = 
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-admin
 connections.max.idle.ms = 300000
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 receive.buffer.bytes = 65536
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 request.timeout.ms = 120000
 retries = 5
 retry.backoff.ms = 100
 sasl.client.callback.handler.class = null
 sasl.jaas.config = null
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 60000
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.login.callback.handler.class = null
 sasl.login.class = null
 sasl.login.refresh.buffer.seconds = 300
 sasl.login.refresh.min.period.seconds = 60
 sasl.login.refresh.window.factor = 0.8
 sasl.login.refresh.window.jitter = 0.05
 sasl.mechanism = GSSAPI
 security.protocol = PLAINTEXT
 send.buffer.bytes = 131072
 ssl.cipher.suites = null
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 ssl.endpoint.identification.algorithm = https
 ssl.key.password = null
 ssl.keymanager.algorithm = SunX509
 ssl.keystore.location = null
 ssl.keystore.password = null
 ssl.keystore.type = JKS
 ssl.protocol = TLS
 ssl.provider = null
 ssl.secure.random.implementation = null
 ssl.trustmanager.algorithm = PKIX
 ssl.truststore.location = null
 ssl.truststore.password = null
 ssl.truststore.type = JKS
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 
eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
 Creating restore consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
 auto.commit.interval.ms = 5000
 auto.offset.reset = none
 bootstrap.servers = [localhost:9092]
 check.crcs = true
 client.dns.lookup = default
 client.id = 
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-restore-consumer
 connections.max.idle.ms = 540000
 default.api.timeout.ms = 60000
 enable.auto.commit = false
 exclude.internal.topics = true
 fetch.max.bytes = 52428800
 fetch.max.wait.ms = 500
 fetch.min.bytes = 1
 group.id =
 heartbeat.interval.ms = 3000
 interceptor.classes = []
 internal.leave.group.on.close = false
 isolation.level = read_committed
 key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
 max.partition.fetch.bytes = 1048576
 max.poll.interval.ms = 2147483647
 max.poll.records = 1000
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
 receive.buffer.bytes = 65536
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 request.timeout.ms = 30000
 retry.backoff.ms = 100
 sasl.client.callback.handler.class = null
 sasl.jaas.config = null
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 60000
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.login.callback.handler.class = null
 sasl.login.class = null
 sasl.login.refresh.buffer.seconds = 300
 sasl.login.refresh.min.period.seconds = 60
 sasl.login.refresh.window.factor = 0.8
 sasl.login.refresh.window.jitter = 0.05
 sasl.mechanism = GSSAPI
 security.protocol = PLAINTEXT
 send.buffer.bytes = 131072
 session.timeout.ms = 10000
 ssl.cipher.suites = null
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 ssl.endpoint.identification.algorithm = https
 ssl.key.password = null
 ssl.keymanager.algorithm = SunX509
 ssl.keystore.location = null
 ssl.keystore.password = null
 ssl.keystore.type = JKS
 ssl.protocol = TLS
 ssl.provider = null
 ssl.secure.random.implementation = null
 ssl.trustmanager.algorithm = PKIX
 ssl.truststore.location = null
 ssl.truststore.password = null
 ssl.truststore.type = JKS
 value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 
eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
 Creating consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
 auto.commit.interval.ms = 5000
 auto.offset.reset = earliest
 bootstrap.servers = [localhost:9092]
 check.crcs = true
 client.dns.lookup = default
 client.id = 
open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-consumer
 connections.max.idle.ms = 540000
 default.api.timeout.ms = 60000
 enable.auto.commit = false
 exclude.internal.topics = true
 fetch.max.bytes = 52428800
 fetch.max.wait.ms = 500
 fetch.min.bytes = 1
 group.id = open-calais-tagging-microservice
 heartbeat.interval.ms = 3000
 interceptor.classes = []
 internal.leave.group.on.close = false
 isolation.level = read_committed
 key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
 max.partition.fetch.bytes = 1048576
 max.poll.interval.ms = 2147483647
 max.poll.records = 1000
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 partition.assignment.strategy = 
[org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
 receive.buffer.bytes = 65536
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 request.timeout.ms = 30000
 retry.backoff.ms = 100
 sasl.client.callback.handler.class = null
 sasl.jaas.config = null
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 60000
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.login.callback.handler.class = null
 sasl.login.class = null
 sasl.login.refresh.buffer.seconds = 300
 sasl.login.refresh.min.period.seconds = 60
 sasl.login.refresh.window.factor = 0.8
 sasl.login.refresh.window.jitter = 0.05
 sasl.mechanism = GSSAPI
 security.protocol = PLAINTEXT
 send.buffer.bytes = 131072
 session.timeout.ms = 10000
 ssl.cipher.suites = null
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 ssl.endpoint.identification.algorithm = https
 ssl.key.password = null
 ssl.keymanager.algorithm = SunX509
 ssl.keystore.location = null
 ssl.keystore.password = null
 ssl.keystore.type = JKS
 ssl.protocol = TLS
 ssl.provider = null
 ssl.secure.random.implementation = null
 ssl.trustmanager.algorithm = PKIX
 ssl.truststore.location = null
 ssl.truststore.password = null
 ssl.truststore.type = JKS
 value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
WARN org.apache.kafka.clients.consumer.ConsumerConfig: The configuration 
'admin.retries' was supplied but isn't a known config.
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 
eec43959745f444f
ERROR calais-response-processor.core: #error {
 :cause nil
 :via
 [{:type java.lang.NullPointerException
 :message nil
 :at [org.apache.kafka.streams.processor.internals.StreamThread <init> 
StreamThread.java 719]}]
 :trace
 [[org.apache.kafka.streams.processor.internals.StreamThread <init> 
StreamThread.java 719]
 [org.apache.kafka.streams.processor.internals.StreamThread create 
StreamThread.java 671]
 [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 706]
 [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 624]
 [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 608]
 [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 598]
 [sun.reflect.NativeConstructorAccessorImpl newInstance0 nil -2]
 [sun.reflect.NativeConstructorAccessorImpl newInstance nil -1]
 [sun.reflect.DelegatingConstructorAccessorImpl newInstance nil -1]
 [java.lang.reflect.Constructor newInstance nil -1]
 [clojure.lang.Reflector invokeConstructor Reflector.java 180]
 [calais_response_processor.core$start__GT_streams$fn__258 invoke core.clj 148]
 [calais_response_processor.core$start__GT_streams invokeStatic core.clj 147]
 [calais_response_processor.core$start__GT_streams invoke core.clj 136]
 [calais_response_processor.core$start_stream_processing invokeStatic core.clj 
165]
 [calais_response_processor.core$start_stream_processing invoke core.clj 162]
 [calais_response_processor.core$eval1539 invokeStatic 
form-init8971608817606635487.clj 1]
 [calais_response_processor.core$eval1539 invoke 
form-init8971608817606635487.clj 1]
 [clojure.lang.Compiler eval Compiler.java 7062]
 [clojure.lang.Compiler eval Compiler.java 7025]
 [clojure.core$eval invokeStatic core.clj 3206]
 [clojure.core$eval invoke core.clj 3202]
 [clojure.main$repl$read_eval_print__8572$fn__8575 invoke main.clj 243]
 [clojure.main$repl$read_eval_print__8572 invoke main.clj 243]
 [clojure.main$repl$fn__8581 invoke main.clj 261]
 [clojure.main$repl invokeStatic main.clj 261]
 [clojure.main$repl doInvoke main.clj 177]
 [clojure.lang.RestFn invoke RestFn.java 1523]
 [nrepl.middleware.interruptible_eval$evaluate$fn__912 invoke 
interruptible_eval.clj 83]
 [clojure.lang.AFn applyToHelper AFn.java 152]
 [clojure.lang.AFn applyTo AFn.java 144]
 [clojure.core$apply invokeStatic core.clj 657]
 [clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
 [clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
 [clojure.lang.RestFn invoke RestFn.java 425]
 [nrepl.middleware.interruptible_eval$evaluate invokeStatic 
interruptible_eval.clj 81]
 [nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 50]
 [nrepl.middleware.interruptible_eval$interruptible_eval$fn__955$fn__958 invoke 
interruptible_eval.clj 221]
 [nrepl.middleware.interruptible_eval$run_next$fn__950 invoke 
interruptible_eval.clj 189]
 [clojure.lang.AFn run AFn.java 22]
 [java.util.concurrent.ThreadPoolExecutor runWorker nil -1]
 [java.util.concurrent.ThreadPoolExecutor$Worker run nil -1]
 [java.lang.Thread run nil -1]]}
NullPointerException clojure.lang.Reflector.invokeNoArgInstanceMember 
(Reflector.java:301)
{code}
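The trace bottoms out in a NullPointerException with no message, thrown inside the StreamThread constructor. One plausible mechanism (an assumption on my part, not confirmed against Kafka's source) is a boxed config value that is explicitly null being unboxed to a primitive; the {{commit.interval.ms = null}} line in the StreamsConfig dump above fits that. A self-contained sketch (the class and method names are hypothetical, not Kafka internals):

```java
public class NullUnboxDemo {
    // Hypothetical stand-in for internal code that unboxes a boxed config value.
    static long toMillis(Object configured) {
        return (Long) configured; // unboxing a null Long throws NullPointerException
    }

    public static void main(String[] args) {
        try {
            toMillis(null); // what an explicit commit.interval.ms = null would hit
        } catch (NullPointerException e) {
            System.out.println("caught NPE"); // prints: caught NPE
        }
    }
}
```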
 

  was: (previous revision of the description; identical text, with the Result excerpt shown as plain text rather than a code block)


> NPE thrown while instantiating a KafkaStreams object
> ----------------------------------------------------
>
>                 Key: KAFKA-7874
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7874
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>         Environment: Windows 10 Pro bld 1803
> Ryzen 7 2700, 32GB RAM
> Clojure 1.10
>            Reporter: Christopher S Lester
>            Priority: Major
>
> I'm seeing an exception thrown when instantiating a KafkaStreams object in a 
> Clojure stream processor and am not sure that this is a config problem (would 
> be great if it is and someone can point out what config is missing). 
> Kafka deps:
> {code:java}
> [org.apache.kafka/kafka-streams "2.1.0"] ;; 
> https://search.maven.org/artifact/org.apache.kafka/kafka-streams/2.1.0/jar 
> [org.apache.kafka/kafka-streams-test-utils "2.1.0"] ;; 
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils 
> [org.apache.kafka/kafka-clients "2.1.0"] ;; 
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> [org.apache.kafka/kafka-clients "1.1.0" :classifier "test"]{code}
>  
>  Configuration:
> {code:java}
> ;; kafka configuration
> :calais-response-processor.config/kafka-configuration {
> :applicationid "open-calais-tagging-microservice"
> :bootstrap-servers "localhost:9092"
> :commit-interval 10000 ;; milliseconds
> :input-topic "db.draft.created"
> :output-topic "streaming.calais.response"
> }
> {code}
>  
>  Fn:
> {code:java}
> (defn start->streams
> []
> (log/info "[start->streams] enter")
> (when (not (instance? KafkaStreams @streams))
> (let [kafka-config (get-in (config CONFIG_PATH) 
> [:calais-response-processor.config/kafka-configuration])
> stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG (get-in 
> kafka-config [:applicationid])
> StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (get-in kafka-config 
> [:auto.commit.interval.ms])
> StreamsConfig/BOOTSTRAP_SERVERS_CONFIG (get-in kafka-config 
> [:bootstrap-servers])
> StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass 
> (Serdes/String)))
> StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass 
> (Serdes/String)))
> StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE}]
> (try 
> (let [kafka-streams (KafkaStreams. (calais-processor-topology) 
> (StreamsConfig. stream-processing-props))]
> (log/infof "[start->streams] created kafka stream with config: %s" 
> stream-processing-props)
> (swap! streams kafka-streams))
> (catch Exception e (log/error e)))))
> (.start @streams))
> {code}
> Things seem to be "ok" if I don't catch the exception (i.e. it attempts to 
> connect and process), but since it's throwing an exception I'm unable to 
> control its lifecycle right now unless I move this to a (def ..) vs a 
> (defn ..). However, the exception still happens either way, so I need it 
> understood before putting this into production. 
>  
> Result:
> {code:java}
> [{:type java.lang.NullPointerException
>  :message nil
>  :at [org.apache.kafka.streams.processor.internals.StreamThread <init> 
> StreamThread.java 719]}]
>  :trace
>  [[org.apache.kafka.streams.processor.internals.StreamThread <init> 
> StreamThread.java 719]
>  [org.apache.kafka.streams.processor.internals.StreamThread create 
> StreamThread.java 671]{code}
>  
> (full log of startup)
> {code:java}
> => (start-stream-processing)
> "[start-stream-processing] START: streams processing"
> INFO calais-response-processor.core: [start->streams] enter
> INFO calais-response-processor.config: loading config from 
> resources/config.edn
> INFO calais-response-processor.core: [calais-processor-topology] Open Calais 
> API Streaming Topology
> INFO org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
>  application.id = open-calais-tagging-microservice
>  application.server =
>  bootstrap.servers = [localhost:9092]
>  buffered.records.per.partition = 1000
>  cache.max.bytes.buffering = 10485760
>  client.id =
>  commit.interval.ms = null
>  connections.max.idle.ms = 540000
>  default.deserialization.exception.handler = class 
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
>  default.key.serde = class 
> org.apache.kafka.common.serialization.Serdes$StringSerde
>  default.production.exception.handler = class 
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
>  default.timestamp.extractor = class 
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
>  default.value.serde = class 
> org.apache.kafka.common.serialization.Serdes$StringSerde
>  max.task.idle.ms = 0
>  metadata.max.age.ms = 300000
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 30000
>  num.standby.replicas = 0
>  num.stream.threads = 1
>  partition.grouper = class 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
>  poll.ms = 100
>  processing.guarantee = exactly_once
>  receive.buffer.bytes = 32768
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  replication.factor = 1
>  request.timeout.ms = 40000
>  retries = 0
>  retry.backoff.ms = 100
>  rocksdb.config.setter = null
>  security.protocol = PLAINTEXT
>  send.buffer.bytes = 131072
>  state.cleanup.delay.ms = 600000
>  state.dir = /tmp/kafka-streams
>  topology.optimization = none
>  upgrade.from = null
>  windowstore.changelog.additional.retention.ms = 86400000
> INFO org.apache.kafka.clients.admin.AdminClientConfig: AdminClientConfig 
> values:
>  bootstrap.servers = [localhost:9092]
>  client.dns.lookup = default
>  client.id = 
> open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-admin
>  connections.max.idle.ms = 300000
>  metadata.max.age.ms = 300000
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 30000
>  receive.buffer.bytes = 65536
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  request.timeout.ms = 120000
>  retries = 5
>  retry.backoff.ms = 100
>  sasl.client.callback.handler.class = null
>  sasl.jaas.config = null
>  sasl.kerberos.kinit.cmd = /usr/bin/kinit
>  sasl.kerberos.min.time.before.relogin = 60000
>  sasl.kerberos.service.name = null
>  sasl.kerberos.ticket.renew.jitter = 0.05
>  sasl.kerberos.ticket.renew.window.factor = 0.8
>  sasl.login.callback.handler.class = null
>  sasl.login.class = null
>  sasl.login.refresh.buffer.seconds = 300
>  sasl.login.refresh.min.period.seconds = 60
>  sasl.login.refresh.window.factor = 0.8
>  sasl.login.refresh.window.jitter = 0.05
>  sasl.mechanism = GSSAPI
>  security.protocol = PLAINTEXT
>  send.buffer.bytes = 131072
>  ssl.cipher.suites = null
>  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>  ssl.endpoint.identification.algorithm = https
>  ssl.key.password = null
>  ssl.keymanager.algorithm = SunX509
>  ssl.keystore.location = null
>  ssl.keystore.password = null
>  ssl.keystore.type = JKS
>  ssl.protocol = TLS
>  ssl.provider = null
>  ssl.secure.random.implementation = null
>  ssl.trustmanager.algorithm = PKIX
>  ssl.truststore.location = null
>  ssl.truststore.password = null
>  ssl.truststore.type = JKS
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 
> eec43959745f444f
> INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
> [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
>  Creating restore consumer client
> INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
>  auto.commit.interval.ms = 5000
>  auto.offset.reset = none
>  bootstrap.servers = [localhost:9092]
>  check.crcs = true
>  client.dns.lookup = default
>  client.id = 
> open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-restore-consumer
>  connections.max.idle.ms = 540000
>  default.api.timeout.ms = 60000
>  enable.auto.commit = false
>  exclude.internal.topics = true
>  fetch.max.bytes = 52428800
>  fetch.max.wait.ms = 500
>  fetch.min.bytes = 1
>  group.id =
>  heartbeat.interval.ms = 3000
>  interceptor.classes = []
>  internal.leave.group.on.close = false
>  isolation.level = read_committed
>  key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>  max.partition.fetch.bytes = 1048576
>  max.poll.interval.ms = 2147483647
>  max.poll.records = 1000
>  metadata.max.age.ms = 300000
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 30000
>  partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>  receive.buffer.bytes = 65536
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  request.timeout.ms = 30000
>  retry.backoff.ms = 100
>  sasl.client.callback.handler.class = null
>  sasl.jaas.config = null
>  sasl.kerberos.kinit.cmd = /usr/bin/kinit
>  sasl.kerberos.min.time.before.relogin = 60000
>  sasl.kerberos.service.name = null
>  sasl.kerberos.ticket.renew.jitter = 0.05
>  sasl.kerberos.ticket.renew.window.factor = 0.8
>  sasl.login.callback.handler.class = null
>  sasl.login.class = null
>  sasl.login.refresh.buffer.seconds = 300
>  sasl.login.refresh.min.period.seconds = 60
>  sasl.login.refresh.window.factor = 0.8
>  sasl.login.refresh.window.jitter = 0.05
>  sasl.mechanism = GSSAPI
>  security.protocol = PLAINTEXT
>  send.buffer.bytes = 131072
>  session.timeout.ms = 10000
>  ssl.cipher.suites = null
>  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>  ssl.endpoint.identification.algorithm = https
>  ssl.key.password = null
>  ssl.keymanager.algorithm = SunX509
>  ssl.keystore.location = null
>  ssl.keystore.password = null
>  ssl.keystore.type = JKS
>  ssl.protocol = TLS
>  ssl.provider = null
>  ssl.secure.random.implementation = null
>  ssl.trustmanager.algorithm = PKIX
>  ssl.truststore.location = null
>  ssl.truststore.password = null
>  ssl.truststore.type = JKS
>  value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 
> eec43959745f444f
> INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
> [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1]
>  Creating consumer client
> INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
>  auto.commit.interval.ms = 5000
>  auto.offset.reset = earliest
>  bootstrap.servers = [localhost:9092]
>  check.crcs = true
>  client.dns.lookup = default
>  client.id = 
> open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-consumer
>  connections.max.idle.ms = 540000
>  default.api.timeout.ms = 60000
>  enable.auto.commit = false
>  exclude.internal.topics = true
>  fetch.max.bytes = 52428800
>  fetch.max.wait.ms = 500
>  fetch.min.bytes = 1
>  group.id = open-calais-tagging-microservice
>  heartbeat.interval.ms = 3000
>  interceptor.classes = []
>  internal.leave.group.on.close = false
>  isolation.level = read_committed
>  key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>  max.partition.fetch.bytes = 1048576
>  max.poll.interval.ms = 2147483647
>  max.poll.records = 1000
>  metadata.max.age.ms = 300000
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 30000
>  partition.assignment.strategy = 
> [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
>  receive.buffer.bytes = 65536
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  request.timeout.ms = 30000
>  retry.backoff.ms = 100
>  sasl.client.callback.handler.class = null
>  sasl.jaas.config = null
>  sasl.kerberos.kinit.cmd = /usr/bin/kinit
>  sasl.kerberos.min.time.before.relogin = 60000
>  sasl.kerberos.service.name = null
>  sasl.kerberos.ticket.renew.jitter = 0.05
>  sasl.kerberos.ticket.renew.window.factor = 0.8
>  sasl.login.callback.handler.class = null
>  sasl.login.class = null
>  sasl.login.refresh.buffer.seconds = 300
>  sasl.login.refresh.min.period.seconds = 60
>  sasl.login.refresh.window.factor = 0.8
>  sasl.login.refresh.window.jitter = 0.05
>  sasl.mechanism = GSSAPI
>  security.protocol = PLAINTEXT
>  send.buffer.bytes = 131072
>  session.timeout.ms = 10000
>  ssl.cipher.suites = null
>  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>  ssl.endpoint.identification.algorithm = https
>  ssl.key.password = null
>  ssl.keymanager.algorithm = SunX509
>  ssl.keystore.location = null
>  ssl.keystore.password = null
>  ssl.keystore.type = JKS
>  ssl.protocol = TLS
>  ssl.provider = null
>  ssl.secure.random.implementation = null
>  ssl.trustmanager.algorithm = PKIX
>  ssl.truststore.location = null
>  ssl.truststore.password = null
>  ssl.truststore.type = JKS
>  value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> WARN org.apache.kafka.clients.consumer.ConsumerConfig: The configuration 
> 'admin.retries' was supplied but isn't a known config.
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 
> eec43959745f444f
> ERROR calais-response-processor.core: #error {
>  :cause nil
>  :via
>  [{:type java.lang.NullPointerException
>  :message nil
>  :at [org.apache.kafka.streams.processor.internals.StreamThread <init> 
> StreamThread.java 719]}]
>  :trace
>  [[org.apache.kafka.streams.processor.internals.StreamThread <init> 
> StreamThread.java 719]
>  [org.apache.kafka.streams.processor.internals.StreamThread create 
> StreamThread.java 671]
>  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 706]
>  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 624]
>  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 608]
>  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 598]
>  [sun.reflect.NativeConstructorAccessorImpl newInstance0 nil -2]
>  [sun.reflect.NativeConstructorAccessorImpl newInstance nil -1]
>  [sun.reflect.DelegatingConstructorAccessorImpl newInstance nil -1]
>  [java.lang.reflect.Constructor newInstance nil -1]
>  [clojure.lang.Reflector invokeConstructor Reflector.java 180]
>  [calais_response_processor.core$start__GT_streams$fn__258 invoke core.clj 
> 148]
>  [calais_response_processor.core$start__GT_streams invokeStatic core.clj 147]
>  [calais_response_processor.core$start__GT_streams invoke core.clj 136]
>  [calais_response_processor.core$start_stream_processing invokeStatic 
> core.clj 165]
>  [calais_response_processor.core$start_stream_processing invoke core.clj 162]
>  [calais_response_processor.core$eval1539 invokeStatic 
> form-init8971608817606635487.clj 1]
>  [calais_response_processor.core$eval1539 invoke 
> form-init8971608817606635487.clj 1]
>  [clojure.lang.Compiler eval Compiler.java 7062]
>  [clojure.lang.Compiler eval Compiler.java 7025]
>  [clojure.core$eval invokeStatic core.clj 3206]
>  [clojure.core$eval invoke core.clj 3202]
>  [clojure.main$repl$read_eval_print__8572$fn__8575 invoke main.clj 243]
>  [clojure.main$repl$read_eval_print__8572 invoke main.clj 243]
>  [clojure.main$repl$fn__8581 invoke main.clj 261]
>  [clojure.main$repl invokeStatic main.clj 261]
>  [clojure.main$repl doInvoke main.clj 177]
>  [clojure.lang.RestFn invoke RestFn.java 1523]
>  [nrepl.middleware.interruptible_eval$evaluate$fn__912 invoke 
> interruptible_eval.clj 83]
>  [clojure.lang.AFn applyToHelper AFn.java 152]
>  [clojure.lang.AFn applyTo AFn.java 144]
>  [clojure.core$apply invokeStatic core.clj 657]
>  [clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
>  [clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
>  [clojure.lang.RestFn invoke RestFn.java 425]
>  [nrepl.middleware.interruptible_eval$evaluate invokeStatic 
> interruptible_eval.clj 81]
>  [nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 
> 50]
>  [nrepl.middleware.interruptible_eval$interruptible_eval$fn__955$fn__958 
> invoke interruptible_eval.clj 221]
>  [nrepl.middleware.interruptible_eval$run_next$fn__950 invoke 
> interruptible_eval.clj 189]
>  [clojure.lang.AFn run AFn.java 22]
>  [java.util.concurrent.ThreadPoolExecutor runWorker nil -1]
>  [java.util.concurrent.ThreadPoolExecutor$Worker run nil -1]
>  [java.lang.Thread run nil -1]]}
> NullPointerException clojure.lang.Reflector.invokeNoArgInstanceMember 
> (Reflector.java:301)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
