streaming job reading from kafka stuck while cancelling

2016-03-09 Thread Maciek Próchniak

Hi,

From time to time, when we cancel streaming jobs (or they fail for
some reason), we encounter:


2016-03-09 10:25:29,799 [Canceler for Source: read objects from topic: 
(...) ' did not react to cancelling signal, but is stuck in method:

 java.lang.Object.wait(Native Method)
java.lang.Thread.join(Thread.java:1253)
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)


Now, the relevant stack trace is this:

"Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=... nid=0x2e96 in Object.wait() [0x7f2bac847000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
    - locked <0x00041ae00180> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
    - locked <0x0004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
    - locked <0x00041ae001c8> (a java.lang.Object)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
    - locked <0x00041ae001c8> (a java.lang.Object)

and also:

"OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x7f2a39d4e800 nid=0x2e7d waiting for monitor entry [0x7f2a3e5e4000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
    - waiting to lock <0x0004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)
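Taken together, the traces show the hang: the canceler is stuck in an un-timed join() on the source thread, while the consumer thread is blocked in LocalBufferPool.requestBuffer() waiting for a network buffer that will never come back, and no interrupt is ever delivered. A minimal sketch of that shape in plain Java (not Flink code; class name, queue, and the timed join are illustrative, and the timeout exists only so the sketch terminates):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class CancelHangSketch {

    // Returns true if the consumer thread was still alive after a timed join,
    // mirroring the "stuck in join" state the canceler reports.
    static boolean demoHang() throws InterruptedException {
        ArrayBlockingQueue<byte[]> bufferPool = new ArrayBlockingQueue<>(1);
        bufferPool.put(new byte[0]); // the pool is already exhausted

        Thread consumer = new Thread(() -> {
            try {
                bufferPool.put(new byte[0]); // blocks: no buffer is ever recycled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // unblocked only by an interrupt
            }
        });
        consumer.start();

        consumer.join(500); // the real code joins without a timeout, hence the hang
        boolean stillAlive = consumer.isAlive();

        consumer.interrupt(); // clean up so the sketch itself terminates
        consumer.join();
        return stillAlive;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumer still stuck after timed join: " + demoHang());
    }
}
```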




Re: streaming job reading from kafka stuck while cancelling

2016-03-09 Thread Stephan Ewen
Hi!

Thanks for debugging this, I think there is in fact an issue in the 0.9
consumer.

I'll open a ticket for it, will try to fix that as soon as possible...

Stephan


On Wed, Mar 9, 2016 at 1:59 PM, Maciek Próchniak  wrote:

> Hi,
>
> from time to time when we cancel streaming jobs (or they are failing for
> some reason) we encounter:
>
> [stack traces snipped; see the original message above]

Re: streaming job reading from kafka stuck while cancelling

2016-03-09 Thread Stephan Ewen
Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595

On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen  wrote:

> Hi!
>
> Thanks for debugging this, I think there is in fact an issue in the
> 0.9 consumer.
>
> I'll open a ticket for it, will try to fix that as soon as possible...
>
> Stephan
>
> [quoted original message snipped]

Re: streaming job reading from kafka stuck while cancelling

2016-03-09 Thread Maciek Próchniak

Thanks,

that makes sense...
Guess I'll try some dirty workaround for now by interrupting the consumer
thread if it doesn't finish after some time...
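That workaround can be sketched in plain Java: join with a bounded timeout, and interrupt only as a last resort. The class name and timeout are illustrative, not Flink code, and (as noted later in the thread) interrupting a thread while it is inside the Kafka client may itself be unsafe.

```java
public class TimedCancel {

    /** Joins the thread for timeoutMillis; interrupts it if it is still alive. */
    static void joinOrInterrupt(Thread t, long timeoutMillis) throws InterruptedException {
        t.join(timeoutMillis);
        if (t.isAlive()) {
            t.interrupt(); // last resort; unsafe if the thread is inside Kafka code
            t.join();      // the interrupt unblocks Object.wait(), so this returns
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        Thread stuck = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait(); // simulates the consumer blocked in requestBuffer()
                } catch (InterruptedException ignored) {
                    // cancellation path: fall through and let the thread die
                }
            }
        });
        stuck.start();
        joinOrInterrupt(stuck, 200);
        System.out.println("thread terminated: " + !stuck.isAlive());
    }
}
```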


maciek

On 09/03/2016 14:42, Stephan Ewen wrote:

Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595

On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen wrote:

Hi!

Thanks for debugging this, I think there is in fact an issue
in the 0.9 consumer.

I'll open a ticket for it, will try to fix that as soon as possible...

Stephan

[quoted original message snipped]

Re: streaming job reading from kafka stuck while cancelling

2016-03-09 Thread Stephan Ewen
The reason the consumer thread is not interrupted (which is also why there is
a separate consumer thread in the first place) is that Kafka has a bug (or
design issue) where interrupting the thread may lead to a deadlock inside the
Kafka client.

To interrupt the thread safely, one would need to make sure the interrupt
never arrives while the thread is in the Kafka function stack, only while it
is in Flink code.
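A rough sketch of that constraint (all names are illustrative, this is not the actual fix, and a real implementation would also need to close the race between checking the flag and the thread re-entering the Kafka call):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedInterrupt {
    private final AtomicBoolean inKafkaCall = new AtomicBoolean(false);
    private volatile boolean running = true;

    void consumeLoop() {
        while (running) {
            inKafkaCall.set(true);
            try {
                pollKafka();              // never interrupted while in here
            } finally {
                inKafkaCall.set(false);
            }
            // safe point: back in "Flink" code, so an interrupt (or the cleared
            // running flag) can be acted on without risking the Kafka deadlock
            if (Thread.interrupted() || !running) {
                return;
            }
            emitDownstream();
        }
    }

    /** Called by the canceler. Returns true if the interrupt was delivered. */
    boolean cancel(Thread consumerThread) {
        running = false;
        if (!inKafkaCall.get()) {
            consumerThread.interrupt();
            return true;
        }
        return false; // the thread will see running == false at the next safe point
    }

    private void pollKafka() { /* stand-in for KafkaConsumer.poll() */ }
    private void emitDownstream() { /* stand-in for collecting records downstream */ }
}
```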

On Wed, Mar 9, 2016 at 4:25 PM, Maciek Próchniak  wrote:

> Thanks,
>
> that makes sense...
> Guess I'll try some dirty workaround for now by interrupting the consumer
> thread if it doesn't finish after some time...
>
> maciek
>
> [quoted earlier messages snipped]

Re: streaming job reading from kafka stuck while cancelling

2016-03-10 Thread Ufuk Celebi
Hey Maciek! I'm working on the other proposed fix: closing the buffer pool
early, so the blocked thread wakes up during cancellation. I expect the fix
to make it into the next bugfix release 1.0.1 (or 1.0.2 if 1.0.1 comes very
soon).
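The idea can be sketched as a pool whose destroy() wakes every blocked requester, so a thread stuck in requestBufferBlocking() fails fast instead of waiting forever. This is a simplified illustration, not Flink's actual LocalBufferPool:

```java
import java.util.ArrayDeque;

public class ClosablePool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private boolean destroyed = false;

    public synchronized byte[] requestBufferBlocking() throws InterruptedException {
        while (available.isEmpty()) {
            if (destroyed) {
                throw new IllegalStateException("buffer pool has been destroyed");
            }
            wait(2000); // mirrors the timed Object.wait() in the stack trace
        }
        return available.poll();
    }

    public synchronized void recycle(byte[] buffer) {
        available.add(buffer);
        notifyAll(); // wake one of the blocked requesters
    }

    /** Called early during cancellation: wakes every blocked requester. */
    public synchronized void destroy() {
        destroyed = true;
        notifyAll();
    }
}
```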

– Ufuk