Ian,

an important hint: it is highly recommended to change the "state.dir"
configuration parameter from "/tmp/kafka-streams" to a different
directory. /tmp might get deleted, and you would then lose all your
cached data.

While this is not an issue with regard to data loss (as all data is
reliably stored in the Kafka brokers), it might give you a performance
penalty, as the state needs to be re-created.

Furthermore, if the application is running and /tmp gets deleted, your
app instance might crash, as RocksDB relies on the data it writes to disk.
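
For example, a minimal sketch of setting a different directory when
building the Streams config (class name, broker address, and target
path are just placeholders; put your real topology into the builder):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StateDirExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text_pipeline_id");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
            // keep state off /tmp so it survives tmp cleanup
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

            KStreamBuilder builder = new KStreamBuilder(); // your real topology goes here
            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }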

Could this be related to your problem?


-Matthias

On 2/13/17 2:10 PM, Ian Duffy wrote:
> Hi Guozhang,
> 
> Thank you for your assistance on this.
> 
>> About the stack trace pasted before: is it tailing some warning logs like
> "Could not create task ... Will retry"
> 
> Yes, we see the following:
> 
> 17/02/13 21:49:55 WARN internals.StreamThread: Could not create task 0_93. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_93] Failed to lock the state directory: /tmp/kafka-streams/text_pipeline_id/0_93
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> 
> It is always partition 93. Additionally, we've seen all the other
> partitions being processed without issue in between the crashes, but the
> offset for 93 hasn't moved.
> 
>> it is thrown as an exception all the way to the thread's
> setUncaughtExceptionHandler, in which case your code should have an
> `Unexpected Exception caught in thread` error log entry and the thread
> will die.
> 
> We have seen this too:
> 
> org.apache.kafka.streams.errors.ProcessorStateException: task directory [/tmp/kafka-streams/dougal_text_pipeline_id/0_94] doesn't exist and couldn't be created
>     at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:75)
>     at org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:102)
>     at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:205)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:753)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:664)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> 
> 17/02/13 21:11:01 INFO internals.StreamThread: stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
> 17/02/13 21:11:01 INFO internals.StreamThread: stream-thread [StreamThread-1] Committing task StreamTask 0_70
> 17/02/13 21:11:01 INFO internals.StateDirectory: Deleting obsolete state directory 0_61 for task 0_61
> 
> Our initial thought, given "doesn't exist and couldn't be created", was
> that it might be a disk space or permissions issue, but we confirmed this
> is not the case.
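> 
> A minimal sketch of how we could register such an uncaught exception
> handler on the KafkaStreams instance, so dying stream threads at least
> get logged (assuming a KafkaStreams instance named "streams" and an
> slf4j Logger named "log"):
> 
>     // must be set before streams.start()
>     streams.setUncaughtExceptionHandler((thread, throwable) ->
>             // surfaces dead stream threads instead of letting them die silently
>             log.error("Unexpected Exception caught in thread " + thread.getName(), throwable));
>     streams.start();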
> 
>> Could you monitor the number of live stream threads at runtime
> (originally there should be 8 according to your configs) and see if the
> count has decreased, meaning some threads have died?
> 
> I'll try to get some data on this for you tomorrow.
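> 
> One quick way to grab that count from inside the JVM, as a sketch
> (relying on the default "StreamThread-N" thread naming visible in the
> logs above):
> 
>     // count live Kafka Streams worker threads by name
>     long liveStreamThreads = Thread.getAllStackTraces().keySet().stream()
>             .filter(t -> t.getName().contains("StreamThread"))
>             .filter(Thread::isAlive)
>             .count();
>     System.out.println("live stream threads: " + liveStreamThreads);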
> 
>> When you reported that the process "just hang on attempting to rejoin",
> how long did you observe it hanging before you killed the process?
> 
> With the 0.10.1.1 client we waited up to an hour; it failed to recover at all.
> 
> With the 0.10.2 client today it consistently self-recovered, but it was
> quickly switching back into a rebalancing state.
> We've been unable to identify a cause for such frequent rebalancing. We
> thought it might be because processing time can potentially be pretty
> long (greater than 5 min, less than 10 min).
> 
>> Could you try with a smaller number of partitions, from 96 down to 8, just
> for the purpose of troubleshooting, to see if this issue still exists?
> 
> Will try this tomorrow; I would expect we would see a lot less rebalancing
> with fewer partitions.
> 
> Thanks again for all your help.
> Ian.
> 
> On 13 February 2017 at 19:55, Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> @Ian, Nina
>>
>> Thanks for the detailed description of your apps. A couple of follow-up
>> questions I have to help us further investigate your issue:
>>
>> 1. About the stack trace pasted before: is it tailing some warning logs
>> like "Could not create task ... Will retry" (i.e. it is part of that
>> warning log), or is it thrown as an exception all the way to the
>> thread's setUncaughtExceptionHandler, in which case your code should
>> have an `Unexpected Exception caught in thread` error log entry and the
>> thread will die?
>>
>> 2. Could you monitor the number of live stream threads at runtime
>> (originally there should be 8 according to your configs) and see if the
>> count has decreased, meaning some threads have died?
>>
>> 3. When you reported that the process "just hang on attempting to rejoin",
>> how long did you observe it hanging before you killed the process?
>>
>> 4. Could you try with a smaller number of partitions, from 96 down to 8,
>> just for the purpose of troubleshooting, to see if this issue still exists?
>>
>>
>> Guozhang
>>
>>
>> On Mon, Feb 13, 2017 at 11:38 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>> Thanks for reporting the JIRA, Swen.
>>>
>>> Jason has a patch ready under KAFKA-4761 and I have reviewed it. You
>>> could try it out and see if it fixes your issue.
>>>
>>> After this is merged in, we will need another RC.
>>>
>>>
>>> Guozhang
>>>
>>> On Mon, Feb 13, 2017 at 9:52 AM, Moczarski, Swen <smoczarski@ebay-kleinanzeigen.de> wrote:
>>>
>>>> +0 (non-binding)
>>>>
>>>> Thanks for compiling a new release candidate.
>>>>
>>>> I get a NullPointerException when setting batch.size=0 in the producer
>>>> config. This worked before with 0.10.1.1.
>>>> See https://issues.apache.org/jira/browse/KAFKA-4761
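>>>>
>>>> Roughly the producer setup that triggers it, as a sketch (broker
>>>> address is a placeholder):
>>>>
>>>>     Properties props = new Properties();
>>>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
>>>>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>>>>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>>>>     // batch.size=0 is the setting that hits the NPE on this RC
>>>>     props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
>>>>     KafkaProducer<String, String> producer = new KafkaProducer<>(props);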
>>>>
>>>> Regards,
>>>> Swen
>>>>
>>>> On 2/10/17, 5:51 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>>>
>>>>     Hello Kafka users, developers and client-developers,
>>>>
>>>>     This is RC1 for release of Apache Kafka 0.10.2.0.
>>>>
>>>>     This is a minor version release of Apache Kafka. It includes 19 new KIPs.
>>>>     See the release notes and release plan
>>>>     (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.2.0)
>>>>     for more details. A few feature highlights: SASL-SCRAM support,
>>>>     improved client compatibility to allow use of clients newer than the
>>>>     broker, session windows and global tables in the Kafka Streams API,
>>>>     single message transforms in the Kafka Connect framework.
>>>>
>>>>     Important note: in addition to the artifacts generated using JDK7
>>>>     for Scala 2.10 and 2.11, this release also includes experimental
>>>>     artifacts built using JDK8 for Scala 2.12.
>>>>
>>>>     Important code changes since RC0 (non-docs, non system tests):
>>>>
>>>>     * KAFKA-4728; KafkaConsumer#commitSync should copy its input
>>>>     * KAFKA-4441; Monitoring incorrect during topic creation and deletion
>>>>     * KAFKA-4734; Trim the time index on old segments
>>>>     * KAFKA-4725; Stop leaking messages in produce request body when
>>>>       requests are delayed
>>>>     * KAFKA-4716: Fix case when controller cannot be reached
>>>>
>>>>     Release notes for the 0.10.2.0 release:
>>>>     http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
>>>>
>>>>     *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
>>>>
>>>>     Kafka's KEYS file containing PGP keys we use to sign the release:
>>>>     http://kafka.apache.org/KEYS
>>>>
>>>>     * Release artifacts to be voted upon (source and binary):
>>>>     http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>>>>
>>>>     * Maven artifacts to be voted upon:
>>>>     https://repository.apache.org/content/groups/staging/
>>>>
>>>>     * Javadoc:
>>>>     http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
>>>>
>>>>     * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
>>>>     https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
>>>>
>>>>
>>>>     * Documentation:
>>>>     http://kafka.apache.org/0102/documentation.html
>>>>
>>>>     * Protocol:
>>>>     http://kafka.apache.org/0102/protocol.html
>>>>
>>>>     * Successful Jenkins builds for the 0.10.2 branch:
>>>>     Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/74/
>>>>     System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/25/
>>>>
>>>>
>>>>     Thanks,
>>>>     Ewen
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
