[jira] [Commented] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-02 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462010#comment-16462010
 ] 

Sihua Zhou commented on FLINK-9290:
---

Hi [~narayaruna], could you please close this? It is the same issue that we 
addressed in [FLINK-9263|https://issues.apache.org/jira/browse/FLINK-9263] in 
1.5. If you would like to know more about what happened there, you can check 
out the [PR|https://github.com/apache/flink/pull/5930] for FLINK-9263 for more 
information.

> The job is unable to recover from a checkpoint
> --
>
> Key: FLINK-9290
> URL: https://issues.apache.org/jira/browse/FLINK-9290
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Blocker
>
> Using the rocksdb state backend.
> The job runs fine for more than 24 hours and then attempts recovery because of 
> an error from the sink. It continues to fail at the time of recovery with the 
> following error. The workaround is to cancel the job and start it again.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
> Serialization trace:
> topic 
> (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
>   ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9291) Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)

2018-05-02 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462005#comment-16462005
 ] 

Sihua Zhou commented on FLINK-9291:
---

I think this looks like the same problem as what happened in 
[FLINK-9268|https://issues.apache.org/jira/browse/FLINK-9268], and in my view 
[~srichter]'s comments there could apply here as well.

> Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)
> 
>
> Key: FLINK-9291
> URL: https://issues.apache.org/jira/browse/FLINK-9291
> Project: Flink
>  Issue Type: Bug
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> Using rocksdb for state, and after running for a few hours, checkpointing 
> fails with the following error. The job recovers fine after this.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 215 for operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> 
> Sink: bs, Sink: es) (14/80).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 215 for 
> operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> Sink: bs, 
> Sink: es) (14/80).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NegativeArraySizeException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NegativeArraySizeException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.NegativeArraySizeException
>   at org.rocksdb.RocksIterator.value0(Native Method)
>   at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBMergeIterator.value(RocksDBKeyedStateBackend.java:1898)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:704)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:556)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:466)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:424)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.NegativeArraySizeException]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9178) Add rate control for kafka source

2018-05-02 Thread Tarush Grover (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461994#comment-16461994
 ] 

Tarush Grover edited comment on FLINK-9178 at 5/3/18 6:31 AM:
--

[~tzulitai] Actually I had checked this {{max.partition.fetch.bytes}}. There 
might be two ways we can add rate control: one is by specifying a number of 
bytes, the other is by specifying a number of events, e.g. {{x events per 
fetch}}. We already have the configuration {{max.partition.fetch.bytes}}, which 
gives the maximum amount of data the server returns per partition. Now we have 
to decide whether we want to implement the second way or go with the first one.


was (Author: app-tarush):
[~tzulitai] Actually I had checked this `max.partition.fetch.bytes`, there 
might be two ways that we can add rate control one way is by specifying number 
of bytes and other is by mentioning number of events eg. `x events per fetch`. 
We have the configuration `max.partition.fetch.bytes` which give the maximum 
amount of data per-partition the server. Now we have to decide whether we want 
to implement second way or we can go with first one.
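
For reference, a minimal sketch of the byte-based option that already exists 
today: it only sets the Kafka consumer property discussed above, and the 
broker address, group id, and topic name are hypothetical. An event-count 
limit would need new connector code, which is the open question in this ticket.

{code}
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ByteBasedFetchControl {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "rate-control-demo");       // hypothetical group id
        // Byte-based control: cap what the broker returns per partition per fetch.
        props.setProperty("max.partition.fetch.bytes", "1048576"); // 1 MiB

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest(); // the scenario from the issue description

        env.addSource(consumer).print();
        env.execute("byte-based fetch control");
    }
}
{code}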

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: buptljy
>Assignee: Tarush Grover
>Priority: Major
>
> When I want to run the flink program from the earliest offset in Kafka, it is 
> very easy to hit an OOM if there is too much data, because too many 
> HeapMemorySegment instances accumulate in the NetworkBufferPool.
> Maybe we should have some settings to control the rate of the receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9178) Add rate control for kafka source

2018-05-02 Thread Tarush Grover (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461994#comment-16461994
 ] 

Tarush Grover commented on FLINK-9178:
--

[~tzulitai] Actually I had checked this `max.partition.fetch.bytes`. There 
might be two ways we can add rate control: one is by specifying a number of 
bytes, the other is by specifying a number of events, e.g. `x events per 
fetch`. We already have the configuration `max.partition.fetch.bytes`, which 
gives the maximum amount of data the server returns per partition. Now we have 
to decide whether we want to implement the second way or go with the first one.

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: buptljy
>Assignee: Tarush Grover
>Priority: Major
>
> When I want to run the flink program from the earliest offset in Kafka, it is 
> very easy to hit an OOM if there is too much data, because too many 
> HeapMemorySegment instances accumulate in the NetworkBufferPool.
> Maybe we should have some settings to control the rate of the receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9291) Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)

2018-05-02 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9291:


 Summary: Checkpoint failure (CIRCULAR 
REFERENCE:java.lang.NegativeArraySizeException)
 Key: FLINK-9291
 URL: https://issues.apache.org/jira/browse/FLINK-9291
 Project: Flink
  Issue Type: Bug
Reporter: Narayanan Arunachalam


Using rocksdb for state, and after running for a few hours, checkpointing 
fails with the following error. The job recovers fine after this.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
215 for operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> 
Sink: bs, Sink: es) (14/80).}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 215 for 
operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> Sink: bs, 
Sink: es) (14/80).
... 6 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed 
keyed state future.
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
... 5 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at 
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
... 7 more
Caused by: java.lang.NegativeArraySizeException
at org.rocksdb.RocksIterator.value0(Native Method)
at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBMergeIterator.value(RocksDBKeyedStateBackend.java:1898)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:704)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:556)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:466)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:424)
at 
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
... 5 more
[CIRCULAR REFERENCE:java.lang.NegativeArraySizeException]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461960#comment-16461960
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8500 at 5/3/18 5:43 AM:


I'm still leaning towards the second approach, where we maintain our own class.
It'd be more work, but at least we ourselves will also gain a better picture of 
what we are exposing.

 [~FredTing] yes, maybe we should discuss those as a separate issue, and just 
focus on Kafka for this ticket.


was (Author: tzulitai):
I'm still leaning towards the second approach, where we maintain our own class.
[~FredTing] yes, maybe we should discuss those as a separate issue, and just 
focus on Kafka for this ticket.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461960#comment-16461960
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


I'm still leaning towards the second approach, where we maintain our own class.
[~FredTing] yes, maybe we should discuss those as a separate issue, and just 
focus on Kafka for this ticket.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8500:
---
Comment: was deleted

(was: Let's go with the second approach. I've quickly checked Kafka 1.x, and it 
seems like the {{ConsumerRecord}} class will remain stable, at least for the 
near future.

[~FredTing] yes, for example the Kinesis deserialization schema could benefit 
from a more generic solution. And yes, let's discuss them elsewhere / have a 
separate Jira for them.)

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461955#comment-16461955
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


Let's go with the second approach. I've quickly checked Kafka 1.x, and it seems 
like the {{ConsumerRecord}} class will remain stable, at least for the near 
future.

[~FredTing] yes, for example the Kinesis deserialization schema could benefit 
from a more generic solution. And yes, let's discuss them elsewhere / have a 
separate Jira for them.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461953#comment-16461953
 ] 

Fred Teunissen commented on FLINK-8500:
---

Both approaches will work, but we have to choose. The first approach exposes 
the Kafka API as part of the Flink API. The second approach hides the Kafka API 
but will require a bit more resources to maintain. The second approach would 
have my vote.

I don’t want to introduce scope creep, but I think there are more input sources 
that could benefit from a more generic (de)serialization scheme. Should we look 
into that, or leave it for now (in issue 5479 the idea of a `common connector 
framework` is mentioned; should it be picked up there)?
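
For illustration, a minimal sketch of what the second approach could look like: 
a Flink-owned metadata class plus a schema interface built on it. All names 
here ({{MetadataAwareDeserializationSchema}}, {{RecordMetadata}}) are 
hypothetical, not the API that was eventually merged; the point is that Kafka's 
{{ConsumerRecord}} never appears in the user-facing signature.

{code}
import java.io.IOException;
import java.io.Serializable;

/** Hypothetical schema interface for approach 2: Flink maintains its own metadata class. */
public interface MetadataAwareDeserializationSchema<T> extends Serializable {

    /** Flink-owned record metadata; hides Kafka's ConsumerRecord from the user-facing API. */
    final class RecordMetadata {
        private final String topic;
        private final int partition;
        private final long offset;
        private final long timestamp; // the Kafka message timestamp this ticket asks for

        public RecordMetadata(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        public String getTopic() { return topic; }
        public int getPartition() { return partition; }
        public long getOffset() { return offset; }
        public long getTimestamp() { return timestamp; }
    }

    T deserialize(byte[] messageKey, byte[] message, RecordMetadata metadata) throws IOException;
}
{code}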

 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9288) clarify a few points in the event time / watermark docs

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461941#comment-16461941
 ] 

ASF GitHub Bot commented on FLINK-9288:
---

Github user alpinegizmo closed the pull request at:

https://github.com/apache/flink/pull/5949


> clarify a few points in the event time / watermark docs
> ---
>
> Key: FLINK-9288
> URL: https://issues.apache.org/jira/browse/FLINK-9288
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
> Fix For: 1.5.0, 1.6.0
>
>
> There are a few things that folks often seem to miss when reading the event 
> time and watermark docs. Adding a couple of sentences and a couple of links 
> should help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-02 Thread alpinegizmo
GitHub user alpinegizmo reopened a pull request:

https://github.com/apache/flink/pull/5949

[FLINK-9288][docs] clarify the event time / watermark docs

This PR only affects the documentation (for event time and watermarks). I 
wanted to make a couple of things clearer, and to provide a couple of 
additional internal links.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink event-time-watermarks-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5949


commit a5c66a56845ada7b0a20471a4842175c5a6566d6
Author: David Anderson 
Date:   2018-05-02T11:50:48Z

[FLINK-9288][docs] clarify the event time / watermark docs

commit a100cab6fec6ab3affa4ecc13c46e0081bd19b62
Author: David Anderson 
Date:   2018-05-03T05:12:12Z

Reworked the section on event time to be less absolutist.




---


[jira] [Commented] (FLINK-9288) clarify a few points in the event time / watermark docs

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461942#comment-16461942
 ] 

ASF GitHub Bot commented on FLINK-9288:
---

GitHub user alpinegizmo reopened a pull request:

https://github.com/apache/flink/pull/5949

[FLINK-9288][docs] clarify the event time / watermark docs

This PR only affects the documentation (for event time and watermarks). I 
wanted to make a couple of things clearer, and to provide a couple of 
additional internal links.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink event-time-watermarks-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5949


commit a5c66a56845ada7b0a20471a4842175c5a6566d6
Author: David Anderson 
Date:   2018-05-02T11:50:48Z

[FLINK-9288][docs] clarify the event time / watermark docs

commit a100cab6fec6ab3affa4ecc13c46e0081bd19b62
Author: David Anderson 
Date:   2018-05-03T05:12:12Z

Reworked the section on event time to be less absolutist.




> clarify a few points in the event time / watermark docs
> ---
>
> Key: FLINK-9288
> URL: https://issues.apache.org/jira/browse/FLINK-9288
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
> Fix For: 1.5.0, 1.6.0
>
>
> There are a few things that folks often seem to miss when reading the event 
> time and watermark docs. Adding a couple of sentences and a couple of links 
> should help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9288) clarify a few points in the event time / watermark docs

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461940#comment-16461940
 ] 

ASF GitHub Bot commented on FLINK-9288:
---

Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5949
  
@bowenli86 Thanks for the feedback. I've reworked that event time section. 
Hopefully it's now more complete and accurate without being too complex.


> clarify a few points in the event time / watermark docs
> ---
>
> Key: FLINK-9288
> URL: https://issues.apache.org/jira/browse/FLINK-9288
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
> Fix For: 1.5.0, 1.6.0
>
>
> There are a few things that folks often seem to miss when reading the event 
> time and watermark docs. Adding a couple of sentences and a couple of links 
> should help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-02 Thread alpinegizmo
Github user alpinegizmo closed the pull request at:

https://github.com/apache/flink/pull/5949


---


[GitHub] flink issue #5949: [FLINK-9288][docs] clarify the event time / watermark doc...

2018-05-02 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5949
  
@bowenli86 Thanks for the feedback. I've reworked that event time section. 
Hopefully it's now more complete and accurate without being too complex.


---


[jira] [Assigned] (FLINK-9276) Improve error message when TaskManager fails

2018-05-02 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9276:
---

Assignee: vinoyang

> Improve error message when TaskManager fails
> 
>
> Key: FLINK-9276
> URL: https://issues.apache.org/jira/browse/FLINK-9276
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Critical
>
> When a TaskManager fails, we frequently get a message
> {code}
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> container_1524853016208_0001_01_000102
> {code}
> This message is misleading in that it sounds like an intended operation, when 
> it really is a failure of a container that the {{ResourceManager}} reports to 
> the {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-02 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5928
  
@zentol I have addressed your comments.


---


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461726#comment-16461726
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
@fhueske Can you PTAL and merge this PR? 


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It would be useful to also flush data at a specified 
> time interval. This way, data is written out even when write throughput is 
> low, so there are no significant time gaps between writes. This reduces the 
> ETA for the data in the file system and should help move the checkpoints 
> faster as well.
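
For context, a sketch of how the sink's flush settings would combine. 
{{setBatchSize}}, {{setInactiveBucketThreshold}}, and 
{{setInactiveBucketCheckInterval}} are the existing knobs; 
{{setBatchRolloverInterval}} is the time-based setting proposed by this ticket 
(name taken from the PR under review and may still change). The path is 
hypothetical.

{code}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class TimeBasedFlushSketch {

    public static void attachSink(DataStream<String> stream) {
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///base/path"); // hypothetical path
        sink.setBatchSize(128L * 1024 * 1024);        // existing: roll part files at 128 MiB
        sink.setInactiveBucketThreshold(60_000L);     // existing: close buckets idle for 60 s
        sink.setInactiveBucketCheckInterval(60_000L); // existing: how often to check for idleness
        // The knob proposed by this ticket (name as in the PR; may still change):
        sink.setBatchRolloverInterval(15L * 60 * 1000); // also roll at least every 15 minutes
        stream.addSink(sink);
    }
}
{code}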



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-02 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
@fhueske Can you PTAL and merge this PR? 


---


[jira] [Commented] (FLINK-9273) Class cast exception

2018-05-02 Thread Bob Lau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461701#comment-16461701
 ] 

Bob Lau commented on FLINK-9273:


[~fhueske] Thanks for your response! I've resolved the problem. As you say, it 
was a type problem. Thank you again!
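
For readers hitting the same trace, a minimal sketch of the failure mode, using 
a hypothetical two-field row whose declared type disagrees with the value it 
actually holds:

{code}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class TypeMismatchSketch {
    public static void main(String[] args) {
        // Field 0 is declared as LONG...
        RowTypeInfo declared = new RowTypeInfo(Types.LONG, Types.STRING);

        // ...but the record actually carries a String in that position.
        Row row = new Row(2);
        row.setField(0, "1525300000000"); // String where a Long was declared
        row.setField(1, "some-payload");

        // Copying the Row with the serializer derived from `declared` hands
        // LongSerializer.copy a String, reproducing the ClassCastException in
        // the stack trace above. Declaring field 0 as Types.STRING (or parsing
        // it to a Long before emitting) resolves it.
        declared.createSerializer(new ExecutionConfig()).copy(row);
    }
}
{code}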

> Class cast exception
> 
>
> Key: FLINK-9273
> URL: https://issues.apache.org/jira/browse/FLINK-9273
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming, Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> Exception stack is as follows:
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:396)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:307)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> ... 1 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9281) LogBack not working

2018-05-02 Thread Tim (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461564#comment-16461564
 ] 

Tim commented on FLINK-9281:


Update: Looks like this works with logback v1.1.3, but does not work with 
version 1.3.0-alpha4.

Here's what I have in my $FLINK_DIR/lib folder:
 # logback-classic-1.1.3.jar
 # logback-core-1.1.3.jar
 # log4j-over-slf4j-1.7.7.jar

 

> LogBack not working
> ---
>
> Key: FLINK-9281
> URL: https://issues.apache.org/jira/browse/FLINK-9281
> Project: Flink
>  Issue Type: Bug
>  Components: Logging
>Affects Versions: 1.4.2
>Reporter: Tim
>Priority: Major
>
> I am trying to get Flink to work with Logback instead of Log4J. However, it 
> is not working. 
> My setup follows the advice on this page: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster
>  * Flink v1.4.2 running a stand-alone cluster. 
>  * Started JobManager as a foreground process (bin/jobmanager.sh 
> start-foreground cluster).  I updated bin/flink-console.sh to reference 
> logback.xml via -Dlogback.configurationFile=file:/path/to/logfile.
>  * Removed log4j jars under libs/  (log4j-1.2.xx.jar and 
> sfl4j-log4j12-xxx.jar)
>  * Added logback jars under libs/   (logback-classic, logback-core, 
> log4j-over-slf4j.jar) 
> However, no log file is created. Also, as a dumb test I referenced a 
> non-existent logback.xml file (changed the path to a non-existent folder) just 
> to see if any errors appear on stdout, but nothing.
> Thanks
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461503#comment-16461503
 ] 

ASF GitHub Bot commented on FLINK-8286:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185606093
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-   run(args);
+   try {
+   SecurityUtils.getInstalledContext().runSecured(
+   
YarnTaskExecutorRunnerFactory.create(System.getenv()));
+   } catch (Exception e) {
+   LOG.error("Exception occurred while launching Task 
Executor runner", e);
+   throw new RuntimeException(e);
+   }
}
 
/**
-* The instance entry point for the YARN task executor. Obtains user 
group information and calls
-* the main work method {@link 
TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-* privileged action.
+* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 *
-* @param args The command line arguments.
+* @param envs environment variables.
 */
-   private static void run(String[] args) {
-   try {
-   LOG.debug("All environment variables: {}", ENV);
+   @VisibleForTesting
+   protected static Runner create(Map<String, String> envs) {
+   LOG.debug("All environment variables: {}", envs);
 
-   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-   final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
-   LOG.info("Current working/local Directory: {}", 
localDirs);
+   final String yarnClientUsername = 
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+   final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+   LOG.info("Current working/local Directory: {}", localDirs);
 
-   final String currDir = ENV.get(Environment.PWD.key());
-   LOG.info("Current working Directory: {}", currDir);
+   final String currDir = envs.get(Environment.PWD.key());
+   LOG.info("Current working Directory: {}", currDir);
 
-   final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
-   LOG.info("TM: remote keytab path obtained {}", 
remoteKeytabPath);
+   final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
 
-   final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
-   final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
+   final Configuration configuration;
+   try {
+   configuration = 
GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
+   } catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
+   return null;
+   }
 
-   // configure local directory
-   if (configuration.contains(CoreOptions.TMP_DIRS)) {
-   LOG.info("Overriding YARN's temporary file 
directories with those " +
-   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-   }
-   else {
-   LOG.info("Setting directories for temporary 
files to: {}", localDirs);
-   configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-   }
-
-   // tell akka to die in case of an error
-   
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+   // configure local directory
+   if (configuration.contains(CoreOptions.TMP_DIRS)) {
+   LOG.info("Overriding YARN's temporary file directories 
with those " +
+   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
+   }
+   else {
+   LOG.info("Setting directories for te

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-05-02 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185606093
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-   run(args);
+   try {
+   SecurityUtils.getInstalledContext().runSecured(
+   
YarnTaskExecutorRunnerFactory.create(System.getenv()));
+   } catch (Exception e) {
+   LOG.error("Exception occurred while launching Task 
Executor runner", e);
+   throw new RuntimeException(e);
+   }
}
 
/**
-* The instance entry point for the YARN task executor. Obtains user 
group information and calls
-* the main work method {@link 
TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-* privileged action.
+* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 *
-* @param args The command line arguments.
+* @param envs environment variables.
 */
-   private static void run(String[] args) {
-   try {
-   LOG.debug("All environment variables: {}", ENV);
+   @VisibleForTesting
+   protected static Runner create(Map<String, String> envs) {
+   LOG.debug("All environment variables: {}", envs);
 
-   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-   final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
-   LOG.info("Current working/local Directory: {}", 
localDirs);
+   final String yarnClientUsername = 
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+   final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+   LOG.info("Current working/local Directory: {}", localDirs);
 
-   final String currDir = ENV.get(Environment.PWD.key());
-   LOG.info("Current working Directory: {}", currDir);
+   final String currDir = envs.get(Environment.PWD.key());
+   LOG.info("Current working Directory: {}", currDir);
 
-   final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
-   LOG.info("TM: remote keytab path obtained {}", 
remoteKeytabPath);
+   final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
 
-   final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
-   final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
+   final Configuration configuration;
+   try {
+   configuration = 
GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
+   } catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
+   return null;
+   }
 
-   // configure local directory
-   if (configuration.contains(CoreOptions.TMP_DIRS)) {
-   LOG.info("Overriding YARN's temporary file 
directories with those " +
-   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-   }
-   else {
-   LOG.info("Setting directories for temporary 
files to: {}", localDirs);
-   configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-   }
-
-   // tell akka to die in case of an error
-   
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+   // configure local directory
+   if (configuration.contains(CoreOptions.TMP_DIRS)) {
+   LOG.info("Overriding YARN's temporary file directories 
with those " +
+   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
+   }
+   else {
+   LOG.info("Setting directories for temporary files to: 
{}", localDirs);
+   configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
+   }
 
-   String keytabPath = null;
-   if (remoteKeytabPath != null) {
-  

[jira] [Commented] (FLINK-9136) Remove StreamingProgramTestBase

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461488#comment-16461488
 ] 

ASF GitHub Bot commented on FLINK-9136:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5817


> Remove StreamingProgramTestBase
> ---
>
> Key: FLINK-9136
> URL: https://issues.apache.org/jira/browse/FLINK-9136
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.6.0
>
>
> The {{StreamingProgramTestBase}} should be removed. We can move all existing 
> tests to the {{AbstractTestBase}} with junit annotations.
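
For illustration, a hypothetical migrated test: plain JUnit annotations on top 
of {{AbstractTestBase}}, which provides the test cluster, so none of the 
{{StreamingProgramTestBase}} lifecycle methods are needed. All names here are 
illustrative.

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Test;

/** Hypothetical migrated test; names are illustrative. */
public class MigratedStreamingITCase extends AbstractTestBase {

    @Test
    public void testProgram() throws Exception {
        // AbstractTestBase provides the test cluster, so this environment
        // runs against it directly; no custom lifecycle methods needed.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("to", "be", "or", "not", "to", "be").print();
        env.execute("migrated test");
    }
}
{code}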



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5817: [FLINK-9136][tests] Remove StreamingProgramTestBas...

2018-05-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5817


---


[jira] [Closed] (FLINK-9136) Remove StreamingProgramTestBase

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9136.
---
Resolution: Fixed

master: de10b40095bc0109faec3874d452b33586f35e7a

> Remove StreamingProgramTestBase
> ---
>
> Key: FLINK-9136
> URL: https://issues.apache.org/jira/browse/FLINK-9136
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.6.0
>
>
> The {{StreamingProgramTestBase}} should be removed. We can move all existing 
> tests to the {{AbstractTestBase}} with junit annotations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9125) MiniClusterResource should set CoreOptions.TMP_DIRS

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461473#comment-16461473
 ] 

ASF GitHub Bot commented on FLINK-9125:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5933#discussion_r185596918
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -137,6 +142,7 @@ public int getWebUIPort() {
 
@Override
public void before() throws Exception {
+   temporaryFolder.create();
--- End diff --

good point


> MiniClusterResource should set CoreOptions.TMP_DIRS
> ---
>
> Key: FLINK-9125
> URL: https://issues.apache.org/jira/browse/FLINK-9125
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>
> We've frequently seen test stability issues when creating temporary files that 
> use hard-coded paths, like {{/tmp}}, and have generally tried to use 
> {{TemporaryFolder}}s instead.
> The {{CoreOptions.TMP_DIRS}} option is used by some components to determine 
> where temporary files should be placed. By default this points to 
> {{System.getProperty("java.io.tmpdir")}}, which usually points to {{/tmp}}.
> This property is rarely set explicitly by tests, which leads to failures such 
> as this: https://travis-ci.org/apache/flink/jobs/361515791
> {code}
> org.apache.flink.test.api.java.operators.lambdas.JoinITCase  Time elapsed: 
> 3.025 sec  <<< ERROR!
> java.io.IOException: Could not create root directory for local recovery: 
> /tmp/localState
>   at 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.(TaskExecutorLocalStateStoresManager.java:89)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:284)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:303)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:738)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:306)
>   at 
> org.apache.flink.test.util.MiniClusterResource.startMiniCluster(MiniClusterResource.java:217)
>   at 
> org.apache.flink.test.util.MiniClusterResource.startJobExecutorService(MiniClusterResource.java:171)
>   at 
> org.apache.flink.test.util.MiniClusterResource.before(MiniClusterResource.java:121)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
> It would be nice if the MiniClusterResource inherently contained a 
> {{TemporaryFolder}} that is used to configure {{CoreOptions.TMP_DIRS}} as 
> well as other tmp dir options.
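
A minimal sketch of the proposed wiring, assuming a JUnit {{TemporaryFolder}} 
owned by the resource; this is illustrative only, not the merged change. Note 
the explicit {{create()}} call, the detail flagged in the review comment above.

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.junit.rules.TemporaryFolder;

/** Illustrative sketch only; not the merged MiniClusterResource change. */
public class TmpDirsWiringSketch {

    private final TemporaryFolder temporaryFolder = new TemporaryFolder();

    public Configuration before() throws Exception {
        // Create the JUnit-managed folder first, then point Flink's temp-dir
        // option at it instead of the hard-coded /tmp.
        temporaryFolder.create();

        Configuration config = new Configuration();
        config.setString(CoreOptions.TMP_DIRS,
                temporaryFolder.newFolder("tmp").getAbsolutePath());
        return config;
    }

    public void after() {
        temporaryFolder.delete();
    }
}
{code}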



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5933: [FLINK-9125] MiniClusterResource should set CoreOp...

2018-05-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5933#discussion_r185596918
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -137,6 +142,7 @@ public int getWebUIPort() {
 
@Override
public void before() throws Exception {
+   temporaryFolder.create();
--- End diff --

good point


---


[jira] [Commented] (FLINK-9288) clarify a few points in the event time / watermark docs

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461415#comment-16461415
 ] 

ASF GitHub Bot commented on FLINK-9288:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5949#discussion_r185588020
  
--- Diff: docs/dev/event_time.md ---
@@ -35,30 +35,32 @@ Flink supports different notions of *time* in streaming 
programs.
 respective operation.
 
 When a streaming program runs on processing time, all time-based 
operations (like time windows) will
-use the system clock of the machines that run the respective operator. 
For example, an hourly
+use the system clock of the machines that run the respective operator. 
An hourly
 processing time window will include all records that arrived at a 
specific operator between the
-times when the system clock indicated the full hour.
+times when the system clock indicated the full hour. For example, if 
an application
+begins running at 9:15am, the first hourly processing time window will 
include events
+processed between 9:15am and 10:00am, the next window will include 
events processed between 10:00am and 11:00am, and so on.
 
 Processing time is the simplest notion of time and requires no 
coordination between streams and machines.
 It provides the best performance and the lowest latency. However, in 
distributed and asynchronous
 environments processing time does not provide determinism, because it 
is susceptible to the speed at which
-records arrive in the system (for example from the message queue), and 
to the speed at which the
-records flow between operators inside the system.
+records arrive in the system (for example from the message queue), to 
the speed at which the
+records flow between operators inside the system, and to outages 
(scheduled, or otherwise).
 
 - **Event time:** Event time is the time that each individual event 
occurred on its producing device.
-This time is typically embedded within the records before they enter 
Flink and that *event timestamp*
-can be extracted from the record. An hourly event time window will 
contain all records that carry an
-event timestamp that falls into that hour, regardless of when the 
records arrive, and in what order
-they arrive.
+This time is typically embedded within the records before they enter 
Flink, and that *event timestamp*
+can be extracted from each record. An hourly event time window will 
contain all records that carry an
--- End diff --

Better to mention allowed lateness here. “...will contain all records, ..., 
regardless of when the records arrive” sounds too absolute; the guarantee can 
only be achieved with lateness requirements.
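
To make the lateness point concrete, a minimal event-time sketch over tuple 
records; the ten-second out-of-orderness bound and five-minute allowed lateness 
are illustrative values only.

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // (key, eventTimestampMillis) pairs; the timestamp is embedded in the
        // record, as the doc text above describes.
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1_000L), Tuple2.of("a", 3_000L), Tuple2.of("b", 2_000L));

        events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> e) {
                        return e.f1; // extract the embedded event timestamp
                    }
                })
              .keyBy(0)
              .window(TumblingEventTimeWindows.of(Time.hours(1)))
              .allowedLateness(Time.minutes(5)) // the caveat raised in the review comment
              .sum(1)
              .print();

        env.execute("event time window sketch");
    }
}
{code}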


> clarify a few points in the event time / watermark docs
> ---
>
> Key: FLINK-9288
> URL: https://issues.apache.org/jira/browse/FLINK-9288
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
> Fix For: 1.5.0, 1.6.0
>
>
> There are a few things that folks often seem to miss when reading the event 
> time and watermark docs. Adding a couple of sentences and a couple of links 
> should help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-02 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5949#discussion_r185588020
  
--- Diff: docs/dev/event_time.md ---
@@ -35,30 +35,32 @@ Flink supports different notions of *time* in streaming 
programs.
 respective operation.
 
 When a streaming program runs on processing time, all time-based 
operations (like time windows) will
-use the system clock of the machines that run the respective operator. 
For example, an hourly
+use the system clock of the machines that run the respective operator. 
An hourly
 processing time window will include all records that arrived at a 
specific operator between the
-times when the system clock indicated the full hour.
+times when the system clock indicated the full hour. For example, if 
an application
+begins running at 9:15am, the first hourly processing time window will 
include events
+processed between 9:15am and 10:00am, the next window will include 
events processed between 10:00am and 11:00am, and so on.
 
 Processing time is the simplest notion of time and requires no 
coordination between streams and machines.
 It provides the best performance and the lowest latency. However, in 
distributed and asynchronous
 environments processing time does not provide determinism, because it 
is susceptible to the speed at which
-records arrive in the system (for example from the message queue), and 
to the speed at which the
-records flow between operators inside the system.
+records arrive in the system (for example from the message queue), to 
the speed at which the
+records flow between operators inside the system, and to outages 
(scheduled, or otherwise).
 
 - **Event time:** Event time is the time that each individual event 
occurred on its producing device.
-This time is typically embedded within the records before they enter 
Flink and that *event timestamp*
-can be extracted from the record. An hourly event time window will 
contain all records that carry an
-event timestamp that falls into that hour, regardless of when the 
records arrive, and in what order
-they arrive.
+This time is typically embedded within the records before they enter 
Flink, and that *event timestamp*
+can be extracted from each record. An hourly event time window will 
contain all records that carry an
--- End diff --

better mention allowed lateness here. “...will contain all records, ..., 
regardless of when the records arrive” sounds too absolute; the guarantee can 
only be achieved when the lateness requirements are satisfied
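To make this concrete, a minimal sketch of an hourly event time window with 
allowed lateness; the tuple element type, the sample timestamps, and the 
five-minute lateness bound are illustrative assumptions, not part of the PR:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // the event timestamp is embedded in the record (field f1, in milliseconds)
        DataStream<Tuple2<String, Long>> events = env
            .fromElements(Tuple2.of("a", 1_000L), Tuple2.of("a", 3_600_000L))
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> event) {
                        return event.f1;
                    }
                });

        events.keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            // records arriving after the watermark has passed the window end plus
            // this bound are dropped, which is why the "contains all records"
            // guarantee is not absolute
            .allowedLateness(Time.minutes(5))
            .sum(1)
            .print();

        env.execute("event time window sketch");
    }
}
```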


---


[jira] [Assigned] (FLINK-8726) Code highlighting partially broken

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-8726:
---

Assignee: Chesnay Schepler

> Code highlighting partially broken
> --
>
> Key: FLINK-8726
> URL: https://issues.apache.org/jira/browse/FLINK-8726
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> With the recent changes around the documentation build dependencies code 
> highlighting is no longer fully working.
> Sections as below are rendered without any background [like 
> here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html].
> {code}
> ~~~bash
> # get the hadoop2 package from the Flink download page at
> # {{ site.download_url }}
> curl -O 
> tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
> cd flink-{{ site.version }}/
> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
> ./examples/batch/WordCount.jar
> ~~~
> {code}
> Sections using the {{\{% highlight java %}}} syntax are still working.
> We may have to do a sweep over the docs and port all code sections to the 
> working syntax.
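For reference, the same snippet ported to the working syntax (the download URL 
is elided in the original and stays elided here):

{code}
{% highlight bash %}
# get the hadoop2 package from the Flink download page at
# {{ site.download_url }}
curl -O 
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
{% endhighlight %}
{code}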



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9218) Docs for old versions aren't building successfully

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9218:

Description: 
The [1.0|https://ci.apache.org/builders/flink-docs-release-1.0], 
[1.1|https://ci.apache.org/builders/flink-docs-release-1.1] and 
[1.2|https://ci.apache.org/builders/flink-docs-release-1.2] docs aren't 
currently building.

My guess is this has to do with the dependency changes we recently made.

  was:
The [1.0|https://ci.apache.org/builders/flink-docs-release-1.0], 
[1.1|https://ci.apache.org/builders/flink-docs-release-1.1] and 
[1.2|https://ci.apache.org/builders/flink-docs-release-1.2] aren't currently 
building.

My guess is this has to do with the dependency changes we recently made.


> Docs for old versions aren't building successfully
> --
>
> Key: FLINK-9218
> URL: https://issues.apache.org/jira/browse/FLINK-9218
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Project Website
>Affects Versions: 1.0.3, 1.1.4, 1.2.1
>Reporter: Chesnay Schepler
>Priority: Major
>
> The [1.0|https://ci.apache.org/builders/flink-docs-release-1.0], 
> [1.1|https://ci.apache.org/builders/flink-docs-release-1.1] and 
> [1.2|https://ci.apache.org/builders/flink-docs-release-1.2] docs aren't 
> currently building.
> My guess is this has to do with the dependency changes we recently made.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r185569814
  
--- Diff: docs/dev/stream/state/checkpointing.md ---
@@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
-  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
-  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
 
-- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
+- `state.checkpoints.dir`: The target directory for storing the data files and 
meta data of [externalized checkpoints]({{ site.baseurl 
}}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink-supported 
filesystem. Note: the storage path must be accessible from all participating 
processes/nodes (i.e. all TaskManagers and JobManagers).
--- End diff --

in fact, you could replace this entire section with `{% include 
generated/checkpointing_configuration.html %}`


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r185569523
  
--- Diff: docs/dev/stream/state/checkpointing.md ---
@@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
-  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
-  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
 
-- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
+- `state.checkpoints.dir`: The target directory for storing the data files and 
meta data of [externalized checkpoints]({{ site.baseurl 
}}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink-supported 
filesystem. Note: the storage path must be accessible from all participating 
processes/nodes (i.e. all TaskManagers and JobManagers).
--- End diff --

please sync these entries with the descriptions in `CheckpointingOptions`, 
so that they are identical with the one seen on the configuration page.


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r185568867
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -60,25 +60,29 @@ The `ExternalizedCheckpointCleanup` mode configures 
what happens with externaliz
 
 Similarly to [savepoints](savepoints.html), an externalized checkpoint 
consists
 of a meta data file and, depending on the state back-end, some additional 
data
-files. The **target directory** for the externalized checkpoint's meta 
data is
-determined from the configuration key `state.checkpoints.dir` which, 
currently,
-can only be set via the configuration files.
+files. The externalized checkpoint's meta data is stored in the same directory 
+as the data files. So the **target directory** can be set via the configuration 
+key `state.checkpoints.dir` in the configuration files, and can also be 
+specified per job in the code.
 
+- Configure globally via configuration files
 ```
 state.checkpoints.dir: hdfs:///checkpoints/
 ```
 
+- Configure per job via code 
+```java
--- End diff --

please use `{% highlight java %}` syntax instead
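For context, the per-job configuration that this section describes could look 
like the following sketch (assuming the `CheckpointConfig` and `FsStateBackend` 
APIs of this release line; the directory is a placeholder):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every minute
        // retain the externalized checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // per-job target directory; the global default comes from state.checkpoints.dir
        env.setStateBackend(new FsStateBackend("hdfs:///checkpoints-data/"));
        // ... build the job, then env.execute(...)
    }
}
```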


---


[jira] [Commented] (FLINK-9125) MiniClusterResource should set CoreOptions.TMP_DIRS

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461314#comment-16461314
 ] 

ASF GitHub Bot commented on FLINK-9125:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5933#discussion_r184928655
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -59,6 +61,9 @@
 
public static final String NEW_CODEBASE = "new";
 
+   @Rule
--- End diff --

does this annotation actually work in arbitrary classes?


> MiniClusterResource should set CoreOptions.TMP_DIRS
> ---
>
> Key: FLINK-9125
> URL: https://issues.apache.org/jira/browse/FLINK-9125
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>
> We've frequently seen test stability issues when creating temporary files that 
> use hard-coded paths, like {{/tmp}}, and have generally tried to use 
> {{TemporaryFolder}} instead.
> The {{CoreOptions.TMP_DIRS}} option is used by some components to determine 
> where temporary files should be placed. By default this points to 
> {{System.getProperty("java.io.tmpdir")}}, which usually points to {{/tmp}}.
> This property is rarely set explicitly by tests, which leads to failures such 
> as this: https://travis-ci.org/apache/flink/jobs/361515791
> {code}
> org.apache.flink.test.api.java.operators.lambdas.JoinITCase  Time elapsed: 
> 3.025 sec  <<< ERROR!
> java.io.IOException: Could not create root directory for local recovery: 
> /tmp/localState
>   at 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.(TaskExecutorLocalStateStoresManager.java:89)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:284)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:303)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:738)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:306)
>   at 
> org.apache.flink.test.util.MiniClusterResource.startMiniCluster(MiniClusterResource.java:217)
>   at 
> org.apache.flink.test.util.MiniClusterResource.startJobExecutorService(MiniClusterResource.java:171)
>   at 
> org.apache.flink.test.util.MiniClusterResource.before(MiniClusterResource.java:121)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
> It would be nice if the MiniClusterResource inherently contained a 
> {{TemporaryFolder}} which is used to configure {{CoreOptions.TMP_DIRS}} as 
> well as other tmp dir options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9119) example code error in Concepts & Common API

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9119.
---
   Resolution: Fixed
Fix Version/s: 1.4.3
   1.5.0

master: 0973e34b3c596c23df11335b3b8e657e85dd12e6
1.5: 108d405aecbe14edd8f919e28d260a8f4fc513ee
1.4: e776215bc451a8a78ef844bf70b7eeec31999a10

> example code error in Concepts & Common API
> ---
>
> Key: FLINK-9119
> URL: https://issues.apache.org/jira/browse/FLINK-9119
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: chillon.m
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.5.0, 1.4.3
>
>
> // Table is the result of a simple projection query
> Table projTable = tableEnv.scan("X").project(...);
> {{scan}} returns a {{Table}} object, which does not have a {{project}} method.
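Presumably the intended call is the Table API's {{select}}; a minimal sketch, 
assuming a table {{X}} with fields {{a}} and {{b}} was registered beforehand:

{code}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class ProjectionSketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // projection is expressed with select(), not a project() method
        Table projTable = tableEnv.scan("X").select("a, b");
    }
}
{code}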



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5933: [FLINK-9125] MiniClusterResource should set CoreOp...

2018-05-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5933#discussion_r184928655
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -59,6 +61,9 @@
 
public static final String NEW_CODEBASE = "new";
 
+   @Rule
--- End diff --

does this annotation actually work in arbitrary classes?


---


[jira] [Commented] (FLINK-9125) MiniClusterResource should set CoreOptions.TMP_DIRS

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461315#comment-16461315
 ] 

ASF GitHub Bot commented on FLINK-9125:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5933#discussion_r184928993
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -137,6 +142,7 @@ public int getWebUIPort() {
 
@Override
public void before() throws Exception {
+   temporaryFolder.create();
--- End diff --

could be necessary if the `@Rule` annotation doesn't work in arbitrary 
(i.e. non-test) classes. Then we have to manage it manually, with `create()` 
and `before()`.


> MiniClusterResource should set CoreOptions.TMP_DIRS
> ---
>
> Key: FLINK-9125
> URL: https://issues.apache.org/jira/browse/FLINK-9125
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>
> We've frequently seen test stability issues when creating temporary files that 
> use hard-coded paths, like {{/tmp}}, and have generally tried to use 
> {{TemporaryFolder}} instead.
> The {{CoreOptions.TMP_DIRS}} option is used by some components to determine 
> where temporary files should be placed. By default this points to 
> {{System.getProperty("java.io.tmpdir")}}, which usually points to {{/tmp}}.
> This property is rarely set explicitly by tests, which leads to failures such 
> as this: https://travis-ci.org/apache/flink/jobs/361515791
> {code}
> org.apache.flink.test.api.java.operators.lambdas.JoinITCase  Time elapsed: 
> 3.025 sec  <<< ERROR!
> java.io.IOException: Could not create root directory for local recovery: 
> /tmp/localState
>   at 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.(TaskExecutorLocalStateStoresManager.java:89)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:284)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:303)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:738)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:306)
>   at 
> org.apache.flink.test.util.MiniClusterResource.startMiniCluster(MiniClusterResource.java:217)
>   at 
> org.apache.flink.test.util.MiniClusterResource.startJobExecutorService(MiniClusterResource.java:171)
>   at 
> org.apache.flink.test.util.MiniClusterResource.before(MiniClusterResource.java:121)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
> It would be nice if the MiniClusterResource inherently contained a 
> {{TemporaryFolder}} which is used to configure {{CoreOptions.TMP_DIRS}} as 
> well as other tmp dir options.
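A minimal sketch of what such manual management could look like in a resource 
class that sits outside JUnit's rule processing (the config wiring is 
illustrative):

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

import org.junit.rules.TemporaryFolder;

public class TmpDirResourceSketch {
    private final TemporaryFolder temporaryFolder = new TemporaryFolder();

    public void before(Configuration config) throws Exception {
        temporaryFolder.create(); // normally invoked by the @Rule machinery
        config.setString(CoreOptions.TMP_DIRS,
            temporaryFolder.newFolder().getAbsolutePath());
    }

    public void after() {
        temporaryFolder.delete();
    }
}
{code}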



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9119) example code error in Concepts & Common API

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461316#comment-16461316
 ] 

ASF GitHub Bot commented on FLINK-9119:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5935


> example code error in Concepts & Common API
> ---
>
> Key: FLINK-9119
> URL: https://issues.apache.org/jira/browse/FLINK-9119
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: chillon.m
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.5.0, 1.4.3
>
>
> // Table is the result of a simple projection query
> Table projTable = tableEnv.scan("X").project(...);
> {{scan}} returns a {{Table}} object, which does not have a {{project}} method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5935: [FLINK-9119] example code error in Concepts & Comm...

2018-05-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5935


---


[GitHub] flink pull request #5933: [FLINK-9125] MiniClusterResource should set CoreOp...

2018-05-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5933#discussion_r184928993
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -137,6 +142,7 @@ public int getWebUIPort() {
 
@Override
public void before() throws Exception {
+   temporaryFolder.create();
--- End diff --

could be necessary if the `@Rule` annotation doesn't work in arbitrary 
(i.e. non-test) classes. Then we have to manage it manually, with `create()` 
and `before()`.


---


[jira] [Commented] (FLINK-9119) example code error in Concepts & Common API

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461309#comment-16461309
 ] 

ASF GitHub Bot commented on FLINK-9119:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5935
  
merging


> example code error in Concepts & Common API
> ---
>
> Key: FLINK-9119
> URL: https://issues.apache.org/jira/browse/FLINK-9119
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: chillon.m
>Assignee: vinoyang
>Priority: Minor
>
> // Table is the result of a simple projection query
> Table projTable = tableEnv.scan("X").project(...);
> {{scan}} returns a {{Table}} object, which does not have a {{project}} method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5935: [FLINK-9119] example code error in Concepts & Common API

2018-05-02 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5935
  
merging


---


[jira] [Updated] (FLINK-9136) Remove StreamingProgramTestBase

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9136:

Fix Version/s: 1.6.0

> Remove StreamingProgramTestBase
> ---
>
> Key: FLINK-9136
> URL: https://issues.apache.org/jira/browse/FLINK-9136
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.6.0
>
>
> The {{StreamingProgramTestBase}} should be removed. We can move all existing 
> tests to the {{AbstractTestBase}} with junit annotations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9216) Fix comparator violation

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9216.
---
Resolution: Fixed

1.4: 9b59e6ff5adeeef63a696c2837a47b77f4741a57

> Fix comparator violation
> 
>
> Key: FLINK-9216
> URL: https://issues.apache.org/jira/browse/FLINK-9216
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.3, 1.5.0, 1.4.2
> Environment: {{JSONGenerator}} uses an improper {{Comparator}} for 
> sorting Operator ID, which might cause 
> {{java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!}}
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
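For illustration (this is not the actual {{JSONGenerator}} code), the two 
classic ways a {{Comparator}} breaks its contract and trips this exception in 
TimSort, next to a safe alternative:

{code}
import java.util.Arrays;
import java.util.Comparator;

public class ComparatorContractSketch {
    public static void main(String[] args) {
        // never returns 0, so compare(a, a) != 0 and the contract is violated
        Comparator<Integer> neverEqual = (a, b) -> a >= b ? 1 : -1;
        // int subtraction can overflow for large magnitudes, breaking transitivity
        Comparator<Integer> overflowing = (a, b) -> a - b;
        // contract-safe
        Comparator<Integer> safe = Integer::compare;

        Integer[] ids = {3, 1, 2};
        Arrays.sort(ids, safe);
        System.out.println(Arrays.toString(ids)); // [1, 2, 3]
    }
}
{code}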




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9061) add entropy to s3 path for better scalability

2018-05-02 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu reassigned FLINK-9061:
-

Assignee: Indrajit Roychoudhury
 Summary: add entropy to s3 path for better scalability  (was: S3 
checkpoint data not partitioned well -- causes errors and poor performance)

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
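A sketch of the hashing idea (the hook into Flink's filesystem code is 
hypothetical; only the path rewrite itself is illustrated):

{code}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class EntropyPathSketch {
    // prepend a short hash of the original path so S3 spreads keys across partitions
    static String addEntropy(String path) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
            .digest(path.getBytes(StandardCharsets.UTF_8));
        String hash = String.format("%02x%02x", digest[0], digest[1]);
        return path.replaceFirst("^s3://([^/]+)/", "s3://$1/" + hash + "/");
    }

    public static void main(String[] args) throws Exception {
        // e.g. s3://bucket/flink/checkpoints -> s3://bucket/<hash>/flink/checkpoints
        System.out.println(addEntropy("s3://bucket/flink/checkpoints"));
    }
}
{code}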



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461254#comment-16461254
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
I've addressed @zentol's last comments and rebased to fix conflicts. Will 
merge once Travis goes green.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.
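For background, a sketch of the existing batch-API {{DistributedCache}} usage 
that this feature extends to streaming; the path and name are placeholders:

{code}
import java.io.File;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DistributedCacheSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.registerCachedFile("hdfs:///tmp/model.bin", "model");

        env.fromElements(1, 2, 3)
            .map(new RichMapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) throws Exception {
                    // the file has been shipped to every TaskManager
                    File model = getRuntimeContext().getDistributedCache().getFile("model");
                    return model.exists() ? value : -value;
                }
            })
            .print();
    }
}
{code}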



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-05-02 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
I've addressed @zentol's last comments and rebased to fix conflicts. Will 
merge once Travis goes green.


---


[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-05-02 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461231#comment-16461231
 ] 

Stefan Richter commented on FLINK-9268:
---

I don't think that is the cause. The exception comes right from the RocksDB JNI 
bridge, where oversized state values are a known cause of exactly this 
exception. From the stack trace I can see that your job uses processing-time 
triggers; maybe it was just running a bit slower without incremental 
checkpoints, so the windows could not collect enough data to exceed the limit 
before firing on processing time.

> RockDB errors from WindowOperator
> -
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> The job has no sinks, one Kafka source, does a windowing based on session and 
> uses processing time. The job fails with the error given below after running 
> for few hours. The only way to recover from this error is to cancel the job 
> and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>  .addSource(makeKafkaSource(config))
>  .map(makeEvent)
>  .keyBy(_.get(EVENT_GROUP_ID))
>  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>  .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
> Error:
> TimerException{java.lang.NegativeArraySizeException}
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461225#comment-16461225
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185541401
  
--- Diff: 
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test upgrade of generic stateful job for Flink's DataStream API 
operators and primitives.
+ *
+ * The job is constructed of generic components from {@link 
DataStreamAllroundTestJobFactory}.
+ * The gaol is to test successful state restoration after taking savepoint 
and recovery with new job version.
+ * It can be configured with '--test.job.variant' to run different 
variants of it:
+ * 
+ * original: includes 2 custom stateful map operators
+ * upgraded: changes order of 2 custom stateful map 
operators and adds one more
+ * 
+ */
+public class StatefulStreamJobUpgradeTestProgram {
+   private static final String TEST_JOB_VARIANT_ORIGINAL = "original";
+   private static final String TEST_JOB_VARIANT_UPGRADED = "upgraded";
+
+   private static final JoinFunction SIMPLE_STATE_UPDATE =
+   (Event first, ComplexPayload second) -> new 
ComplexPayload(first);
+   private static final JoinFunction LAST_EVENT_STATE_UPDATE =
+   (Event first, ComplexPayload second) ->
+   (second != null && first.getEventTime() <= 
second.getEventTime()) ? second : new ComplexPayload(first);
+
+   private static final ConfigOption TEST_JOB_VARIANT = 
ConfigOptions
+   .key("test.job.variant")
+   .defaultValue(TEST_JOB_VARIANT_ORIGINAL)
+   .withDescription(String.format("This configures the job variant 
to test. Can be '%s' or '%s'",
+   TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));
+
+   public static void main(String[] args) throws Exception {
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   setupEnvironment(env, pt);
+
+   KeyedStream source = 
env.addSource(createEventSource(pt))
+   
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
  

[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461223#comment-16461223
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185537151
  
--- Diff: 
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test upgrade of generic stateful job for Flink's DataStream API 
operators and primitives.
+ *
+ * The job is constructed of generic components from {@link 
DataStreamAllroundTestJobFactory}.
+ * The gaol is to test successful state restoration after taking savepoint 
and recovery with new job version.
--- End diff --

typo `gaol`


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real world Flink job. 
> Therefore, we should add an end-to-end test which exactly covers this 
> scenario. I suggest to do the follwoing:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461224#comment-16461224
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185537704
  
--- Diff: 
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test upgrade of generic stateful job for Flink's DataStream API 
operators and primitives.
+ *
+ * The job is constructed of generic components from {@link 
DataStreamAllroundTestJobFactory}.
+ * The gaol is to test successful state restoration after taking savepoint 
and recovery with new job version.
+ * It can be configured with '--test.job.variant' to run different 
variants of it:
+ * 
+ * original: includes 2 custom stateful map operators
+ * upgraded: changes order of 2 custom stateful map 
operators and adds one more
+ * 
+ */
--- End diff --

I think we should note in the comment on the job classes that all possible 
configuration options are listed in the comment of 
`DataStreamAllroundTestJobFactory`, so that users can easily find them.


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real world Flink job. 
> Therefore, we should add an end-to-end test which exactly covers this 
> scenario. I suggest to do the follwoing:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...

2018-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185537704
  
--- Diff: 
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test upgrade of generic stateful job for Flink's DataStream API 
operators and primitives.
+ *
+ * The job is constructed of generic components from {@link 
DataStreamAllroundTestJobFactory}.
+ * The gaol is to test successful state restoration after taking savepoint 
and recovery with new job version.
+ * It can be configured with '--test.job.variant' to run different 
variants of it:
+ * 
+ * original: includes 2 custom stateful map operators
+ * upgraded: changes order of 2 custom stateful map 
operators and adds one more
+ * 
+ */
--- End diff --

I think we should note in the comment on the job classes that all possible 
configuration options are listed in the comment of 
`DataStreamAllroundTestJobFactory`, so that users can easily find them.


---


[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...

2018-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185537151
  
--- Diff: 
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test upgrade of generic stateful job for Flink's DataStream API 
operators and primitives.
+ *
+ * The job is constructed of generic components from {@link 
DataStreamAllroundTestJobFactory}.
+ * The gaol is to test successful state restoration after taking savepoint 
and recovery with new job version.
--- End diff --

typo `gaol`


---


[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...

2018-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185541401
  
--- Diff: 
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test upgrade of generic stateful job for Flink's DataStream API 
operators and primitives.
+ *
+ * The job is constructed of generic components from {@link 
DataStreamAllroundTestJobFactory}.
+ * The gaol is to test successful state restoration after taking savepoint 
and recovery with new job version.
+ * It can be configured with '--test.job.variant' to run different 
variants of it:
+ * 
+ * original: includes 2 custom stateful map operators
+ * upgraded: changes order of 2 custom stateful map 
operators and adds one more
+ * 
+ */
+public class StatefulStreamJobUpgradeTestProgram {
+   private static final String TEST_JOB_VARIANT_ORIGINAL = "original";
+   private static final String TEST_JOB_VARIANT_UPGRADED = "upgraded";
+
+   private static final JoinFunction SIMPLE_STATE_UPDATE =
+   (Event first, ComplexPayload second) -> new 
ComplexPayload(first);
+   private static final JoinFunction LAST_EVENT_STATE_UPDATE =
+   (Event first, ComplexPayload second) ->
+   (second != null && first.getEventTime() <= 
second.getEventTime()) ? second : new ComplexPayload(first);
+
+   private static final ConfigOption TEST_JOB_VARIANT = 
ConfigOptions
+   .key("test.job.variant")
+   .defaultValue(TEST_JOB_VARIANT_ORIGINAL)
+   .withDescription(String.format("This configures the job variant 
to test. Can be '%s' or '%s'",
+   TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));
+
+   public static void main(String[] args) throws Exception {
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   setupEnvironment(env, pt);
+
+   KeyedStream source = 
env.addSource(createEventSource(pt))
+   
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+   .keyBy(Event::getKey);
+
+   List> stateSer =
+   Collections.singletonList(new 
KryoSerializer<>(ComplexPayload.class, env.getConfig()));
+
+   boolean isOriginal = 
pt.get(TEST_JOB

[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-05-02 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461215#comment-16461215
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

Not sure it is related, but I don't see this error with incremental 
checkpointing disabled.

> RockDB errors from WindowOperator
> -
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> The job has no sinks, one Kafka source, does a windowing based on session and 
> uses processing time. The job fails with the error given below after running 
> for few hours. The only way to recover from this error is to cancel the job 
> and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>  .addSource(makeKafkaSource(config))
>  .map(makeEvent)
>  .keyBy(_.get(EVENT_GROUP_ID))
>  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>  .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
> Error:
> TimerException{java.lang.NegativeArraySizeException}
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-6306) Sink for eventually consistent file systems

2018-05-02 Thread Seth Wiesman (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman closed FLINK-6306.
---
Resolution: Won't Do

Instead of adding multiple filesystem sinks, improvements will be made to the 
bucketing sink to support eventually consistent stores. 

> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly-once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3, where most operations (i.e. 
> rename, delete, truncate) are not guaranteed to become consistent within a 
> reasonable amount of time. Flink should provide a sink that guarantees 
> exactly-once writes to a file system where only PUT operations are considered 
> consistent. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461205#comment-16461205
 ] 

ASF GitHub Bot commented on FLINK-6306:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/4607
  
* clicked the wrong button 


> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly-once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3, where most operations (i.e. 
> rename, delete, truncate) are not guaranteed to become consistent within a 
> reasonable amount of time. Flink should provide a sink that guarantees 
> exactly-once writes to a file system where only PUT operations are considered 
> consistent. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4607: [FLINK-6306][connectors] Sink for eventually consi...

2018-05-02 Thread sjwiesman
Github user sjwiesman closed the pull request at:

https://github.com/apache/flink/pull/4607


---


[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461206#comment-16461206
 ] 

ASF GitHub Bot commented on FLINK-6306:
---

Github user sjwiesman closed the pull request at:

https://github.com/apache/flink/pull/4607


> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly-once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3, where most operations (i.e. 
> rename, delete, truncate) are not guaranteed to become consistent within a 
> reasonable amount of time. Flink should provide a sink that guarantees 
> exactly-once writes to a file system where only PUT operations are considered 
> consistent. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...

2018-05-02 Thread sjwiesman
Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/4607
  
* clicked the wrong button 


---


[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461202#comment-16461202
 ] 

ASF GitHub Bot commented on FLINK-6306:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/4607
  
I'm going to go ahead and close this PR and issue to avoid confusion. 


> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly-once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3, where most operations (i.e. 
> rename, delete, truncate) are not guaranteed to become consistent within a 
> reasonable amount of time. Flink should provide a sink that guarantees 
> exactly-once writes to a file system where only PUT operations are considered 
> consistent. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...

2018-05-02 Thread sjwiesman
Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/4607
  
I'm going to go ahead and close this PR and issue to avoid confusion. 


---


[jira] [Assigned] (FLINK-9257) End-to-end tests print "All tests PASS" even if an individual test script returns a non-zero exit code

2018-05-02 Thread Florian Schmidt (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt reassigned FLINK-9257:
--

Assignee: Florian Schmidt

> End-to-end tests print "All tests PASS" even if an individual test script 
> returns a non-zero exit code
> --
>
> Key: FLINK-9257
> URL: https://issues.apache.org/jira/browse/FLINK-9257
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Critical
> Fix For: 1.5.0
>
>
> In some cases the test-suite exits with a non-zero exit code but still prints 
> "All tests PASS" to stdout. This happens because of how the test runner 
> works, which is roughly as follows:
>  # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of 
> tests consisting of multiple bash scripts.
>  # As soon as one of those bash scripts exits with a non-zero exit code, the 
> remaining tests won't run and the test-suite will also exit with a non-zero 
> exit code.
>  # *The cleanup hook (trap cleanup EXIT in common.sh) only checks whether 
> there are non-empty .out files or log files with certain exceptions. If a 
> test fails with a non-zero exit code but does not produce any exceptions or 
> .out files, the hook still prints "All tests PASS" to stdout, even though 
> the tests did not all pass.*
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461186#comment-16461186
 ] 

ASF GitHub Bot commented on FLINK-8297:
---

Github user je-ik commented on the issue:

https://github.com/apache/flink/pull/5185
  
@StefanRRichter I think that was exactly the initial idea, but then we ran 
into trouble with savepoints and changing the list type. Also, as @aljoscha 
mentioned, it can be confusing for users to see `MapState` instead of 
`ListState` after inspecting the savepoint. Unfortunately, I currently don't 
have time to work on this, so if anyone would be interested in getting this 
done, that would be awesome.


> RocksDBListState stores whole list in single byte[]
> ---
>
> Key: FLINK-8297
> URL: https://issues.apache.org/jira/browse/FLINK-8297
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Jan Lukavský
>Priority: Major
>
> RocksDBListState currently keeps the whole list of data in a single RocksDB 
> key-value pair, which implies that the list must actually fit into memory. 
> Larger lists are not supported and end up with an OOME or other errors. The 
> RocksDBListState could be modified so that individual items in the list are 
> stored under separate keys in RocksDB and can then be iterated over. A simple 
> implementation could reuse the existing RocksDBMapState, with the key as an 
> index into the list and a single RocksDBValueState keeping track of how many 
> items have already been added to the list. Because this implementation might 
> be less efficient in some cases, it would be good to make it opt-in via a 
> construct like
> {{new RocksDBStateBackend().enableLargeListsPerKey()}}
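
A minimal sketch of the proposal in the description, assuming a map state 
keyed by index plus a counter state; {{LargeListView}} and its method names 
are illustrative, not Flink API:

{code:java}
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.ValueState;

import java.util.ArrayList;
import java.util.List;

// Illustrative list view backed by (index -> element) map entries, so RocksDB
// stores one key-value pair per element instead of one blob per list.
final class LargeListView<T> {
    private final MapState<Long, T> elements; // index -> element
    private final ValueState<Long> size;      // number of appended elements

    LargeListView(MapState<Long, T> elements, ValueState<Long> size) {
        this.elements = elements;
        this.size = size;
    }

    void append(T element) throws Exception {
        long n = size.value() == null ? 0L : size.value();
        elements.put(n, element);
        size.update(n + 1);
    }

    List<T> get() throws Exception {
        long n = size.value() == null ? 0L : size.value();
        List<T> out = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            out.add(elements.get(i)); // each element is a separate RocksDB read
        }
        return out;
    }
}
{code}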



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] Optionally store elements of...

2018-05-02 Thread je-ik
Github user je-ik commented on the issue:

https://github.com/apache/flink/pull/5185
  
@StefanRRichter I think that was exactly the initial idea, but then we ran 
into trouble with savepoints and changing the list type. Also, as @aljoscha 
mentioned, it can be confusing for users to see `MapState` instead of 
`ListState` after inspecting the savepoint. Unfortunately, I currently don't 
have time to work on this, so if anyone would be interested in getting this 
done, that would be awesome.


---


[jira] [Comment Edited] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-02 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461153#comment-16461153
 ] 

Sihua Zhou edited comment on FLINK-9290 at 5/2/18 3:21 PM:
---

This looks like the same problem as the one fixed in 
[FLINK-9263|https://issues.apache.org/jira/browse/FLINK-9263]. It looks like 
the serializer is not thread-safe in this case: even though the checkpoint 
completed successfully, the serializer's metadata is incorrect because of the 
concurrency problem, so recovery from the checkpoint fails. I'm not so sure 
about that, though - [~srichter], what do you think? Please correct me if I'm 
wrong.


was (Author: sihuazhou):
This looks like the same problem as the one fixed in 
[FLINK-9263|https://issues.apache.org/jira/browse/FLINK-9263], but I'm not so 
sure, [~srichter] what do you think?
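
If the thread-safety hypothesis above holds, the usual defensive pattern is to 
give the async snapshot thread its own serializer instance. A minimal sketch; 
{{SnapshotSerializers}} is an illustrative helper, not Flink's actual internal 
code:

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;

final class SnapshotSerializers {
    // Kryo-backed serializers keep internal mutable state, so the async
    // checkpointing thread must work on its own deep copy instead of the
    // instance the task thread keeps using.
    static <T> TypeSerializer<T> serializerForAsyncSnapshot(
            TypeSerializer<T> taskThreadSerializer) {
        return taskThreadSerializer.duplicate();
    }
}
{code}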

> The job is unable to recover from a checkpoint
> --
>
> Key: FLINK-9290
> URL: https://issues.apache.org/jira/browse/FLINK-9290
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Blocker
>
> Using the RocksDB state backend.
> The job runs fine for more than 24 hours and then attempts recovery because 
> of an error from the sink. It continues to fail at the time of recovery with 
> the following error. The workaround is to cancel the job and start it again.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
> Serialization trace:
> topic 
> (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
>   ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-02 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461167#comment-16461167
 ] 

Stefan Richter commented on FLINK-9290:
---

[~sihuazhou] yes, looks like it.

> The job is unable to recover from a checkpoint
> --
>
> Key: FLINK-9290
> URL: https://issues.apache.org/jira/browse/FLINK-9290
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Blocker
>
> Using the RocksDB state backend.
> The job runs fine for more than 24 hours and then attempts recovery because 
> of an error from the sink. It continues to fail at the time of recovery with 
> the following error. The workaround is to cancel the job and start it again.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
> Serialization trace:
> topic 
> (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
>   ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode

2018-05-02 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9287:
--
Priority: Blocker  (was: Minor)

> KafkaProducer011 seems to leak threads when not in exactly-once mode
> 
>
> Key: FLINK-9287
> URL: https://issues.apache.org/jira/browse/FLINK-9287
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Christopher Ng
>Priority: Blocker
>
> {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} 
> threads. As far as I can tell, this happens when it is not in EXACTLY_ONCE 
> mode: it creates a {{FlinkKafkaProducer}} but never closes it, even when the 
> {{FlinkKafkaProducer011}} itself is closed.
> I observed this when running a local cluster and submitting and then 
> cancelling a job; a lot of Kafka threads were left alive afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode

2018-05-02 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9287:
--
Affects Version/s: 1.5.0

> KafkaProducer011 seems to leak threads when not in exactly-once mode
> 
>
> Key: FLINK-9287
> URL: https://issues.apache.org/jira/browse/FLINK-9287
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Christopher Ng
>Priority: Blocker
>
> {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} 
> threads. As far as I can tell, this happens when it is not in EXACTLY_ONCE 
> mode: it creates a {{FlinkKafkaProducer}} but never closes it, even when the 
> {{FlinkKafkaProducer011}} itself is closed.
> I observed this when running a local cluster and submitting and then 
> cancelling a job; a lot of Kafka threads were left alive afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461160#comment-16461160
 ] 

ASF GitHub Bot commented on FLINK-8297:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5185
  
Just a thought: how about implementing all of this on top of a map 
state, and also including the current size as a special entry in the map (with 
the size field having a key that makes it lexicographically the first entry, so 
that iteration can easily skip it)? Then we could have a util that wraps a map 
state into a list state, so the user can register a map state and enhance it to 
operate as a list state. From Flink's perspective it is still a map state in 
savepoints and only the user code reinterprets it as a list state. Obviously 
this does not solve the problem of migrating between different list types, but 
it also does not need to introduce a second list type and keeps the window 
operator as is.



> RocksDBListState stores whole list in single byte[]
> ---
>
> Key: FLINK-8297
> URL: https://issues.apache.org/jira/browse/FLINK-8297
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Jan Lukavský
>Priority: Major
>
> RocksDBListState currently keeps the whole list of data in a single RocksDB 
> key-value pair, which implies that the list must actually fit into memory. 
> Larger lists are not supported and end up with an OOME or other errors. The 
> RocksDBListState could be modified so that individual items in the list are 
> stored under separate keys in RocksDB and can then be iterated over. A simple 
> implementation could reuse the existing RocksDBMapState, with the key as an 
> index into the list and a single RocksDBValueState keeping track of how many 
> items have already been added to the list. Because this implementation might 
> be less efficient in some cases, it would be good to make it opt-in via a 
> construct like
> {{new RocksDBStateBackend().enableLargeListsPerKey()}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] Optionally store elements of...

2018-05-02 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5185
  
Just a thought: how about implementing all of this on top of a map 
state, and also including the current size as a special entry in the map (with 
the size field having a key that makes it lexicographically the first entry, so 
that iteration can easily skip it)? Then we could have a util that wraps a map 
state into a list state, so the user can register a map state and enhance it to 
operate as a list state. From Flink's perspective it is still a map state in 
savepoints and only the user code reinterprets it as a list state. Obviously 
this does not solve the problem of migrating between different list types, but 
it also does not need to introduce a second list type and keeps the window 
operator as is.



---


[jira] [Commented] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode

2018-05-02 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461158#comment-16461158
 ] 

Piotr Nowojski commented on FLINK-9287:
---

Yes, indeed there is such a bug. The problem boils down to 
{{org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011#abort}} 
being used both for aborting transactions and for closing them at the end of 
the operator's lifecycle. For EXACTLY_ONCE we should be closing the 
{{FlinkKafkaProducer}} in both cases, but for AT_LEAST_ONCE and NONE we should 
be closing it only during operator close - which is not happening. A sketch of 
that split follows below.

I'm not sure, but maybe we should split the EXACTLY_ONCE and non-EXACTLY_ONCE 
implementations, instead of trying to squeeze both of them into one class at 
the same time. 

CC:
[~tzulitai]  [~gjy]
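
A hypothetical sketch of that split - not the actual {{FlinkKafkaProducer011}} 
internals; {{Semantic}}, {{Producer}} and the method names are stand-ins to 
make the control flow concrete:

{code:java}
enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

interface Producer {
    void abortTransaction();
    void close(); // must eventually run to stop the kafka-producer-network-thread
}

final class ProducerLifecycle {
    private final Semantic semantic;
    private Producer current;

    ProducerLifecycle(Semantic semantic, Producer current) {
        this.semantic = semantic;
        this.current = current;
    }

    /** Called on transaction abort and (today) also at operator shutdown. */
    void abort() {
        if (semantic == Semantic.EXACTLY_ONCE) {
            current.abortTransaction();
            current.close(); // transactional producers are short-lived
        }
        // For AT_LEAST_ONCE / NONE nothing is closed here. That is correct
        // for a mid-job abort, but if the close() below is never invoked
        // the network thread leaks, matching the reported behaviour.
    }

    /** The operator's close() must always release the long-lived producer. */
    void close() {
        if (current != null) {
            current.close();
            current = null;
        }
    }
}
{code}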

> KafkaProducer011 seems to leak threads when not in exactly-once mode
> 
>
> Key: FLINK-9287
> URL: https://issues.apache.org/jira/browse/FLINK-9287
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Ng
>Priority: Minor
>
> {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} 
> threads. As far as I can tell, this happens when it is not in EXACTLY_ONCE 
> mode: it creates a {{FlinkKafkaProducer}} but never closes it, even when the 
> {{FlinkKafkaProducer011}} itself is closed.
> I observed this when running a local cluster and submitting and then 
> cancelling a job; a lot of Kafka threads were left alive afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-02 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9290:
--
Priority: Blocker  (was: Major)

> The job is unable to recover from a checkpoint
> --
>
> Key: FLINK-9290
> URL: https://issues.apache.org/jira/browse/FLINK-9290
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Blocker
>
> Using the RocksDB state backend.
> The job runs fine for more than 24 hours and then attempts recovery because 
> of an error from the sink. It continues to fail at the time of recovery with 
> the following error. The workaround is to cancel the job and start it again.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
> Serialization trace:
> topic 
> (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
>   ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-02 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461153#comment-16461153
 ] 

Sihua Zhou commented on FLINK-9290:
---

This looks like the same problem as the one fixed in 
[FLINK-9263|https://issues.apache.org/jira/browse/FLINK-9263], but I'm not so 
sure, [~srichter] what do you think?

> The job is unable to recover from a checkpoint
> --
>
> Key: FLINK-9290
> URL: https://issues.apache.org/jira/browse/FLINK-9290
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> Using the RocksDB state backend.
> The job runs fine for more than 24 hours and then attempts recovery because 
> of an error from the sink. It continues to fail at the time of recovery with 
> the following error. The workaround is to cancel the job and start it again.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
> Serialization trace:
> topic 
> (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
>   ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-02 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9290:


 Summary: The job is unable to recover from a checkpoint
 Key: FLINK-9290
 URL: https://issues.apache.org/jira/browse/FLINK-9290
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam


Using the RocksDB state backend.

The job runs fine for more than 24 hours and then attempts recovery because of 
an error from the sink. It continues to fail at the time of recovery with the 
following error. The workaround is to cancel the job and start it again.
java.lang.IllegalStateException: Could not initialize operator state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
Serialization trace:
topic 
(org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode

2018-05-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-9287:

Description: 
{{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} 
threads. As far as I can tell, this happens when it is not in EXACTLY_ONCE 
mode: it creates a {{FlinkKafkaProducer}} but never closes it, even when the 
{{FlinkKafkaProducer011}} itself is closed.

I observed this when running a local cluster and submitting and then cancelling 
a job; a lot of Kafka threads were left alive afterwards.

  was:
{{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread 
}}threads.  As far as I can tell it happens when it is not in EXACTLY_ONCE 
mode, it seems that it creates a {{FlinkKafkaProducer}} but never closes it, 
even when the {{FlinkKafkaProducer011}} itself is closed.

I observed this when running a local cluster and submitting and then cancelling 
a job, a lot of kafka threads were left alive afterwards.


> KafkaProducer011 seems to leak threads when not in exactly-once mode
> 
>
> Key: FLINK-9287
> URL: https://issues.apache.org/jira/browse/FLINK-9287
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Ng
>Priority: Minor
>
> {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} 
> threads. As far as I can tell, this happens when it is not in EXACTLY_ONCE 
> mode: it creates a {{FlinkKafkaProducer}} but never closes it, even when the 
> {{FlinkKafkaProducer011}} itself is closed.
> I observed this when running a local cluster and submitting and then 
> cancelling a job; a lot of Kafka threads were left alive afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461101#comment-16461101
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r185517079
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from 
BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die 
drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren 
Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr 
Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der 
Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen 
auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom 
Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings 
umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   final File zipArchive = 
temporaryFolder.newFile("zipArchive");
+   try (ZipOutputStrea

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-05-02 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r185517079
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from 
BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die 
drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren 
Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr 
Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der 
Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen 
auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom 
Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings 
umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   final File zipArchive = 
temporaryFolder.newFile("zipArchive");
+   try (ZipOutputStream zis = new 
ZipOutputStream(new FileOutputStream(zipArchive))) {
+
+   final ZipEntry zipEntry = new 
ZipEntry("cacheFile");
+   zis.putNextEntry(zipEntry);
+
+   

[jira] [Assigned] (FLINK-9289) Parallelism of generated operators should have max parallelism of input

2018-05-02 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui reassigned FLINK-9289:
--

Assignee: Xingcan Cui

> Parallelism of generated operators should have max parallelism of input
> -
>
> Key: FLINK-9289
> URL: https://issues.apache.org/jira/browse/FLINK-9289
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>Priority: Major
>
> The DataSet API aims to chain generated operators such as key extraction 
> mappers to their predecessors. This is done by assigning the same parallelism 
> as the input operator.
> If a generated operator has more than two inputs, the operator cannot be 
> chained anymore and is generated with the default parallelism. This can lead 
> to a {code}NoResourceAvailableException: Not enough free slots 
> available to run the job.{code} as reported by a user on the mailing list: 
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the max 
> parallelism of all of its inputs to fix this problem (see the sketch after 
> this description).
> Until the problem is fixed, a workaround is to set the default parallelism at 
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}
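
A minimal sketch of the suggested fix, assuming the generated operator's 
inputs and their parallelism are known; {{Operator}} and 
{{parallelismForGeneratedOperator}} are stand-ins, not the DataSet API's 
internal types:

{code:java}
import java.util.List;

final class ParallelismUtil {
    interface Operator { int getParallelism(); }

    // Generated operators take the max parallelism of all their inputs,
    // falling back to the default when there are no inputs.
    static int parallelismForGeneratedOperator(
            List<Operator> inputs, int defaultParallelism) {
        return inputs.stream()
            .mapToInt(Operator::getParallelism)
            .max()
            .orElse(defaultParallelism);
    }
}
{code}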



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461077#comment-16461077
 ] 

ASF GitHub Bot commented on FLINK-9269:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5934
  
Thanks! Will merge once my travis run is green.


> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0
>
>
> {code:java}
> @Test
> public void testConccurrencyProblem() throws Exception {
>   CheckpointStreamFactory streamFactory = createStreamFactory();
>   Environment env = new DummyEnvironment();
>   AbstractKeyedStateBackend backend = 
> createKeyedBackend(IntSerializer.INSTANCE, env);
>   try {
>   long checkpointID = 0;
>   List futureList = new ArrayList();
>   for (int i = 0; i < 10; ++i) {
>   ValueStateDescriptor kvId = new 
> ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
>   ValueState state = 
> backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
>   ((InternalValueState) 
> state).setCurrentNamespace(VoidNamespace.INSTANCE);
>   backend.setCurrentKey(i);
>   state.update(i);
>   
> futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, 
> System.currentTimeMillis(), streamFactory, 
> CheckpointOptions.forCheckpointWithDefaultLocation(;
>   }
>   for (Future future : futureList) {
>   future.get();
>   }
>   } catch (Exception e) {
>   fail();
>   } finally {
>   backend.dispose();
>   }
> }
> protected Future runSnapshotAsync(
>   RunnableFuture> 
> snapshotRunnableFuture) throws Exception {
>   if (!snapshotRunnableFuture.isDone()) {
>   return Executors.newFixedThreadPool(5).submit(() -> {
>   try {
>   snapshotRunnableFuture.run();
>   snapshotRunnableFuture.get();
>   } catch (Exception e) {
>   e.printStackTrace();
>   fail();
>   }
>   });
>   }
>   return null;
> }
> {code}
> Place the above code in `StateBackendTestBase` and run 
> `AsyncMemoryStateBackendTest`; it will produce the following exception
> {code}
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84)
>   ... 5 more
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecuto

[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461076#comment-16461076
 ] 

ASF GitHub Bot commented on FLINK-9269:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5934#discussion_r185508430
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws 
Exception {
}
}
 
+   @Test
+   public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() 
throws Exception {
+
+   CheckpointStreamFactory streamFactory = createStreamFactory();
+   Environment env = new DummyEnvironment();
+   AbstractKeyedStateBackend backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+   ExecutorService executorService = 
Executors.newScheduledThreadPool(1);
+   try {
+   long checkpointID = 0;
+   List futureList = new ArrayList();
+   for (int i = 0; i < 10; ++i) {
+   ValueStateDescriptor kvId = new 
ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
+   ValueState state = 
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+   ((InternalValueState) 
state).setCurrentNamespace(VoidNamespace.INSTANCE);
+   backend.setCurrentKey(i);
+   state.update(i);
+
+   futureList.add(runSnapshotAsync(executorService,
+   backend.snapshot(checkpointID++, 
System.currentTimeMillis(), streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation(;
+   }
+
+   for (Future future : futureList) {
+   future.get(10, TimeUnit.SECONDS);
+   }
+   } catch (Exception e) {
+   fail();
+   } finally {
+   backend.dispose();
+   executorService.shutdown();
+   }
+   }
+
+   protected Future> runSnapshotAsync(
+   ExecutorService executorService,
+   RunnableFuture> 
snapshotRunnableFuture) throws Exception {
+
+   if (!snapshotRunnableFuture.isDone()) {
--- End diff --

Sorry, my bad, I overlooked that you are using the return value. I will 
revert this to your first approach before merging because this does not really 
improve it. 


> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0
>
>
> {code:java}
> @Test
> public void testConccurrencyProblem() throws Exception {
>   CheckpointStreamFactory streamFactory = createStreamFactory();
>   Environment env = new DummyEnvironment();
>   AbstractKeyedStateBackend backend = 
> createKeyedBackend(IntSerializer.INSTANCE, env);
>   try {
>   long checkpointID = 0;
>   List futureList = new ArrayList();
>   for (int i = 0; i < 10; ++i) {
>   ValueStateDescriptor kvId = new 
> ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
>   ValueState state = 
> backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
>   ((InternalValueState) 
> state).setCurrentNamespace(VoidNamespace.INSTANCE);
>   backend.setCurrentKey(i);
>   state.update(i);
>   
> futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, 
> System.currentTimeMillis(), streamFactory, 
> CheckpointOptions.forCheckpointWithDefaultLocation(;
>   }
>   for (Future future : futureList) {
>   future.get();
>   }
>   } catch (Exception e) {
>   fail();
>   } finally {
>   backend.dispose();
>   }
> }
> protected Future runSnapshotAsync(
>   RunnableFuture> 
> snapshotRunnableFuture) throws Exception {
>   if (!snapshotRunnableFuture.isDone()) {
>   return Executors.newFixedThreadPool(5).submit(() -> {
>   try {
>   snapshotRunnableFuture.ru

[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

2018-05-02 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5934
  
Thanks! Will merge once my travis run is green.


---


[GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...

2018-05-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5934#discussion_r185508430
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws Exception {
}
}
 
+	@Test
+	public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
+
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		ExecutorService executorService = Executors.newScheduledThreadPool(1);
+		try {
+			long checkpointID = 0;
+			List<Future> futureList = new ArrayList<>();
+			for (int i = 0; i < 10; ++i) {
+				// mutate keyed state while snapshots of earlier checkpoints may still be running
+				ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
+				ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+				((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+				backend.setCurrentKey(i);
+				state.update(i);
+
+				futureList.add(runSnapshotAsync(executorService,
+					backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
+			}
+
+			for (Future future : futureList) {
+				future.get(10, TimeUnit.SECONDS);
+			}
+		} catch (Exception e) {
+			fail();
+		} finally {
+			backend.dispose();
+			executorService.shutdown();
+		}
+	}
+
+	protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
+		ExecutorService executorService,
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
+
+		if (!snapshotRunnableFuture.isDone()) {
--- End diff --

Sorry, my bad, I overlooked that you are using the return value. I will 
revert this to your first approach before merging because this does not really 
improve it. 
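
Since the diff is cut off above, here is a minimal sketch of how such a helper 
could look. The body below is assumed, not the actual PR code; imports from 
java.util.concurrent are implied.

{code:java}
// Assumed completion of the helper shown in the diff: run the backend's
// snapshot RunnableFuture on the given executor and hand back a Future
// for the snapshot result.
protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
	ExecutorService executorService,
	RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {

	if (!snapshotRunnableFuture.isDone()) {
		return executorService.submit(() -> {
			snapshotRunnableFuture.run();        // drive the asynchronous part of the snapshot
			return snapshotRunnableFuture.get(); // surface the snapshot result (or its failure)
		});
	}
	// the snapshot already completed synchronously
	return CompletableFuture.completedFuture(snapshotRunnableFuture.get());
}
{code}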


---


[jira] [Commented] (FLINK-6719) Add details about fault-tolerance of timers to ProcessFunction docs

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461075#comment-16461075
 ] 

ASF GitHub Bot commented on FLINK-6719:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5887
  
Thanks for the update @bowenli86. 

I'll merge the PR later.


> Add details about fault-tolerance of timers to ProcessFunction docs
> ---
>
> Key: FLINK-6719
> URL: https://issues.apache.org/jira/browse/FLINK-6719
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Documentation
>Affects Versions: 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The fault-tolerance of timers is a frequently asked question on the mailing 
> lists. We should add details about the topic in the {{ProcessFunction}} docs.
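
For context, a minimal sketch of the kind of timer usage such docs would cover 
(a made-up example, not taken from the PR): timers registered in a 
{{ProcessFunction}} are checkpointed together with the keyed state, so they 
fire again after recovery.

{code:java}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Assumed minimal example; timers require a keyed stream.
public class TimeoutFunction extends ProcessFunction<String, String> {

	@Override
	public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
		// The registered timer is part of the checkpointed state,
		// so it survives a failure/restore cycle.
		ctx.timerService().registerProcessingTimeTimer(
			ctx.timerService().currentProcessingTime() + 60_000L);
		out.collect(value);
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
		// Fires even if the job was restored from a checkpoint in between.
		out.collect("timer fired at " + timestamp);
	}
}
{code}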



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5887: [FLINK-6719] [docs] Add details about fault-tolerance of ...

2018-05-02 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5887
  
Thanks for the update @bowenli86. 

I'll merge the PR later.


---


[jira] [Commented] (FLINK-9273) Class cast exception

2018-05-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461060#comment-16461060
 ] 

Fabian Hueske commented on FLINK-9273:
--

Hi [~bob365],

Flink cannot automatically extract the field types for {{Row}} types. Instead, 
you have to manually create a {{RowTypeInfo}} and provide all field types.
The exception suggests that you declared a field as {{Long}} that actually 
holds a {{String}}.

Passing TypeInformation in Scala is a bit tricky, because you have to work with 
implicits.

From what it looks like, this is not a bug in Flink.
If you can confirm, I'll close this issue as "Not a Problem".
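
For illustration, a minimal Java sketch of providing explicit field types. The 
field names, their types, and the {{input}} stream are assumptions made up for 
the example:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Declare every field type up front; "user" and "count" are made-up fields.
RowTypeInfo rowType = new RowTypeInfo(
	new TypeInformation<?>[]{Types.STRING, Types.LONG},
	new String[]{"user", "count"});

// input is assumed to be a DataStream<String>. Without returns(rowType) the
// Row field types stay unknown, and a field declared as Long but actually
// holding a String surfaces as a ClassCastException at runtime.
DataStream<Row> rows = input
	.map(s -> Row.of(s, 1L))
	.returns(rowType);
{code}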


> Class cast exception
> 
>
> Key: FLINK-9273
> URL: https://issues.apache.org/jira/browse/FLINK-9273
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming, Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> Exception stack is as follows:
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:396)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at 
> org.apache.flink.streaming.

[jira] [Closed] (FLINK-9213) Revert breaking change in checkpoint detail URL

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9213.
---
Resolution: Fixed

master: dee63946904fa7f1c82f4d482cb8a425935e8ef6
1.5: 547152eadd54f2b0d827b9fd53bf2a0d0a0c893e

> Revert breaking change in checkpoint detail URL
> ---
>
> Key: FLINK-9213
> URL: https://issues.apache.org/jira/browse/FLINK-9213
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In 1.4, the URL for retrieving detailed checkpoint information is 
> {{/jobs/:jobid/checkpoints/details/:checkpointid}}, whereas in 1.5 it is 
> {{/jobs/:jobid/checkpoints/:checkpointid}}.
> This is a breaking change that also affects the WebUI and should thus be 
> reverted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9041) Refactor StreamTaskTest to not use scala and akka

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9041.
---
   Resolution: Fixed
Fix Version/s: 1.6.0

master: 40e412ae233046a5f6f38cf86288ab5185d9d194

> Refactor StreamTaskTest to not use scala and akka
> -
>
> Key: FLINK-9041
> URL: https://issues.apache.org/jira/browse/FLINK-9041
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Trivial
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9216) Fix comparator violation

2018-05-02 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461020#comment-16461020
 ] 

Chesnay Schepler commented on FLINK-9216:
-

master: 3242214bac3719d057aeaeb34259039dbbf09fb2
1.5: 05c6ef1efede6a3f51eb79103ae0ff392dda7b07

> Fix comparator violation
> 
>
> Key: FLINK-9216
> URL: https://issues.apache.org/jira/browse/FLINK-9216
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.3, 1.5.0, 1.4.2
> Environment: {{JSONGenerator}} uses an improper {{Comparator}} for 
> sorting Operator ID, which might cause 
> {{java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!}}
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
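
For context, the general shape of such a violation (an illustrative sketch 
only, not the actual {{JSONGenerator}} code):

{code:java}
import java.util.Arrays;
import java.util.Comparator;

public class ComparatorContract {

	public static void main(String[] args) {
		// Subtraction-based comparison overflows for operands of large
		// magnitude, yielding an inconsistent ordering that sorting may
		// reject with "Comparison method violates its general contract!".
		Comparator<Integer> broken = (a, b) -> a - b;

		// Integer.compare never overflows and defines a proper total order.
		Comparator<Integer> correct = Integer::compare;

		Integer[] ids = {Integer.MIN_VALUE, 1, -1, Integer.MAX_VALUE};
		Arrays.sort(ids, correct); // safe; sorting with "broken" may fail
		System.out.println(Arrays.toString(ids));
	}
}
{code}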




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9249) Add convenience profile for skipping non-essential plugins

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9249.
---
Resolution: Fixed

master: 678ab6cb2f810a93c9b9599cde56d821d09801af
1.5: 45354899cef40ed6ce8a38119ed87003b5286acc

> Add convenience profile for skipping non-essential plugins
> --
>
> Key: FLINK-9249
> URL: https://issues.apache.org/jira/browse/FLINK-9249
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> When compiling Flink, devs can already set a variety of command-line options 
> to speed up the process, for example to skip checkstyle. We do the same thing 
> on Travis.
> However, it is not only difficult to keep track of all possible options; they 
> are also tedious to write out and obfuscate the actual command.
> I propose adding a {{fast}} profile that skips non-essential plugins, 
> including:
> * rat
> * checkstyle
> * scalastyle
> * enforcer
> * japicmp
> * javadoc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9211) Job submission via REST/dashboard does not work on Kubernetes

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461015#comment-16461015
 ] 

ASF GitHub Bot commented on FLINK-9211:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5903


> Job submission via REST/dashboard does not work on Kubernetes
> -
>
> Key: FLINK-9211
> URL: https://issues.apache.org/jira/browse/FLINK-9211
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission, REST, Web Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> When setting up a cluster on Kubernetes according to the documentation, it is 
> possible to upload jar files, but when trying to execute them you get an 
> exception like this:
> {code}
> org.apache.flink.runtime.rest.handler.RestHandlerException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$2(JarRunHandler.java:113)
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:196)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:356)
> ... 17 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> ... 18 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: flink-jobmanager/10.105.154.28:8081
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 15 more
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: flink-jobmanager/10.105.154.28:8081
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9154) Include WebSubmissionExtension in REST API docs

2018-05-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9154.
---
Resolution: Fixed

master: fa321ea96bc5a58d53c0cf29dbd8fb04ace3c2a8
1.5: 1c48e32446363e903f5084082e9b9821c0db4587

> Include WebSubmissionExtension in REST API docs
> ---
>
> Key: FLINK-9154
> URL: https://issues.apache.org/jira/browse/FLINK-9154
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> The handlers contained in the {{WebSubmissionExtension}} are currently not 
> documented in the REST API docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9041) Refactor StreamTaskTest to not use scala and akka

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461012#comment-16461012
 ] 

ASF GitHub Bot commented on FLINK-9041:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5912


> Refactor StreamTaskTest to not use scala and akka
> -
>
> Key: FLINK-9041
> URL: https://issues.apache.org/jira/browse/FLINK-9041
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9213) Revert breaking change in checkpoint detail URL

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461010#comment-16461010
 ] 

ASF GitHub Bot commented on FLINK-9213:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5906


> Revert breaking change in checkpoint detail URL
> ---
>
> Key: FLINK-9213
> URL: https://issues.apache.org/jira/browse/FLINK-9213
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In 1.4, the URL for retrieving detailed checkpoint information is 
> {{/jobs/:jobid/checkpoints/details/:checkpointid}}, whereas in 1.5 it is 
> {{/jobs/:jobid/checkpoints/:checkpointid}}.
> This is a breaking change that also affects the WebUI and should thus be 
> reverted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9154) Include WebSubmissionExtension in REST API docs

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461016#comment-16461016
 ] 

ASF GitHub Bot commented on FLINK-9154:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5833


> Include WebSubmissionExtension in REST API docs
> ---
>
> Key: FLINK-9154
> URL: https://issues.apache.org/jira/browse/FLINK-9154
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> The handlers contained in the {{WebSubmissionExtension}} are currently not 
> documented in the REST API docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9216) Fix comparator violation

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461011#comment-16461011
 ] 

ASF GitHub Bot commented on FLINK-9216:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5878


> Fix comparator violation
> 
>
> Key: FLINK-9216
> URL: https://issues.apache.org/jira/browse/FLINK-9216
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.3, 1.5.0, 1.4.2
> Environment: {{JSONGenerator}} uses an improper {{Comparator}} for 
> sorting Operator ID, which might cause 
> {{java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!}}
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8743) Add annotation to override documented default

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461014#comment-16461014
 ] 

ASF GitHub Bot commented on FLINK-8743:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5822


> Add annotation to override documented default
> -
>
> Key: FLINK-8743
> URL: https://issues.apache.org/jira/browse/FLINK-8743
> Project: Flink
>  Issue Type: New Feature
>  Components: Configuration, Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> The default value for some {{ConfigOptions}} is difficult to document as it 
> isn't static. This mostly affects options that use 
> {{System.getProperty("java.io.tmpdir")}}, as for example 
> {{CoreOptions#TMP_DIRS}}.
> To deal with this the generator has a special branch for selected options 
> that overrides this default: 
> {code}
> if (option == WebOptions.TMP_DIR || option.key().equals("python.dc.tmp.dir") || option == CoreOptions.TMP_DIRS) {
> 	defaultValue = null;
> }
> {code}
> (let's ignore for now that it just wipes the default instead of setting it to 
> {{System.getProperty("java.io.tmpdir")}})
> This branch is hidden deep in the implementation of the generator. It would be 
> better if we had a dedicated annotation {{@OverrideDocumentedDefault(String 
> override)}} that options could be annotated with.
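
A minimal sketch of what the proposed annotation could look like (the shape is 
assumed from the issue text; this is not an existing Flink API):

{code:java}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation read by the docs generator to replace the
// computed default with a human-readable description.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OverrideDocumentedDefault {
	String value();
}

// Hypothetical usage on a ConfigOption field:
// @OverrideDocumentedDefault("System.getProperty(\"java.io.tmpdir\")")
// public static final ConfigOption<String> TMP_DIRS = ...;
{code}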



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

