Hi  Guozhang,

The file system is XFS and the folder is not a temp folder. The issue goes
away when I restart the streams. I forgot to mention i am running 3
multiple instances of consumer on 3 machines.
Also, this issue seems to be reported by other users too:
https://issues.apache.org/jira/browse/KAFKA-5998



Regards,
Amandeep Singh
+91-7838184964


On Mon, Jun 18, 2018 at 6:45 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Amandeep,
>
> What file system are you using? Also is `/opt/info` a temp folder that can
> be auto-cleared from time to time?
>
>
> Guozhang
>
> On Fri, Jun 15, 2018 at 6:39 AM, Amandeep Singh <amandee...@gmail.com>
> wrote:
>
> > Hi,
> >
> >
> >
> >  I am getting the below error while processign data with kafka stream.
> The
> > application was runnign for a couple of hours and the '
> > WatchlistUpdate-StreamThread-9 ' thread was assigned to the same
> partition
> > since beginning. I am assuming it was able to successfully commit offsets
> > for those couple of hours and the directory '
> > /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/
> > UI-Watchlist-ES-App/0_2
> > ' did exist for that period.
> >
> >  And then I start getting the below error after every 30 secs (probably
> > because if offset commit interval)  and messages are being missed from
> > processing.
> >
> > Can you please help?
> >
> >
> > 2018-06-15 08:47:58 [WatchlistUpdate-StreamThread-9] WARN
> > o.a.k.s.p.i.ProcessorStateManager:246
> > - task [0_2] Failed
> >
> > to write checkpoint file to
> > /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/
> > UI-Watchlist-ES-App/0_2/.che
> >
> > ckpoint:
> >
> > java.io.FileNotFoundException:
> > /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/
> > UI-Watchlist-ES-App/0_2/.
> >
> > checkpoint.tmp (No such file or directory)
> >
> >         at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
> >
> >         at java.io.FileOutputStream.open(FileOutputStream.java:270)
> > ~[na:1.8.0_141]
> >
> >         at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> > ~[na:1.8.0_141]
> >
> >         at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> > ~[na:1.8.0_141]
> >
> >         at
> > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(
> > OffsetCheckpoint.java:73)
> > ~[kafka-streams-
> >
> > 1.0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > checkpoint(ProcessorStateManager.java:3
> >
> > 20) ~[kafka-streams-1.0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.
> > java:306)
> > [kafka-streams-1.0.0.ja
> >
> > r:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:2
> >
> > 08) [kafka-streams-1.0.0.jar:na]
> >
> >         at
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.
> > java:299)
> > [kafka-streams-1.0.0.j
> >
> > ar:na]
> >
> >         at
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.
> > java:289)
> > [kafka-streams-1.0.0.j
> >
> > ar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(
> > AssignedTasks.java:87)
> > [kafka-streams-1
> >
> > .0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.AssignedTasks.
> > applyToRunningTasks(AssignedTasks.java:451)
> > [ka
> >
> > fka-streams-1.0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(
> > AssignedTasks.java:380)
> > [kafka-streams-1
> >
> > .0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(
> > TaskManager.java:309)
> > [kafka-streams-1.
> >
> > 0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> > StreamThread.java:1018)
> > [kafka-strea
> >
> > ms-1.0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:835)
> > [kafka-streams-1.
> >
> > 0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:774)
> > [kafka-streams-1.
> >
> > 0.0.jar:na]
> >
> >         at
> > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:744)
> > [kafka-streams-1.0.0.
> >
> > jar:na]
> >
> >
> > Stream config:
> >
> > 2018-06-15 08:09:28 [main] INFO  o.a.k.c.consumer.ConsumerConfig:223 -
> > ConsumerConfig values:
> >
> >         auto.commit.interval.ms = 5000
> >
> >         auto.offset.reset = earliest
> >
> >         bootstrap.servers = [XYZ]
> >
> >         check.crcs = true
> >
> >         client.id = WatchlistUpdate-StreamThread-9-consumer
> >
> >         connections.max.idle.ms = 540000
> >
> >         enable.auto.commit = false
> >
> >         exclude.internal.topics = true
> >
> >         fetch.max.bytes = 52428800
> >
> >         fetch.max.wait.ms = 500
> >
> >         fetch.min.bytes = 1
> >
> >         group.id = UI-Watchlist-ES-App
> >
> >         heartbeat.interval.ms = 3000
> >
> >         interceptor.classes = null
> >
> >         internal.leave.group.on.close = false
> >
> >         isolation.level = read_uncommitted
> >
> >         key.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> >
> >         max.partition.fetch.bytes = 1048576
> >
> >         max.poll.interval.ms = 2147483647
> >
> >         max.poll.records = 1000
> >
> >         metadata.max.age.ms = 300000
> >
> >         metric.reporters = []
> >
> >         metrics.num.samples = 2
> >
> >         metrics.recording.level = INFO
> >
> >         metrics.sample.window.ms = 30000
> >
> >         partition.assignment.strategy =
> > [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> >
> >         receive.buffer.bytes = 65536
> >
> >         reconnect.backoff.max.ms = 1000
> >
> >         reconnect.backoff.ms = 50
> >
> >         request.timeout.ms = 305000
> >
> >         retry.backoff.ms = 100
> >
> >         sasl.jaas.config = null
> >
> >         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >
> >         sasl.kerberos.min.time.before.relogin = 60000
> >
> >         sasl.kerberos.service.name = null
> >
> >         sasl.kerberos.ticket.renew.jitter = 0.05
> >
> >         sasl.kerberos.ticket.renew.window.factor = 0.8
> >
> >         sasl.mechanism = GSSAPI
> >
> >         security.protocol = PLAINTEXT
> >
> >         send.buffer.bytes = 131072
> >
> >         session.timeout.ms = 10000
> >
> >         ssl.cipher.suites = null
> >
> >         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >
> >         ssl.endpoint.identification.algorithm = null
> >
> >         ssl.key.password = null
> >
> >         ssl.keymanager.algorithm = SunX509
> >
> >         ssl.keystore.location = null
> >
> >         ssl.keystore.password = null
> >
> >         ssl.keystore.type = JKS
> >
> >         ssl.protocol = TLS
> >
> >         ssl.provider = null
> >
> >         ssl.secure.random.implementation = null
> >
> >         ssl.trustmanager.algorithm = PKIX
> >
> >         ssl.truststore.location = null
> >
> >         ssl.truststore.password = null
> >
> >         ssl.truststore.type = JKS
> >
> >         value.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> >
> > Regards,
> > Amandeep Singh
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to