Hi!

Thanks for the debugging this, I think there is in fact an issue in the 0.9
consumer.

I'll open a ticket for it, will try to fix that as soon as possible...

Stephan


On Wed, Mar 9, 2016 at 1:59 PM, Maciek Próchniak <m...@touk.pl> wrote:

> Hi,
>
> from time to time when we cancel streaming jobs (or they are failing for
> some reason) we encounter:
>
> 2016-03-09 10:25:29,799 [Canceler for Source: read objects from topic:
> (...) ' did not react to cancelling signal, but is stuck in method:
>  java.lang.Object.wait(Native Method)
> java.lang.Thread.join(Thread.java:1253)
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> java.lang.Thread.run(Thread.java:745)
>
>
> Now, relevant stacktrace is this:
>
> "Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=...  nid=0x2e96 in
> Object.wait() [0x00007f2bac847000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
>         - locked <0x000000041ae00180> (a java.util.ArrayDeque)
>         at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
>         - locked <0x00000004be0002f0> (a
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>         at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>         at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>         at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
>         at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>         at
> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>         - locked <0x000000041ae001c8> (a java.lang.Object)
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>         - locked <0x000000041ae001c8> (a java.lang.Object)
>
> and also:
> "OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x00007f2a39d4e800
> nid=0x2e7d waiting for monitor entry [0x00007f2a3e5e4000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
>         - waiting to lock <0x00000004be0002f0> (a
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>         at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)
>
>
> - looks like source tries to process remaining kafka messages, but it is
> stuck on partitioning by key.
>
> I managed to take heap dump and look what's going on inside LocalBufferPool
> - availableMemorySegments queue is empty
> - numberOfRequestedMemorySegments == currentPoolSize == 16
> - there are not registeredListeners
>
> Now, it seems that loop in LocalBufferPool#142 without end, waiting for
> buffer recycle - but from what I see it won't happen because OutputFlusher
> is blocked by this loop.
>
> The problem occurs (it seems) when more or less at the same time as job
> cancellation we start new job (e.g. taskmanager is restarted, one job is
> failing because of some problem,
> and another one is just starting) - so I wonder could it be some problem
> with setNumBuffers method - although it looks synchronized enough...
>
> We are using version 1.0.0 (RC4) btw
>
> I hope to dig further into this - but for now this is all I managed to
> find.
>
> thanks,
> maciek
>

Reply via email to