Hi Ara,

There are some instructions here for monitoring whether RocksDB is having its 
writes stalled: https://github.com/facebook/rocksdb/wiki/Write-Stalls, 
together with some instructions for tuning. Could you share RocksDB's LOG 
file?
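
For what that wiki page describes, stall events show up as WARN lines in 
RocksDB's LOG. A minimal sketch of what to look for (the sample lines below 
are only representative of RocksDB's stall messages, and in a real deployment 
the LOG sits under the Streams state dir, roughly 
<state.dir>/<application-id>/<task-id>/rocksdb/<store-name>/LOG):

```shell
# Write a couple of representative stall lines to a sample file, so the
# grep below has something to run against; real LOGs look similar.
cat > sample-rocksdb.LOG <<'EOF'
2017/02/22-20:11:03 [WARN] [db/column_family.cc] Stalling writes, 3 immutable memtables
2017/02/22-20:11:07 [WARN] [db/column_family.cc] Stopping writes, pending compaction debt
2017/02/22-20:11:09 flush finished
EOF

# Count stall events; a steadily growing count that lines up with the
# pauses you see in processing points at RocksDB write stalls.
grep -cE "Stalling writes|Stopping writes" sample-rocksdb.LOG
```

Running the same grep against the real LOG files, repeatedly over time, shows 
whether stalls coincide with the slowdown.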

In addition, are you seeing any other behaviour in the system that could 
explain the slowdown, like increased CPU utilisation? It would be good to 
know the CPU and disk utilisation when the slowdown starts to happen.
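
One lightweight way to capture that is to sample the kernel counters 
periodically and line them up with when processing stalls (a Linux-only 
sketch; for per-disk %util over time, `iostat -xz 5` from the sysstat 
package is the usual tool, assuming it is installed):

```shell
# Snapshot aggregate CPU ticks from the first line of /proc/stat:
#   cpu  user nice system idle ...
# Sample this every few seconds; a jump in busy ticks (or in disk busy
# time from /proc/diskstats) at the moment the counter stops moving is
# the correlation we are after.
read -r _ user nice system idle _ < /proc/stat
busy=$((user + nice + system))
total=$((busy + idle))
echo "cpu busy/total ticks: $busy/$total"
```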

Thanks
Eno

> On 25 Feb 2017, at 19:20, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
> 
> Hi,
> 
> Thanks for the reply.
> 
> Let me give you more information:
> 
> - We have a group-by + aggregate. The hopping time window is 10 minutes, 
> but the maintainMs is 180 days (we’re trying to reprocess the entire data 
> set of billions of records).
> 
> - Over time, after just a few hours, the aggregate slows down 
> considerably. Initially it keeps up with the ingest rate just fine, but 
> after a few hours a counter we have in the body of the aggregate block 
> stops incrementing completely and resumes after a few minutes! It seems 
> like it’s trying to roll the RocksDB files and pauses the whole 
> StreamThread? Is that possible? Is that the root cause of this issue? 
> What’s the best way to monitor RocksDB file rolling? I can’t find much in 
> the LOG file; I don’t see any stalling.
> 
> - If the above is what’s happening, what’s the solution? A much smaller 
> maintainMs? Or tuning RocksDB? This is on SSD. Our records have windowed 
> keys of 28 bytes and an average value size of 200 bytes.
> 
> Ara.
> 
>> On Feb 24, 2017, at 3:15 AM, Damian Guy <damian....@gmail.com> wrote:
>> 
>> Hi Ara,
>> 
>> This usually means that one or more of your StreamThreads is taking a 
>> long time during recovery or processing, so the rebalance times out and 
>> you get another rebalance.
>> The second exception, i.e.,
>> 2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0.
>> Will retry.
>> org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
>> the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
>> 
>> is because you have at least one thread that is still processing and 
>> hasn't yet given up the tasks that have been revoked.
>> You probably need to get a thread dump to see what the threads are doing 
>> at the time. As the first exception suggests, you may need to increase 
>> max.poll.interval.ms.
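>> 
>> For reference, those settings go straight into the Streams configuration 
>> (a sketch; the values below are illustrative, not tuned recommendations):
>> 
>> ```
>> # consumer settings passed through the Streams config -- illustrative values
>> max.poll.interval.ms=600000
>> max.poll.records=100
>> ```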
>> 
>> Thanks,
>> Damian
>> 
>> On Thu, 23 Feb 2017 at 18:26 Ara Ebrahimi <ara.ebrah...@argyledata.com>
>> wrote:
>> 
>>> Hi,
>>> 
>>> After upgrading to 0.10.2.0 I got this:
>>> 
>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>> cannot be completed since the group has already rebalanced and assigned the
>>> partitions to another member. This means that the time between subsequent
>>> calls to poll() was longer than the configured max.poll.interval.ms,
>>> which typically implies that the poll loop is spending too much time
>>> message processing. You can address this either by increasing the session
>>> timeout or by reducing the maximum size of batches returned in poll() with
>>> max.poll.records.
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>>> ... 10 more
>>> 
>>> Which led to this:
>>> 
>>> 2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread
>>> [StreamThread-9] Failed while executing StreamTask 4_6 due to commit
>>> consumer offsets:
>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>>> completed since the group has already rebalanced and assigned the
>>> partitions to another member. This means that the time between subsequent
>>> calls to poll() was longer than the configured max.poll.interval.ms,
>>> which typically implies that the poll loop is spending too much time
>>> message processing. You can address this either by increasing the session
>>> timeout or by reducing the maximum size of batches returned in poll() with
>>> max.poll.records.
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
>>> at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
>>> at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> 2017-02-22 20:12:16 ERROR ConsumerCoordinator:400 - User provided listener
>>> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>>> argyle-streams failed on partition revocation
>>> org.apache.kafka.streams.errors.StreamsException: stream-thread
>>> [StreamThread-9] failed to suspend stream tasks
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
>>> at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
>>> at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> 
>>> And an infinite loop of these retries:
>>> 
>>> 2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0.
>>> Will retry.
>>> org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
>>> the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
>>> at
>>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>>> at
>>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>>> at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>>> at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>> at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>> at
>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> 
>>> Previously this would not retry and we had a chance to restart the process.
>>> 
>>> After a manual restart, processing resumed for everything except the 
>>> parts dealing with KTables, as if it couldn’t recover the RocksDB 
>>> files. What exactly is the recovery process for reading back RocksDB 
>>> files and resuming processing?
>>> 
>>> Aside from setting better values for Kafka consumer polling, what’s 
>>> the best strategy for dealing with such failures? I’m worried that if 
>>> such a thing happens, recovery doesn’t take place and we can’t resume 
>>> processing unless we delete the Kafka Streams state dir.
>>> 
>>> Ara.
>>> 
>>> 
>>> 
>>> ________________________________
>>> 
>>> This message is for the designated recipient only and may contain
>>> privileged, proprietary, or otherwise confidential information. If you have
>>> received it in error, please notify the sender immediately and delete the
>>> original. Any other use of the e-mail by you is prohibited. Thank you in
>>> advance for your cooperation.
>>> 
>>> ________________________________
>>> 
>> 
