Hi,


 I am getting the below error while processign data with kafka stream. The
application was runnign for a couple of hours and the '
WatchlistUpdate-StreamThread-9 ' thread was assigned to the same partition
since beginning. I am assuming it was able to successfully commit offsets
for those couple of hours and the directory '
/opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2
' did exist for that period.

 And then I start getting the below error after every 30 secs (probably
because if offset commit interval)  and messages are being missed from
processing.

Can you please help?


2018-06-15 08:47:58 [WatchlistUpdate-StreamThread-9] WARN
o.a.k.s.p.i.ProcessorStateManager:246
- task [0_2] Failed

to write checkpoint file to
/opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.che

ckpoint:

java.io.FileNotFoundException:
/opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.

checkpoint.tmp (No such file or directory)

        at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]

        at java.io.FileOutputStream.open(FileOutputStream.java:270)
~[na:1.8.0_141]

        at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
~[na:1.8.0_141]

        at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
~[na:1.8.0_141]

        at
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
~[kafka-streams-

1.0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:3

20) ~[kafka-streams-1.0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:306)
[kafka-streams-1.0.0.ja

r:na]

        at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:2

08) [kafka-streams-1.0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
[kafka-streams-1.0.0.j

ar:na]

        at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
[kafka-streams-1.0.0.j

ar:na]

        at
org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
[kafka-streams-1

.0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
[ka

fka-streams-1.0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
[kafka-streams-1

.0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
[kafka-streams-1.

0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
[kafka-strea

ms-1.0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
[kafka-streams-1.

0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
[kafka-streams-1.

0.0.jar:na]

        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
[kafka-streams-1.0.0.

jar:na]


Stream config:

2018-06-15 08:09:28 [main] INFO  o.a.k.c.consumer.ConsumerConfig:223 -
ConsumerConfig values:

        auto.commit.interval.ms = 5000

        auto.offset.reset = earliest

        bootstrap.servers = [XYZ]

        check.crcs = true

        client.id = WatchlistUpdate-StreamThread-9-consumer

        connections.max.idle.ms = 540000

        enable.auto.commit = false

        exclude.internal.topics = true

        fetch.max.bytes = 52428800

        fetch.max.wait.ms = 500

        fetch.min.bytes = 1

        group.id = UI-Watchlist-ES-App

        heartbeat.interval.ms = 3000

        interceptor.classes = null

        internal.leave.group.on.close = false

        isolation.level = read_uncommitted

        key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer

        max.partition.fetch.bytes = 1048576

        max.poll.interval.ms = 2147483647

        max.poll.records = 1000

        metadata.max.age.ms = 300000

        metric.reporters = []

        metrics.num.samples = 2

        metrics.recording.level = INFO

        metrics.sample.window.ms = 30000

        partition.assignment.strategy =
[org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]

        receive.buffer.bytes = 65536

        reconnect.backoff.max.ms = 1000

        reconnect.backoff.ms = 50

        request.timeout.ms = 305000

        retry.backoff.ms = 100

        sasl.jaas.config = null

        sasl.kerberos.kinit.cmd = /usr/bin/kinit

        sasl.kerberos.min.time.before.relogin = 60000

        sasl.kerberos.service.name = null

        sasl.kerberos.ticket.renew.jitter = 0.05

        sasl.kerberos.ticket.renew.window.factor = 0.8

        sasl.mechanism = GSSAPI

        security.protocol = PLAINTEXT

        send.buffer.bytes = 131072

        session.timeout.ms = 10000

        ssl.cipher.suites = null

        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

        ssl.endpoint.identification.algorithm = null

        ssl.key.password = null

        ssl.keymanager.algorithm = SunX509

        ssl.keystore.location = null

        ssl.keystore.password = null

        ssl.keystore.type = JKS

        ssl.protocol = TLS

        ssl.provider = null

        ssl.secure.random.implementation = null

        ssl.trustmanager.algorithm = PKIX

        ssl.truststore.location = null

        ssl.truststore.password = null

        ssl.truststore.type = JKS

        value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer

Regards,
Amandeep Singh

Reply via email to