Interesting, it does indeed seem like a lurking issue in Kafka Streams. Which Kafka version are you using?
Guozhang

On Mon, Jun 18, 2018 at 12:32 AM, Amandeep Singh <amandee...@gmail.com> wrote:
> Hi Guozhang,
>
> The file system is XFS and the folder is not a temp folder. The issue goes
> away when I restart the streams. I forgot to mention that I am running 3
> instances of the consumer on 3 machines.
> This issue also seems to have been reported by other users:
> https://issues.apache.org/jira/browse/KAFKA-5998
>
> Regards,
> Amandeep Singh
> +91-7838184964
>
> On Mon, Jun 18, 2018 at 6:45 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hello Amandeep,
> >
> > What file system are you using? Also, is `/opt/info` a temp folder that
> > can be auto-cleared from time to time?
> >
> > Guozhang
> >
> > On Fri, Jun 15, 2018 at 6:39 AM, Amandeep Singh <amandee...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > I am getting the below error while processing data with Kafka Streams.
> > > The application had been running for a couple of hours, and the
> > > 'WatchlistUpdate-StreamThread-9' thread had been assigned the same
> > > partition since the beginning. I am assuming it was able to successfully
> > > commit offsets for those couple of hours, and that the directory
> > > '/opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2'
> > > did exist for that period.
> > >
> > > I then start getting the below error every 30 seconds (probably because
> > > of the offset commit interval), and messages are being missed from
> > > processing.
> > >
> > > Can you please help?
> > >
> > > 2018-06-15 08:47:58 [WatchlistUpdate-StreamThread-9] WARN o.a.k.s.p.i.ProcessorStateManager:246 - task [0_2] Failed to write checkpoint file to /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint:
> > > java.io.FileNotFoundException: /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint.tmp (No such file or directory)
> > >         at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
> > >         at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
> > >         at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
> > >         at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
> > >         at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) ~[kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:306) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) [kafka-streams-1.0.0.jar:na]
> > >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) [kafka-streams-1.0.0.jar:na]
> > >
> > > Stream config:
> > >
> > > 2018-06-15 08:09:28 [main] INFO o.a.k.c.consumer.ConsumerConfig:223 - ConsumerConfig values:
> > >         auto.commit.interval.ms = 5000
> > >         auto.offset.reset = earliest
> > >         bootstrap.servers = [XYZ]
> > >         check.crcs = true
> > >         client.id = WatchlistUpdate-StreamThread-9-consumer
> > >         connections.max.idle.ms = 540000
> > >         enable.auto.commit = false
> > >         exclude.internal.topics = true
> > >         fetch.max.bytes = 52428800
> > >         fetch.max.wait.ms = 500
> > >         fetch.min.bytes = 1
> > >         group.id = UI-Watchlist-ES-App
> > >         heartbeat.interval.ms = 3000
> > >         interceptor.classes = null
> > >         internal.leave.group.on.close = false
> > >         isolation.level = read_uncommitted
> > >         key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > >         max.partition.fetch.bytes = 1048576
> > >         max.poll.interval.ms = 2147483647
> > >         max.poll.records = 1000
> > >         metadata.max.age.ms = 300000
> > >         metric.reporters = []
> > >         metrics.num.samples = 2
> > >         metrics.recording.level = INFO
> > >         metrics.sample.window.ms = 30000
> > >         partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> > >         receive.buffer.bytes = 65536
> > >         reconnect.backoff.max.ms = 1000
> > >         reconnect.backoff.ms = 50
> > >         request.timeout.ms = 305000
> > >         retry.backoff.ms = 100
> > >         sasl.jaas.config = null
> > >         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > >         sasl.kerberos.min.time.before.relogin = 60000
> > >         sasl.kerberos.service.name = null
> > >         sasl.kerberos.ticket.renew.jitter = 0.05
> > >         sasl.kerberos.ticket.renew.window.factor = 0.8
> > >         sasl.mechanism = GSSAPI
> > >         security.protocol = PLAINTEXT
> > >         send.buffer.bytes = 131072
> > >         session.timeout.ms = 10000
> > >         ssl.cipher.suites = null
> > >         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > >         ssl.endpoint.identification.algorithm = null
> > >         ssl.key.password = null
> > >         ssl.keymanager.algorithm = SunX509
> > >         ssl.keystore.location = null
> > >         ssl.keystore.password = null
> > >         ssl.keystore.type = JKS
> > >         ssl.protocol = TLS
> > >         ssl.provider = null
> > >         ssl.secure.random.implementation = null
> > >         ssl.trustmanager.algorithm = PKIX
> > >         ssl.truststore.location = null
> > >         ssl.truststore.password = null
> > >         ssl.truststore.type = JKS
> > >         value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > >
> > > Regards,
> > > Amandeep Singh
> >
> > --
> > -- Guozhang

-- 
-- Guozhang
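[Editorial note for readers of the archive: the stack trace in this thread shows `OffsetCheckpoint.write` failing because the task's state directory disappeared between commits. The sketch below is not Kafka's actual implementation — file names and contents are illustrative — but it reproduces the same failure mode with plain JDK I/O: writing a `.tmp` file then renaming it over the target, which throws `FileNotFoundException` once the parent directory has been removed.]

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckpointWriteSketch {
    // Illustrative write-to-tmp-then-rename pattern, loosely modeled on
    // what the stack trace shows OffsetCheckpoint.write doing.
    static void writeCheckpoint(File checkpoint) throws IOException {
        File tmp = new File(checkpoint.getAbsolutePath() + ".tmp");
        // FileNotFoundException is thrown here if the parent directory is gone
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write("0\n".getBytes());
        }
        Files.move(tmp.toPath(), checkpoint.toPath());
    }

    public static void main(String[] args) throws IOException {
        Path taskDir = Files.createTempDirectory("0_2");
        File checkpoint = taskDir.resolve(".checkpoint").toFile();
        writeCheckpoint(checkpoint);   // succeeds while the task directory exists
        Files.delete(checkpoint.toPath());
        Files.delete(taskDir);         // simulate the state directory being cleaned up
        try {
            writeCheckpoint(checkpoint);  // now fails like the log in this thread
        } catch (IOException e) {
            System.out.println(e.getClass().getSimpleName());
        }
    }
}
```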
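[Editorial note: since the failure mode discussed here (and in KAFKA-5998) is the state directory vanishing under a running application, one mitigation is pointing `state.dir` at a path that is never auto-cleaned. A minimal sketch using plain `java.util.Properties`, as Kafka Streams configs are typically built — the directory path and broker address are placeholders, not values from this thread:]

```java
import java.util.Properties;

public class StreamsPropsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // application.id becomes the consumer group.id ("UI-Watchlist-ES-App" in the log)
        props.put("application.id", "UI-Watchlist-ES-App");
        props.put("bootstrap.servers", "broker:9092");       // placeholder address
        // Default state.dir is /tmp/kafka-streams, which tmp-cleaners may purge;
        // choose a path that survives reboots and cleanup jobs (hypothetical path)
        props.put("state.dir", "/var/lib/kafka-streams");
        // The .checkpoint file is rewritten on each commit; the Streams default
        // commit.interval.ms of 30000 matches the ~30 s error cadence in the log
        props.put("commit.interval.ms", "30000");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```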