[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-03-22 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410823#comment-16410823
 ] 

Sihua Zhou commented on FLINK-9058:
---

Shall we also relax {{MapState.putAll()}} to take an {{Iterable}}? It is an 
existing interface, so could we deprecate it and introduce a new one? 
(If we are going to deprecate it eventually, it is better to do so sooner.)

> Relax ListState.addAll() and ListState.update() to take Iterable
> 
>
> Key: FLINK-9058
> URL: https://issues.apache.org/jira/browse/FLINK-9058
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.5.0
>
>
> [~srichter] What do you think about this? None of the implementations require 
> the parameter to actually be a list, and allowing an {{Iterable}} there allows 
> calling it in situations where all you have is an {{Iterable}}.
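
To make the proposal concrete, here is a minimal sketch of what the relaxed signatures could look like. {{SimpleListState}} is a hypothetical stand-in written for illustration, not Flink's actual {{ListState}} interface, and the {{? extends T}} wildcard is an assumption rather than something stated in the issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a list-style state; NOT Flink's real ListState.
class SimpleListState<T> {
    private final List<T> values = new ArrayList<>();

    // Taking Iterable instead of List lets callers pass any collection
    // (or a custom iterable) without copying it into a List first.
    void addAll(Iterable<? extends T> newValues) {
        for (T value : newValues) {
            values.add(value);
        }
    }

    // Replaces the current contents with the given elements.
    void update(Iterable<? extends T> newValues) {
        values.clear();
        addAll(newValues);
    }

    // Returns a copy so callers cannot alias the internal list.
    List<T> get() {
        return new ArrayList<>(values);
    }

    public static void main(String[] args) {
        SimpleListState<String> state = new SimpleListState<>();
        state.addAll(Arrays.asList("a", "b"));      // a List still works
        Set<String> set = new LinkedHashSet<>(Arrays.asList("c"));
        state.addAll(set);                          // a Set works as well
        System.out.println(state.get());
    }
}
```

Existing callers that pass a {{List}} keep compiling, because {{List}} already implements {{Iterable}}.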



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6473) Add OVER window support for batch tables

2018-03-22 Thread yinhua.dai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410799#comment-16410799
 ] 

yinhua.dai commented on FLINK-6473:
---

[~fhueske]

Before this feature is implemented, how could I implement an aggregation 
function over an incremental (cumulative) window?

For example, on a batch table I want to calculate, for each day, the average 
price from the beginning up to that day.

SELECT productId, AVG(price) OVER(
    PARTITION BY productId
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    )
FROM product
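
For reference, the result such a query is meant to produce can be sketched in plain Java. This is only an illustration of the cumulative-average semantics, not the Table API and not a workaround inside Flink; the class and method names are made up, and the rows are assumed to arrive already sorted by day.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: computes a per-product running average over ordered rows.
class RunningAverage {

    // For each input row (in order), returns the average of all prices seen
    // so far for that row's product, i.e. UNBOUNDED PRECEDING to CURRENT ROW.
    static List<Double> cumulativeAverages(List<String> productIds, List<Double> prices) {
        Map<String, double[]> sumAndCount = new HashMap<>(); // [0] = sum, [1] = count
        List<Double> result = new ArrayList<>();
        for (int i = 0; i < productIds.size(); i++) {
            double[] acc = sumAndCount.computeIfAbsent(productIds.get(i), k -> new double[2]);
            acc[0] += prices.get(i);
            acc[1] += 1;
            result.add(acc[0] / acc[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "b", "a");
        List<Double> prices = Arrays.asList(10.0, 4.0, 20.0);
        System.out.println(cumulativeAverages(ids, prices)); // [10.0, 4.0, 15.0]
    }
}
```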

> Add OVER window support for batch tables
> 
>
> Key: FLINK-6473
> URL: https://issues.apache.org/jira/browse/FLINK-6473
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Major
>
> Add support for OVER windows for batch tables. 
> Since OVER windows are supported for streaming tables, this issue is not 
> about the API (which is available) but about adding the execution strategies 
> and translation for OVER windows on batch tables.





[jira] [Assigned] (FLINK-8988) End-to-end test: Cassandra connector

2018-03-22 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen reassigned FLINK-8988:
-

Assignee: Shuyi Chen

> End-to-end test: Cassandra connector
> 
>
> Key: FLINK-8988
> URL: https://issues.apache.org/jira/browse/FLINK-8988
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cassandra Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>
> In order to test the integration with Cassandra, we should add an end-to-end 
> test which tests the Cassandra connector. In order to do this, we need to add 
> a script/function which sets up a {{Cassandra}} cluster. Then we can run a 
> simple job writing information to {{Cassandra}} using the 
> {{CassandraRowWriteAheadSink}} and the {{CassandraTupleWriteAheadSink}}.





[jira] [Commented] (FLINK-8988) End-to-end test: Cassandra connector

2018-03-22 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410784#comment-16410784
 ] 

Shuyi Chen commented on FLINK-8988:
---

[~till.rohrmann] that sounds like a good idea. I can help with it.

> End-to-end test: Cassandra connector
> 
>
> Key: FLINK-8988
> URL: https://issues.apache.org/jira/browse/FLINK-8988
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cassandra Connector, Tests
>Reporter: Till Rohrmann
>Priority: Major
>
> In order to test the integration with Cassandra, we should add an end-to-end 
> test which tests the Cassandra connector. In order to do this, we need to add 
> a script/function which sets up a {{Cassandra}} cluster. Then we can run a 
> simple job writing information to {{Cassandra}} using the 
> {{CassandraRowWriteAheadSink}} and the {{CassandraTupleWriteAheadSink}}.





[jira] [Commented] (FLINK-8970) Add more automated end-to-end tests

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410766#comment-16410766
 ] 

ASF GitHub Bot commented on FLINK-8970:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5750
  
Based on the umbrella task link FLINK-8970, it seems like this e2e test 
should be attached to FLINK-8973 instead?


> Add more automated end-to-end tests
> ---
>
> Key: FLINK-8970
> URL: https://issues.apache.org/jira/browse/FLINK-8970
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Priority: Critical
>
> In order to improve Flink's test coverage and make releasing easier, we 
> should add more automated end-to-end tests which test Flink more like a user 
> would interact with the system. Additionally, these end-to-end tests should 
> test the integration of various other systems with Flink.
> With FLINK-6539, we added a new module {{flink-end-to-end-tests}} which 
> contains the set of currently available end-to-end tests.
> With FLINK-8911, a script was added to trigger these tests.
>  
> This issue is an umbrella issue collecting all different end-to-end tests 
> which we want to add to the Flink repository.
>  





[GitHub] flink issue #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMachineEx...

2018-03-22 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5750
  
Based on the umbrella task link FLINK-8970, it seems like this e2e test 
should be attached to FLINK-8973 instead?


---


[jira] [Commented] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410755#comment-16410755
 ] 

ASF GitHub Bot commented on FLINK-9060:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5751
  
CC: @aljoscha @StefanRRichter 


> Deleting state using KeyedStateBackend.getKeys() throws Exception
> -
>
> Key: FLINK-9060
> URL: https://issues.apache.org/jira/browse/FLINK-9060
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Adding this test to {{StateBackendTestBase}} showcases the problem:
> {code}
> @Test
> public void testConcurrentModificationWithGetKeys() throws Exception {
>     AbstractKeyedStateBackend<Integer> backend =
>         createKeyedBackend(IntSerializer.INSTANCE);
>     try {
>         ListStateDescriptor<String> listStateDescriptor =
>             new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);
>         // Register list state under two different keys.
>         backend.setCurrentKey(1);
>         backend
>             .getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 listStateDescriptor)
>             .add("Hello");
>         backend.setCurrentKey(2);
>         backend
>             .getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 listStateDescriptor)
>             .add("Ciao");
>         Stream<Integer> keys = backend
>             .getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);
>         // Clear the state of every key while the key stream is being consumed.
>         keys.forEach((key) -> {
>             backend.setCurrentKey(key);
>             try {
>                 backend
>                     .getPartitionedState(
>                         VoidNamespace.INSTANCE,
>                         VoidNamespaceSerializer.INSTANCE,
>                         listStateDescriptor)
>                     .clear();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         });
>     }
>     finally {
>         IOUtils.closeQuietly(backend);
>         backend.dispose();
>     }
> }
> {code}
> This should work because one of the use cases of {{getKeys()}} and 
> {{applyToAllKeys()}} is to do stuff for every key, which includes deleting 
> them.





[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...

2018-03-22 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5751
  
CC: @aljoscha @StefanRRichter 


---


[jira] [Commented] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410753#comment-16410753
 ] 

ASF GitHub Bot commented on FLINK-9060:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5751

[FLINK-9060][state] Deleting state using KeyedStateBackend.getKeys() throws 
Exception

## What is the purpose of the change

This PR fixes the exception that is thrown when state is deleted while 
iterating over the keys returned by `KeyedStateBackend.getKeys()`.

## Brief change log

  - Copy the result of `getKeys()` into a list to avoid the concurrent modification problem.
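
The idea behind this change can be sketched outside of Flink. The snippet below is a simplified illustration under stated assumptions: a plain `HashMap` stands in for the heap backend's state table (an assumption, not the real data structure), and `KeySnapshotDemo` with its methods is a hypothetical name. A stream over the live key set may fail with a `ConcurrentModificationException` when entries are removed while it is consumed, whereas a stream over a copied list of keys does not depend on the map staying unmodified.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

// Illustrative only: a HashMap stands in for the backend's state table.
class KeySnapshotDemo {

    // Streams lazily over the live key set; removing entries from the map
    // while this stream is consumed is unsafe.
    static Stream<Integer> liveKeys(Map<Integer, String> state) {
        return state.keySet().stream();
    }

    // Copies the keys first, so the stream is backed by an independent list.
    static Stream<Integer> snapshotKeys(Map<Integer, String> state) {
        return new ArrayList<>(state.keySet()).stream();
    }

    // Removes every key while iterating; returns false if iteration failed
    // with a ConcurrentModificationException.
    static boolean clearAll(Map<Integer, String> state, boolean snapshot) {
        Stream<Integer> keys = snapshot ? snapshotKeys(state) : liveKeys(state);
        try {
            keys.forEach(state::remove);
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> state = new HashMap<>();
        state.put(1, "Hello");
        state.put(2, "Ciao");
        System.out.println("snapshot iteration succeeded: " + clearAll(state, true));
        System.out.println("remaining entries: " + state.size());
    }
}
```

The copy costs memory proportional to the number of keys, which is the trade-off for letting callers delete state while iterating.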

## Verifying this change

  - *add a unit test in 
`StateBackendTest#testConcurrentModificationWithGetKeys()` to verify this*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation
 no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink 
deletingStateUsingKeyedStateBackendGetKeys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5751.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5751


commit cba3a32f3af16ee92676b1e5b82b21af6fee610d
Author: sihuazhou 
Date:   2018-03-23T03:20:42Z

fix concurrency risk in HeapKeyedStateBackend#getKeys().









[GitHub] flink pull request #5751: [FLINK-9060][state] Deleting state using KeyedStat...

2018-03-22 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5751

[FLINK-9060][state] Deleting state using KeyedStateBackend.getKeys() throws 
Exception

## What is the purpose of the change

This PR fixes the exception that is thrown when state is deleted while 
iterating over the keys returned by `KeyedStateBackend.getKeys()`.

## Brief change log

  - Copy the result of `getKeys()` into a list to avoid the concurrent modification problem.

## Verifying this change

  - *add a unit test in 
`StateBackendTest#testConcurrentModificationWithGetKeys()` to verify this*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation
 no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink 
deletingStateUsingKeyedStateBackendGetKeys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5751.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5751


commit cba3a32f3af16ee92676b1e5b82b21af6fee610d
Author: sihuazhou 
Date:   2018-03-23T03:20:42Z

fix concurrency risk in HeapKeyedStateBackend#getKeys().




---


[jira] [Assigned] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs

2018-03-22 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-7775:
---

Assignee: vinoyang

> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> ---
>
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
>  Issue Type: Task
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public int getNumberOfCachedJobs() {
> return jobRefCounters.size();
>   }
> {code}
> The method of PermanentBlobCache is not used.
> We should remove it.





[jira] [Commented] (FLINK-9057) NPE in CreditBasedSequenceNumberingViewReader when cancelling before initilization was complete

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410688#comment-16410688
 ] 

ASF GitHub Bot commented on FLINK-9057:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5747
  
Thanks for fixing this problem. `notifyReaderCreated` should be called 
after both views are created correctly; otherwise it will cause inconsistency.

LGTM 👍 


> NPE in CreditBasedSequenceNumberingViewReader when cancelling before 
> initilization was complete
> ---
>
> Key: FLINK-9057
> URL: https://issues.apache.org/jira/browse/FLINK-9057
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {{RescalingITCase}} unveiled an exception which may occur when shutting down 
> before completely initializing the network stack:
> https://travis-ci.org/apache/flink/jobs/356612100
> {code}
> 01:08:13,458 WARN  
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - An 
> exception was thrown by a user handler's exceptionCaught() method while 
> handling the following exception:
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.releaseAllResources(CreditBasedSequenceNumberingViewReader.java:192)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.releaseAllResources(PartitionRequestQueue.java:322)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelInactive(PartitionRequestQueue.java:298)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
>   at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
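
The trace above ends in {{releaseAllResources()}}, which suggests that a reader was released before its view existed. The ordering discussed in the review comment (register the reader only after its view has been created) can be sketched as follows. All class and method names here are simplified, hypothetical stand-ins; this is not the actual Flink network stack code or the actual patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of "register only after initialization".
class ViewReaderSketch {

    static class Reader {
        private Object view; // stays null until the view has been requested

        void requestView() {
            view = new Object();
        }

        void releaseAllResources() {
            // Dereferences the view, so it would throw a NullPointerException
            // if it were called before requestView().
            view.hashCode();
            view = null;
        }
    }

    static class Queue {
        final List<Reader> readers = new ArrayList<>();

        void notifyReaderCreated(Reader reader) {
            readers.add(reader);
        }

        void releaseAllResources() {
            for (Reader reader : readers) {
                reader.releaseAllResources();
            }
            readers.clear();
        }
    }

    // Creates a reader and registers it with the queue only once its view
    // exists, so a failure during initialization leaves nothing half-built
    // in the queue.
    static Reader createReader(Queue queue, boolean failDuringInit) {
        Reader reader = new Reader();
        if (failDuringInit) {
            throw new IllegalStateException("cancelled before initialization completed");
        }
        reader.requestView();
        queue.notifyReaderCreated(reader);
        return reader;
    }
}
```

With this ordering, a shutdown that happens between creating the reader and creating its view never sees the reader at all.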





[GitHub] flink issue #5747: [FLINK-9057][network] fix an NPE when cleaning up before ...

2018-03-22 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5747
  
Thanks for fixing this problem. `notifyReaderCreated` should be called 
after both views are created correctly; otherwise it will cause inconsistency.

LGTM 👍 


---


[jira] [Commented] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception

2018-03-22 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410669#comment-16410669
 ] 

Sihua Zhou commented on FLINK-9060:
---

1. For {{MemoryStateBackendTest}}, this is caused by a concurrency problem in 
{{HeapKeyedStateBackend}}.
2. For {{RocksDBStateBackendTest}}, this is caused by a bug in the test code.

I'd like to take this ticket if no one is already working on it.






[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-03-22 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol What do you think we still need now? I think this makes things 
easier for users. And we also need this for work on the Beam Flink Runner.


---


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410665#comment-16410665
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol What do you think we still need now? I think this makes things 
easier for users. And we also need this for work on the Beam Flink Runner.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-03-22 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410660#comment-16410660
 ] 

vinoyang commented on FLINK-8946:
-

[~till.rohrmann] yes, I will try to fix it.

> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the JobManager performs a failover, 
> all TaskManagers that are inherited from the previous JobManager stop sending 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause:
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on failover association?
>  





[jira] [Assigned] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception

2018-03-22 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9060:
-

Assignee: Sihua Zhou






[jira] [Created] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-22 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-9061:
--

 Summary: S3 checkpoint data not partitioned well -- causes errors 
and poor performance
 Key: FLINK-9061
 URL: https://issues.apache.org/jira/browse/FLINK-9061
 Project: Flink
  Issue Type: Bug
  Components: FileSystem, State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Jamie Grier


I think we need to modify the way we write checkpoints to S3 for high-scale 
jobs (those with many total tasks). The issue is that we are writing all the 
checkpoint data under a common key prefix. This is the worst-case scenario for 
S3 performance since the key is used as a partition key.
 
In the worst case checkpoints fail with a 500 status code coming back from S3 
and an internal error type of TooBusyException.

 
One possible solution would be to add a hook in the Flink filesystem code that 
allows me to "rewrite" paths.  For example say I have the checkpoint directory 
set to:
 
s3://bucket/flink/checkpoints
 
I would hook that and rewrite that path to:
 
s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
path
 
This would distribute the checkpoint write load around the S3 cluster evenly.
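
A minimal sketch of such a rewrite, assuming a hypothetical helper class (no such hook exists in Flink's filesystem code as far as this message states) and using String.hashCode() purely for illustration; a real implementation would need to pick a suitable hash function:

```java
import java.net.URI;

// Hypothetical helper illustrating the proposed path rewrite.
class CheckpointPathRewriter {

    // Inserts a hash of the original path as the first key component:
    // s3://bucket/flink/checkpoints -> s3://bucket/<hash>/flink/checkpoints
    static String rewrite(String checkpointUri) {
        URI uri = URI.create(checkpointUri);
        String path = uri.getPath();                          // e.g. "/flink/checkpoints"
        String hash = String.format("%08x", path.hashCode()); // 8 hex characters
        return uri.getScheme() + "://" + uri.getAuthority() + "/" + hash + path;
    }

    public static void main(String[] args) {
        System.out.println(rewrite("s3://bucket/flink/checkpoints"));
    }
}
```

The rewrite is deterministic, so the same configured checkpoint directory always maps to the same prefixed location and can be found again on recovery.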
 
For reference: 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
 
Has anyone else hit this issue? Any other ideas for solutions? This is a 
pretty serious problem for people trying to checkpoint to S3.
 
-Jamie
 





[jira] [Created] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception

2018-03-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-9060:
---

 Summary: Deleting state using KeyedStateBackend.getKeys() throws 
Exception
 Key: FLINK-9060
 URL: https://issues.apache.org/jira/browse/FLINK-9060
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Aljoscha Krettek
 Fix For: 1.5.0


Adding this test to {{StateBackendTestBase}} showcases the problem:

{code}
@Test
public void testConcurrentModificationWithGetKeys() throws Exception {
    AbstractKeyedStateBackend<Integer> backend =
        createKeyedBackend(IntSerializer.INSTANCE);

    try {
        ListStateDescriptor<String> listStateDescriptor =
            new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);

        // Register list state under two different keys.
        backend.setCurrentKey(1);
        backend
            .getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                listStateDescriptor)
            .add("Hello");

        backend.setCurrentKey(2);
        backend
            .getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                listStateDescriptor)
            .add("Ciao");

        Stream<Integer> keys = backend
            .getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);

        // Clear the state of every key while the key stream is being consumed.
        keys.forEach((key) -> {
            backend.setCurrentKey(key);
            try {
                backend
                    .getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        listStateDescriptor)
                    .clear();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
    finally {
        IOUtils.closeQuietly(backend);
        backend.dispose();
    }
}
{code}

This should work because one of the use cases of {{getKeys()}} and 
{{applyToAllKeys()}} is to do stuff for every key, which includes deleting them.





[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs

2018-03-22 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7775:
--
Description: 
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}
The method of PermanentBlobCache is not used.
We should remove it.

  was:
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}

The method of PermanentBlobCache is not used.
We should remove it.


> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> ---
>
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
>  Issue Type: Task
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public int getNumberOfCachedJobs() {
> return jobRefCounters.size();
>   }
> {code}
> The method of PermanentBlobCache is not used.
> We should remove it.





[jira] [Commented] (FLINK-9049) Create unified interfaces to configure and instatiate TableSink

2018-03-22 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410237#comment-16410237
 ] 

Shuyi Chen commented on FLINK-9049:
---

Good point, [~fhueske]. Just created a JIRA for this and will get it in 1.5.0.

> Create unified interfaces to configure and instatiate TableSink
> ---
>
> Key: FLINK-9049
> URL: https://issues.apache.org/jira/browse/FLINK-9049
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> This is a similar effort to 
> [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240]. We want to 
> create a unified interface for discovery and instantiation of TableSink, and 
> later support table DDL in Flink. The proposed solution would use an approach 
> similar to [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240] 
> and can reuse most of the implementation already done in 
> [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240].
> Below are the major changes we have in mind:
> 1) Add TableSinkFactory/TableSinkFactoryService, similar to 
> TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "tableType" with values (source, sink and 
> both) for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to 
> identify whether a table is a source or a sink.





[jira] [Created] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-22 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-9059:
-

 Summary: Add support for unified table source and sink declaration 
in environment file
 Key: FLINK-9059
 URL: https://issues.apache.org/jira/browse/FLINK-9059
 Project: Flink
  Issue Type: Task
  Components: Table API & SQL
Reporter: Shuyi Chen
Assignee: Shuyi Chen
 Fix For: 1.5.0


1) Add a common property called "tableType" with single value 'source'.
2) in yaml file, replace "sources" with "tables".





[GitHub] flink pull request #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMa...

2018-03-22 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5750

[FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.

Adds an end-to-end test that runs the StateMachineExample on a local 
cluster with HA enabled. There is a single JM which gets killed and re-created 
and we check if the new JM picks up the job execution and if at the end the 
StateMachine has no ALERTs printed.

## Verifying this change

It is a script that you can run independently.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink ha-end-to-end-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5750


commit 1fb36e02d79dd2299dbf7c6c6ff84b76226adf91
Author: kkloudas 
Date:   2018-03-15T12:13:46Z

[FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.

Adds an end-to-end test that runs the StateMachineExample on a local
cluster with HA enabled. There is a single JM which gets killed and
re-created and we check if the new JM picks up the job execution and
if at the end the StateMachine has no ALERTs printed.




---


[jira] [Commented] (FLINK-8970) Add more automated end-to-end tests

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410172#comment-16410172
 ] 

ASF GitHub Bot commented on FLINK-8970:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5750

[FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.

Adds an end-to-end test that runs the StateMachineExample on a local 
cluster with HA enabled. There is a single JM which gets killed and re-created 
and we check if the new JM picks up the job execution and if at the end the 
StateMachine has no ALERTs printed.

## Verifying this change

It is a script that you can run independently.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink ha-end-to-end-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5750


commit 1fb36e02d79dd2299dbf7c6c6ff84b76226adf91
Author: kkloudas 
Date:   2018-03-15T12:13:46Z

[FLINK-8970] [E2E] HA end-to-end test with StateMachineExample.

Adds an end-to-end test that runs the StateMachineExample on a local
cluster with HA enabled. There is a single JM which gets killed and
re-created and we check if the new JM picks up the job execution and
if at the end the StateMachine has no ALERTs printed.




> Add more automated end-to-end tests
> ---
>
> Key: FLINK-8970
> URL: https://issues.apache.org/jira/browse/FLINK-8970
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Priority: Critical
>
> In order to improve Flink's test coverage and make releasing easier, we 
> should add more automated end-to-end tests which test Flink more like a user 
> would interact with the system. Additionally, these end-to-end tests should 
> test the integration of various other systems with Flink.
> With FLINK-6539, we added a new module {{flink-end-to-end-tests}} which 
> contains the set of currently available end-to-end tests.
> With FLINK-8911, a script was added to trigger these tests.
>  
> This issue is an umbrella issue collecting all different end-to-end tests 
> which we want to add to the Flink repository.
>  





[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-03-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410031#comment-16410031
 ] 

Aljoscha Krettek commented on FLINK-9058:
-

We could do but I would be more hesitant to change that, because 
{{ListCheckpointed}} is an existing interface while this ticket is about 
changing methods that we have not yet released in a stable release.

Side note, I think {{ListCheckpointed}} will go away at some point because its 
functionality is subsumed by the various methods on {{OperatorStateStore}}. 
[~kkrugler] What do you think about that?

> Relax ListState.addAll() and ListState.update() to take Iterable
> 
>
> Key: FLINK-9058
> URL: https://issues.apache.org/jira/browse/FLINK-9058
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.5.0
>
>
> [~srichter] What do you think about this. None of the implementations require 
> the parameter to actually be a list and allowing an {{Iterable}} there allows 
> calling it in situations where all you have is an {{Iterable}}.
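
To illustrate the point made in the issue description, here is a minimal sketch in Java. `SimpleListState` is a hypothetical in-memory stand-in and not Flink's `ListState`; the sketch only assumes that the bulk methods iterate over their argument, which is why an `Iterable` parameter would be sufficient.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a list-style state; NOT Flink's actual ListState interface.
class IterableStateSketch {

    /** Minimal in-memory "list state" whose bulk methods accept any Iterable. */
    public static final class SimpleListState<T> {
        private final List<T> elements = new ArrayList<>();

        /** Appends every element; only iteration is needed, not List semantics. */
        public void addAll(Iterable<? extends T> values) {
            for (T value : values) {
                elements.add(value);
            }
        }

        /** Replaces the current contents with the given values. */
        public void update(Iterable<? extends T> values) {
            elements.clear();
            addAll(values);
        }

        /** Returns a copy of the current contents. */
        public List<T> get() {
            return new ArrayList<>(elements);
        }
    }

    public static void main(String[] args) {
        SimpleListState<String> state = new SimpleListState<>();

        // A List still works, because every List is an Iterable.
        state.addAll(Arrays.asList("a", "b"));

        // A Set is not a List; with a List parameter the caller would have to copy it first.
        Set<String> set = new LinkedHashSet<>(Arrays.asList("c", "d"));
        state.addAll(set);
        System.out.println(state.get());

        state.update(set);
        System.out.println(state.get());
    }
}
```

With a `List` parameter, the call with the `Set` would need an extra `new ArrayList<>(set)` copy at every call site; whether the real interface should be relaxed in the same way is what the ticket discusses.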





[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410027#comment-16410027
 ] 

ASF GitHub Bot commented on FLINK-9034:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5732
  
@StephanEwen Yes, I think this should go into 1.5.0 because it fixes 
potential (and real) problems. And yes, I wasn't suggesting to remove 
`initializeSerializerUnlessSet(ExecutionConfig)` now, but it seemed like a good 
place to mention it.  


> State Descriptors drop TypeInformation on serialization
> ---
>
> Key: FLINK-9034
> URL: https://issues.apache.org/jira/browse/FLINK-9034
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> The following code currently causes problems
> {code}
> public class MyFunction extends RichMapFunction  {
> private final ValueStateDescriptor<MyType> descr = new 
> ValueStateDescriptor<>("state name", MyType.class);
> private ValueState<MyType> state;
> @Override
> public void open() {
> state = getRuntimeContext().getValueState(descr);
> }
> }
> {code}
> The problem is that the state descriptor drops the type information and 
> creates a serializer before serialization as part of shipping the function in 
> the cluster. To do that, it initializes the serializer with an empty 
> execution config, making serialization inconsistent.
> This is mainly an artifact from the days when dropping the type information 
> before shipping was necessary, because the type info was not serializable. It 
> now is, and we can fix that bug.
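
The inconsistency described above can be modelled in a few lines of Java. This is only a sketch under stated assumptions: `Config`, `TypeInfo` and `Descriptor` are hypothetical stand-ins, not Flink classes, and the serializer is reduced to a descriptive string. The method name `initializeSerializerUnlessSet` is borrowed from the discussion on the pull request; its behaviour here is an assumption.

```java
import java.io.Serializable;

// Simplified model of the problem described above. Every type here is hypothetical;
// none of them is a Flink class, and the serializer is reduced to a descriptive string.
class DescriptorSketch {

    /** Stand-in for an execution config that influences which serializer is built. */
    public static final class Config {
        final boolean forceGenericSerializer;

        public Config(boolean forceGenericSerializer) {
            this.forceGenericSerializer = forceGenericSerializer;
        }
    }

    /** Stand-in for type information that is itself serializable. */
    public static final class TypeInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String typeName;

        public TypeInfo(String typeName) {
            this.typeName = typeName;
        }

        /** The resulting serializer depends on the config that is passed in. */
        public String createSerializer(Config config) {
            String kind = config.forceGenericSerializer ? "generic" : "specialized";
            return kind + "<" + typeName + ">";
        }
    }

    /** Descriptor that keeps the type info and defers serializer creation. */
    public static final class Descriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        private final TypeInfo typeInfo;
        private String serializer; // stays null until a config is supplied

        public Descriptor(TypeInfo typeInfo) {
            this.typeInfo = typeInfo;
        }

        /** Creates the serializer once, using the config that the job actually runs with. */
        public void initializeSerializerUnlessSet(Config config) {
            if (serializer == null) {
                serializer = typeInfo.createSerializer(config);
            }
        }

        public String getSerializer() {
            return serializer;
        }
    }

    /** The problematic pattern: build the serializer before shipping, with an empty config. */
    public static String eagerSerializer(TypeInfo typeInfo) {
        return typeInfo.createSerializer(new Config(false));
    }

    public static void main(String[] args) {
        TypeInfo info = new TypeInfo("MyType");
        Config jobConfig = new Config(true); // the job's real setting differs from the default

        Descriptor descriptor = new Descriptor(info);
        descriptor.initializeSerializerUnlessSet(jobConfig);

        System.out.println("eager:    " + eagerSerializer(info));
        System.out.println("deferred: " + descriptor.getSerializer());
    }
}
```

In this model the eager path and the deferred path disagree as soon as the job's config differs from the empty default, which is the kind of inconsistency the issue reports.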





[jira] [Updated] (FLINK-9054) IllegalStateException: Buffer pool is destroyed

2018-03-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-9054:

Priority: Blocker  (was: Critical)

> IllegalStateException: Buffer pool is destroyed
> ---
>
> Key: FLINK-9054
> URL: https://issues.apache.org/jira/browse/FLINK-9054
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: flink-conf.yaml
>
>
> Hi,
> I have a flink cluster running on 2 machines, say A and B.
> Job manager is running on A. There are 2 TaskManagers, one on each node.
> So effectively, A has a job manager and a task manager, while B has a task 
> manager.
> When I submit a job to the cluster, I see below exception and the job fails:
> 2018-03-22 17:16:52,205 WARN 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while 
> emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 10 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
>  ... 19 more
>  
> The exception does not come when I run only one JobManager (only on machine 
> B).
>  
> I am attaching flink-conf.yaml




[jira] [Updated] (FLINK-9054) IllegalStateException: Buffer pool is destroyed

2018-03-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-9054:

Fix Version/s: 1.5.0

> IllegalStateException: Buffer pool is destroyed
> ---
>
> Key: FLINK-9054
> URL: https://issues.apache.org/jira/browse/FLINK-9054
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: flink-conf.yaml
>
>
> Hi,
> I have a flink cluster running on 2 machines, say A and B.
> Job manager is running on A. There are 2 TaskManagers, one on each node.
> So effectively, A has a job manager and a task manager, while B has a task 
> manager.
> When I submit a job to the cluster, I see below exception and the job fails:
> 2018-03-22 17:16:52,205 WARN 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while 
> emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 10 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
>  ... 19 more
>  
> The exception does not come when I run only one JobManager (only on machine 
> B).
>  
> I am attaching flink-conf.yaml




[GitHub] flink issue #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descriptors

2018-03-22 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5732
  
@StephanEwen Yes, I think this should go into 1.5.0 because it fixes 
potential (and real) problems. And yes, I wasn't suggesting to remove 
`initializeSerializerUnlessSet(ExecutionConfig)` now, but it seemed like a good 
place to mention it. 😃 


---


[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410004#comment-16410004
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515090
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
--- End diff --

I'm not sure whether we should catch the `RuntimeException` here. To me, a 
`supplier` should not throw `RuntimeException`s, and if it does, then it should 
not produce an `OptionalFailure` but instead fail with the `RuntimeException`.
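
The two behaviours under discussion can be compared with a small, self-contained Java sketch. `Result` is a hypothetical, stripped-down stand-in for the `OptionalFailure` class in the diff, and the two method names are made up for illustration; neither is part of the pull request.

```java
import java.util.function.Supplier;

// Hypothetical sketch; Result is a reduced stand-in for the OptionalFailure class in the diff.
class SupplierFailureSketch {

    /** Holds either a value or the cause of a failure. */
    public static final class Result<T> {
        private final T value;
        private final Throwable failureCause;

        private Result(T value, Throwable failureCause) {
            this.value = value;
            this.failureCause = failureCause;
        }

        public static <T> Result<T> of(T value) {
            return new Result<>(value, null);
        }

        public static <T> Result<T> ofFailure(Throwable failureCause) {
            return new Result<>(null, failureCause);
        }

        public boolean isFailure() {
            return failureCause != null;
        }

        /** Returns the value, or rethrows the stored failure wrapped in a RuntimeException. */
        public T get() {
            if (failureCause != null) {
                throw new RuntimeException(failureCause);
            }
            return value;
        }
    }

    /** Behaviour as in the diff: a RuntimeException from the supplier becomes a failed result. */
    public static <T> Result<T> createFromCatching(Supplier<T> valueSupplier) {
        try {
            return Result.of(valueSupplier.get());
        } catch (RuntimeException ex) {
            return Result.ofFailure(ex);
        }
    }

    /** Alternative raised in the review: let the RuntimeException propagate to the caller. */
    public static <T> Result<T> createFromPropagating(Supplier<T> valueSupplier) {
        return Result.of(valueSupplier.get());
    }

    public static void main(String[] args) {
        Supplier<String> faulty = () -> {
            throw new IllegalStateException("merge failed");
        };

        System.out.println("catching variant is failure: "
                + createFromCatching(faulty).isFailure());

        try {
            createFromPropagating(faulty);
        } catch (IllegalStateException ex) {
            System.out.println("propagating variant threw: " + ex.getMessage());
        }
    }
}
```

The catching variant lets the caller postpone error handling until `get()` is called, while the propagating variant fails immediately at the point where the supplier is evaluated.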


> Client blocks indefinitely if job archiving fails
> -
>
> Key: FLINK-8721
> URL: https://issues.apache.org/jira/browse/FLINK-8721
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, JobManager
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> While porting the {{AccumulatorErrorITCase}} I noticed that, for FLIP-6, if 
> the job archiving fails (in this case due to a custom accumulator throwing an 
> exception in #merge) no response is sent to the client.
> {code}
> 3547 [flink-akka.actor.default-dispatcher-2] ERROR 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor  - Caught exception 
> while executing runnable in main thread.
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException
>   at 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113)
>   at 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107)
>   at 
> org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599)
>   at 
> org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.<init>(ArchivedExecutionJobVertex.java:49)
>   at 
> 

[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410009#comment-16410009
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176516910
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
--- End diff --

The message indicates that `OptionalFailure` was implemented with the 
accumulators in mind, but I think it should be more generic. I guess that 
`AccumulatorHelper#67` is also the reason why we catch the `RuntimeException`: 
to make the merge supplier as smooth as possible.


> Client blocks indefinitely if job archiving fails
> -
>
> Key: FLINK-8721
> URL: https://issues.apache.org/jira/browse/FLINK-8721
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, JobManager
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> While porting the {{AccumulatorErrorITCase}} I noticed that, for FLIP-6, if 
> the job archiving fails (in this case due to a custom accumulator throwing an 
> exception in #merge) no response is sent to the client.
> {code}
> 3547 [flink-akka.actor.default-dispatcher-2] ERROR 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor  - Caught exception 
> while executing runnable in main thread.
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException
>   at 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113)
>   at 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107)
>   at 
> org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599)
>   at 
> 

[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410010#comment-16410010
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515638
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with 
{@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
+   if (value != null) {
+   return value;
+   }
+   checkNotNull(failureCause);
+   throw new FlinkRuntimeException(failureCause);
+   }
+
+   public Throwable getFailureCause() {
+   return checkNotNull(failureCause);
+   }
+
+   public boolean isFailure() {
+   return failureCause != null;
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(value, failureCause);
+   }
+
+   @Override
+   public boolean equals(Object object) {
--- End diff --

Why deviate from the superclass's parameter name `obj`?


> Client blocks indefinitely if job archiving fails
> -
>
> Key: FLINK-8721
> URL: https://issues.apache.org/jira/browse/FLINK-8721
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, JobManager
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> While porting the {{AccumulatorErrorITCase}} I noticed that, for FLIP-6, if 
> the job archiving fails (in this case due to a custom accumulator throwing an 
> exception in #merge) no response is sent to the client.
> {code}
> 3547 [flink-akka.actor.default-dispatcher-2] ERROR 
> 

[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410005#comment-16410005
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515453
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
--- End diff --

I think `get` should throw a checked exception and not an unchecked 
exception. Otherwise users won't be aware of it. We could provide a method 
`getUnchecked` where we throw an unchecked exception.
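
A minimal sketch of the split suggested above, assuming a simplified stand-in class rather than the `OptionalFailure` from the pull request. The names `CheckedGetSketch` and `WrappedFailureException` are hypothetical, and `RuntimeException` stands in for `FlinkRuntimeException` so the example compiles without Flink on the classpath:

```java
import java.util.Objects;

/** Hypothetical checked exception; stands in for whichever checked type the PR settles on. */
class WrappedFailureException extends Exception {
    WrappedFailureException(Throwable cause) {
        super(cause);
    }
}

/** Simplified stand-in for OptionalFailure; not the class from the pull request. */
final class CheckedGetSketch<T> {
    private final T value;
    private final Throwable failureCause;

    private CheckedGetSketch(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    static <T> CheckedGetSketch<T> of(T value) {
        return new CheckedGetSketch<>(Objects.requireNonNull(value), null);
    }

    static <T> CheckedGetSketch<T> ofFailure(Throwable failureCause) {
        return new CheckedGetSketch<>(null, Objects.requireNonNull(failureCause));
    }

    /** Checked variant: the compiler forces callers to deal with the failure case. */
    T get() throws WrappedFailureException {
        if (failureCause != null) {
            throw new WrappedFailureException(failureCause);
        }
        return value;
    }

    /** Unchecked variant for callers that knowingly opt out of handling the failure. */
    T getUnchecked() {
        if (failureCause != null) {
            throw new RuntimeException(failureCause);
        }
        return value;
    }
}
```

With this shape, a caller that forgets about the failure case gets a compile error on `get()`, while `getUnchecked()` keeps the behaviour of the current diff for call sites such as lambdas where a checked exception is inconvenient.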



[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410007#comment-16410007
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176514312
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
--- End diff --

This type is not serializable. I think you should mark it `transient` and 
then override `readObject` and `writeObject` similar to how `ArrayList` does it.
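
A rough sketch of the `transient` plus custom `writeObject`/`readObject` pattern mentioned above, assuming a simplified stand-in class. `TransientValueSketch` and its `roundTrip` helper are hypothetical names used only for illustration; this is not the code from the pull request:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Simplified stand-in for OptionalFailure; not the class from the pull request. */
final class TransientValueSketch<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by default serialization and written by hand below,
    // because T is not statically known to be Serializable.
    private transient T value;

    private Throwable failureCause;

    TransientValueSketch(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    T getValue() {
        return value;
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes the non-transient fields (failureCause)
        out.writeObject(value);   // fails at runtime if value is not serializable
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        value = (T) in.readObject();
    }

    /** Serializes and deserializes the given instance in memory. */
    @SuppressWarnings("unchecked")
    static <T> TransientValueSketch<T> roundTrip(TransientValueSketch<T> original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (TransientValueSketch<T>) in.readObject();
        }
    }
}
```

The field has to stay non-final here so that `readObject` can assign it; that is the same trade-off `ArrayList` makes for its backing array.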



[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410011#comment-16410011
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517725
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -50,13 +51,13 @@
 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
@JsonSerialize(contentUsing = SerializedValueSerializer.class)
-   private Map serializedUserAccumulators;
+   private Map> 
serializedUserAccumulators;
 
@JsonCreator
public JobAccumulatorsInfo(
-   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) 
List jobAccumulators,
-   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
-   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) {
+   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List 
jobAccumulators,
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
+   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map> serializedUserAccumulators) {
--- End diff --

The indentation is wrong here.



[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410008#comment-16410008
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515734
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
+   if (value != null) {
+   return value;
+   }
+   checkNotNull(failureCause);
+   throw new FlinkRuntimeException(failureCause);
+   }
+
+   public Throwable getFailureCause() {
+   return checkNotNull(failureCause);
+   }
+
+   public boolean isFailure() {
+   return failureCause != null;
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(value, failureCause);
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object == null) {
+   return false;
+   }
+   if (object == this) {
+   return true;
+   }
+   if (!(object instanceof OptionalFailure)) {
+   return false;
+   }
+   OptionalFailure other = (OptionalFailure) object;
--- End diff --

Let's cast to `OptionalFailure<?>`
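
For illustration, an `equals` that casts to the wildcard type instead of the raw type could look roughly like the following. `WildcardEqualsSketch` is a hypothetical, simplified stand-in and not the class under review; the parameter is also named `obj`, matching `Object#equals`:

```java
import java.util.Objects;

/** Simplified stand-in for OptionalFailure; not the class from the pull request. */
final class WildcardEqualsSketch<T> {
    private final T value;
    private final Throwable failureCause;

    WildcardEqualsSketch(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, failureCause);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed
        if (!(obj instanceof WildcardEqualsSketch)) {
            return false;
        }
        // Wildcard cast: no raw type and no unchecked warning, since the type
        // argument of the other instance is unknown at runtime anyway.
        WildcardEqualsSketch<?> other = (WildcardEqualsSketch<?>) obj;
        return Objects.equals(value, other.value)
            && Objects.equals(failureCause, other.failureCause);
    }
}
```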



[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410013#comment-16410013
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176518316
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
 ---
@@ -45,84 +47,87 @@
  *  b) are not compatible with existing accumulator.
  */
 public class AccumulatorErrorITCase extends TestLogger {
-
-   private static LocalFlinkMiniCluster cluster;
-
-   private static ExecutionEnvironment env;
-
-   @BeforeClass
-   public static void startCluster() {
+   private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+   private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+   private static final String INCOMPATIBLE_ACCUMULATORS_NAME = 
"incompatible-accumulators";
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   getConfiguration(),
+   2,
+   3));
+
+   public static Configuration getConfiguration() {
Configuration config = new Configuration();
-   config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-   cluster = new LocalFlinkMiniCluster(config, false);
-
-   cluster.start();
-
-   env = new TestEnvironment(cluster, 6, false);
-   }
-
-   @AfterClass
-   public static void shutdownCluster() {
-   cluster.stop();
-   cluster = null;
+   return config;
}
 
@Test
public void testFaultyAccumulator() throws Exception {
-
+   TestEnvironment env = 
MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --

a no-op sink?



[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410006#comment-16410006
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517187
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
 ---
@@ -48,8 +49,9 @@
 * @param netRuntime The net runtime of the job (excluding pre-flight 
phase like the optimizer) in milliseconds
 * @param accumulators A map of all accumulator results produced by the 
job, in serialized form
 */
-   public SerializedJobExecutionResult(JobID jobID, long netRuntime,
-   
Map accumulators) {
+   public SerializedJobExecutionResult(JobID jobID,
+   
long netRuntime,
+   
Map> accumulators) {
--- End diff --

Something is off with the indentation here.


> Client blocks indefinitely if job archiving fails
> -
>
> Key: FLINK-8721
> URL: https://issues.apache.org/jira/browse/FLINK-8721
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, JobManager
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> While porting the {{AccumulatorErrorITCase}} I noticed that, for FLIP-6, if 
> the job archiving fails (in this case due to a custom accumulator throwing an 
> exception in #merge) no response is sent to the client.
> {code}
> 3547 [flink-akka.actor.default-dispatcher-2] ERROR 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor  - Caught exception 
> while executing runnable in main thread.
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException
>   at 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113)
>   at 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107)
>   at 
> org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599)
>   at 
> org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.<init>(ArchivedExecutionJobVertex.java:49)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.archive(ExecutionJobVertex.java:612)
>   at 
> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:313)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:985)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.access$1400(JobMaster.java:136)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1181)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:292)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:147)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}





[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410012#comment-16410012
 ] 

ASF GitHub Bot commented on FLINK-8721:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517944
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
 ---
@@ -21,83 +21,98 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the SerializedJobExecutionResult
  */
 public class SerializedJobExecutionResultTest {
--- End diff --

`extends TestLogger` missing



[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515090
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static  OptionalFailure of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static  OptionalFailure ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static  OptionalFailure createFrom(Supplier 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
--- End diff --

Not sure whether we should catch the `RuntimeException` here. To me, a 
`supplier` should not throw `RuntimeExceptions`, and if it does, the call 
should fail with that `RuntimeException` rather than produce an `OptionalFailure`.
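
A minimal sketch of that alternative, under stated assumptions: `ThrowingSupplier` and `Outcome` are hypothetical names used only for illustration and are not part of the PR. The idea shown is that only checked failures get wrapped, while a `RuntimeException` from the supplier propagates to the caller.

```java
/** Hypothetical supplier type that may throw a checked exception. */
@FunctionalInterface
interface ThrowingSupplier<T> {
    T get() throws Exception;
}

/** Hypothetical, simplified stand-in for the OptionalFailure class under review. */
final class Outcome<T> {
    private final T value;           // set on success
    private final Exception failure; // set on failure

    private Outcome(T value, Exception failure) {
        this.value = value;
        this.failure = failure;
    }

    /**
     * Wraps checked failures, but lets RuntimeExceptions propagate to the
     * caller instead of converting them into a failed Outcome.
     */
    static <T> Outcome<T> createFrom(ThrowingSupplier<T> supplier) {
        try {
            return new Outcome<>(supplier.get(), null);
        } catch (RuntimeException e) {
            throw e; // treated as a bug in the supplier: fail fast
        } catch (Exception e) {
            return new Outcome<>(null, e); // expected failure: capture it
        }
    }

    boolean isFailure() {
        return failure != null;
    }

    T getValue() {
        return value;
    }

    Exception getFailure() {
        return failure;
    }
}
```

With this shape, a caller sees a failed `Outcome` only for failures the supplier declares, which keeps programming errors visible.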


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517944
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
 ---
@@ -21,83 +21,98 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the SerializedJobExecutionResult
  */
 public class SerializedJobExecutionResultTest {
--- End diff --

`extends TestLogger` missing


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515453
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static  OptionalFailure of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static  OptionalFailure ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static  OptionalFailure createFrom(Supplier 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with 
{@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
--- End diff --

I think `get` should throw a checked exception and not an unchecked 
exception. Otherwise users won't be aware of it. We could provide a method 
`getUnchecked` where we throw an unchecked exception.
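
A rough sketch of the suggested split, assuming a simplified stand-in class. `OptionalFailureSketch` is hypothetical, and the choice of plain `Exception` for the checked variant and `IllegalStateException` for the unchecked one is an assumption, not what the PR uses.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for the OptionalFailure class under review. */
final class OptionalFailureSketch<T> {
    private final T value;                // null when this instance is a failure
    private final Throwable failureCause; // null when this instance is a success

    private OptionalFailureSketch(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    static <T> OptionalFailureSketch<T> of(T value) {
        return new OptionalFailureSketch<>(Objects.requireNonNull(value), null);
    }

    static <T> OptionalFailureSketch<T> ofFailure(Throwable failureCause) {
        return new OptionalFailureSketch<>(null, Objects.requireNonNull(failureCause));
    }

    /** Checked variant: the compiler forces callers to handle the failure. */
    T get() throws Exception {
        if (failureCause != null) {
            throw new Exception("Wrapped failure", failureCause);
        }
        return value;
    }

    /** Unchecked variant for callers that deliberately opt out of handling. */
    T getUnchecked() {
        if (failureCause != null) {
            throw new IllegalStateException("Wrapped failure", failureCause);
        }
        return value;
    }
}
```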


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515734
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static  OptionalFailure of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static  OptionalFailure ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static  OptionalFailure createFrom(Supplier 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with 
{@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
+   if (value != null) {
+   return value;
+   }
+   checkNotNull(failureCause);
+   throw new FlinkRuntimeException(failureCause);
+   }
+
+   public Throwable getFailureCause() {
+   return checkNotNull(failureCause);
+   }
+
+   public boolean isFailure() {
+   return failureCause != null;
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(value, failureCause);
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object == null) {
+   return false;
+   }
+   if (object == this) {
+   return true;
+   }
+   if (!(object instanceof OptionalFailure)) {
+   return false;
+   }
+   OptionalFailure other = (OptionalFailure) object;
--- End diff --

Let's cast to `OptionalFailure`
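
A small sketch of what this could look like, assuming the suggestion is to cast to the wildcard type (`OptionalFailure<?>`) rather than the raw type. `ResultOrFailure` is a hypothetical stand-in class, not the one in the PR.

```java
import java.util.Objects;

/** Hypothetical stand-in used to illustrate a wildcard cast inside equals(). */
final class ResultOrFailure<T> {
    private final T value;
    private final Throwable failureCause;

    ResultOrFailure(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, failureCause);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed
        if (!(obj instanceof ResultOrFailure)) {
            return false;
        }
        // Wildcard cast: avoids a raw type and needs no unchecked suppression
        ResultOrFailure<?> other = (ResultOrFailure<?>) obj;
        return Objects.equals(value, other.value)
            && Objects.equals(failureCause, other.failureCause);
    }
}
```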


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515638
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static  OptionalFailure of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static  OptionalFailure ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static  OptionalFailure createFrom(Supplier 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with 
{@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
+   if (value != null) {
+   return value;
+   }
+   checkNotNull(failureCause);
+   throw new FlinkRuntimeException(failureCause);
+   }
+
+   public Throwable getFailureCause() {
+   return checkNotNull(failureCause);
+   }
+
+   public boolean isFailure() {
+   return failureCause != null;
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(value, failureCause);
+   }
+
+   @Override
+   public boolean equals(Object object) {
--- End diff --

Why deviate from the superclass's parameter name `obj`?


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176518316
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
 ---
@@ -45,84 +47,87 @@
  *  b) are not compatible with existing accumulator.
  */
 public class AccumulatorErrorITCase extends TestLogger {
-
-   private static LocalFlinkMiniCluster cluster;
-
-   private static ExecutionEnvironment env;
-
-   @BeforeClass
-   public static void startCluster() {
+   private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+   private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+   private static final String INCOMPATIBLE_ACCUMULATORS_NAME = 
"incompatible-accumulators";
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   getConfiguration(),
+   2,
+   3));
+
+   public static Configuration getConfiguration() {
Configuration config = new Configuration();
-   config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-   cluster = new LocalFlinkMiniCluster(config, false);
-
-   cluster.start();
-
-   env = new TestEnvironment(cluster, 6, false);
-   }
-
-   @AfterClass
-   public static void shutdownCluster() {
-   cluster.stop();
-   cluster = null;
+   return config;
}
 
@Test
public void testFaultyAccumulator() throws Exception {
-
+   TestEnvironment env = 
MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --

a no-op sink?


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176516910
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable 
failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static  OptionalFailure of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static  OptionalFailure ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+* @return wrapped {@link OptionalFailure} returned by {@code 
valueSupplier} or wrapped failure if
+* {@code valueSupplier} has thrown a {@link RuntimeException}.
+*/
+   public static  OptionalFailure createFrom(Supplier 
valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
--- End diff --

The message indicates that `OptionalFailure` was implemented with the 
accumulators in mind, but I think it should be more generic. I guess that 
`AccumulatorHelper#67` is also the reason why we catch the `RuntimeException` 
to make the merge supplier as smooth as possible.


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517725
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -50,13 +51,13 @@
 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
@JsonSerialize(contentUsing = SerializedValueSerializer.class)
-   private Map serializedUserAccumulators;
+   private Map> 
serializedUserAccumulators;
 
@JsonCreator
public JobAccumulatorsInfo(
-   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) 
List jobAccumulators,
-   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
-   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) {
+   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List 
jobAccumulators,
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
+   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map> serializedUserAccumulators) {
--- End diff --

indentation is wrong


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517187
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
 ---
@@ -48,8 +49,9 @@
 * @param netRuntime The net runtime of the job (excluding pre-flight 
phase like the optimizer) in milliseconds
 * @param accumulators A map of all accumulator results produced by the 
job, in serialized form
 */
-   public SerializedJobExecutionResult(JobID jobID, long netRuntime,
-   
Map accumulators) {
+   public SerializedJobExecutionResult(JobID jobID,
+   
long netRuntime,
+   
Map> accumulators) {
--- End diff --

Something is off with the indentation here.


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176514312
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given 
value) or a failure cause.
+ */
+public class OptionalFailure implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
--- End diff --

`T` is not guaranteed to be serializable. I think you should mark the field 
`transient` and then implement `readObject` and `writeObject`, similar to how 
`ArrayList` does it.
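
A minimal sketch of that pattern using standard Java serialization. `TransientValueHolder` is a hypothetical stand-in, and the field layout is an assumption made for illustration.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in showing a transient generic field with custom serialization. */
final class TransientValueHolder<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    // T has no Serializable bound, so the field is excluded from default serialization
    private transient T value;

    private Throwable failureCause;

    TransientValueHolder(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    T getValue() {
        return value;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes the non-transient fields (failureCause)
        // Fails with NotSerializableException at runtime if value is not serializable
        out.writeObject(value);
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        value = (T) in.readObject();
    }
}
```

The trade-off is that a non-serializable value is detected only when the object is actually written, not at compile time.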


---


[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-03-22 Thread Ken Krugler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1640#comment-1640
 ] 

Ken Krugler commented on FLINK-9058:


Would it make sense to file a related enhancement issue, for the 
{{ListCheckpointed}} methods to take/return iterables to avoid wasteful memory 
allocations, for the case where the state isn't also an in-memory list?

> Relax ListState.addAll() and ListState.update() to take Iterable
> 
>
> Key: FLINK-9058
> URL: https://issues.apache.org/jira/browse/FLINK-9058
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.5.0
>
>
> [~srichter] What do you think about this? None of the implementations require 
> the parameter to actually be a list and allowing an {{Iterable}} there allows 
> calling it in situations where all you have is an {{Iterable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409985#comment-16409985
 ] 

ASF GitHub Bot commented on FLINK-9058:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5749

[FLINK-9058] Relax ListState.addAll() and ListState.update() to take 
Iterable

If we do this, we should do it before 1.5.0 because we are introducing the 
methods for the first time there.

R: @StefanRRichter 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-9058-list-state-iterable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5749


commit 673cb2c0304ceb3cb9cc95c107bd00d37c59394b
Author: Aljoscha Krettek 
Date:   2018-03-22T17:53:10Z

[FLINK-9058] Relax ListState.addAll() and ListState.update() to take 
Iterable




> Relax ListState.addAll() and ListState.update() to take Iterable
> 
>
> Key: FLINK-9058
> URL: https://issues.apache.org/jira/browse/FLINK-9058
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.5.0
>
>
> [~srichter] What do you think about this? None of the implementations require 
> the parameter to actually be a list and allowing an {{Iterable}} there allows 
> calling it in situations where all you have is an {{Iterable}}.





[GitHub] flink pull request #5749: [FLINK-9058] Relax ListState.addAll() and ListStat...

2018-03-22 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5749

[FLINK-9058] Relax ListState.addAll() and ListState.update() to take 
Iterable

If we do this, we should do it before 1.5.0 because we are introducing the 
methods for the first time there.

R: @StefanRRichter 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-9058-list-state-iterable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5749


commit 673cb2c0304ceb3cb9cc95c107bd00d37c59394b
Author: Aljoscha Krettek 
Date:   2018-03-22T17:53:10Z

[FLINK-9058] Relax ListState.addAll() and ListState.update() to take 
Iterable




---


[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-03-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409966#comment-16409966
 ] 

Aljoscha Krettek commented on FLINK-9058:
-

And I would like to fix this before we release those new methods with 1.5.0.

> Relax ListState.addAll() and ListState.update() to take Iterable
> 
>
> Key: FLINK-9058
> URL: https://issues.apache.org/jira/browse/FLINK-9058
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.5.0
>
>
> [~srichter] What do you think about this? None of the implementations require 
> the parameter to actually be a list and allowing an {{Iterable}} there allows 
> calling it in situations where all you have is an {{Iterable}}.





[jira] [Created] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-03-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-9058:
---

 Summary: Relax ListState.addAll() and ListState.update() to take 
Iterable
 Key: FLINK-9058
 URL: https://issues.apache.org/jira/browse/FLINK-9058
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.5.0


[~srichter] What do you think about this? None of the implementations require 
the parameter to actually be a list and allowing an {{Iterable}} there allows 
calling it in situations where all you have is an {{Iterable}}.
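
To illustrate the motivation, here is a small sketch with a hypothetical in-memory class. `InMemoryListState` is not Flink's `ListState`, and the `Iterable<? extends T>` signature is an assumption for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a list state; not Flink's ListState. */
final class InMemoryListState<T> {
    private final List<T> elements = new ArrayList<>();

    /** Accepts any Iterable, so callers need not materialize a List first. */
    void addAll(Iterable<? extends T> values) {
        for (T value : values) {
            elements.add(value);
        }
    }

    /** Replaces the current contents with the given values. */
    void update(Iterable<? extends T> values) {
        elements.clear();
        addAll(values);
    }

    /** Returns a defensive copy of the stored elements. */
    List<T> get() {
        return new ArrayList<>(elements);
    }
}
```

A caller holding a `Set` or any other `Iterable` can pass it directly, while existing callers that pass a `List` continue to compile because `List` is an `Iterable`.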





[jira] [Resolved] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase

2018-03-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8562.
--
Resolution: Fixed

Fixed via
master:
37a114875afb9352e6f7b10e2729a94d0eeb72ee
b550ac67fbf525863d5812d9d2a1010672a0169b

1.5.0:
4b66514f2c31d5ea29493baf9d022a0115faf82d
9c105f2c982e511bc1274a86d629e7fa26cf7ac8

> Fix YARNSessionFIFOSecuredITCase
> 
>
> Key: FLINK-8562
> URL: https://issues.apache.org/jira/browse/FLINK-8562
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, YARNSessionFIFOSecuredITCase will not fail even if the current 
> Flink YARN Kerberos integration is failing in production. Please see 
> FLINK-8275.





[jira] [Commented] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409949#comment-16409949
 ] 

ASF GitHub Bot commented on FLINK-8562:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5416


> Fix YARNSessionFIFOSecuredITCase
> 
>
> Key: FLINK-8562
> URL: https://issues.apache.org/jira/browse/FLINK-8562
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, YARNSessionFIFOSecuredITCase will not fail even if the current 
> Flink YARN Kerberos integration is failing in production. Please see 
> FLINK-8275.





[GitHub] flink pull request #5416: [FLINK-8562] [Security] Fix YARNSessionFIFOSecured...

2018-03-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5416


---


[jira] [Commented] (FLINK-8919) Add KeyedProcessFunctionWIthCleanupState

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409918#comment-16409918
 ] 

ASF GitHub Bot commented on FLINK-8919:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5680#discussion_r176499309
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import java.lang.{Boolean => JBool}
+import scala.collection.JavaConversions._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.util.Collector
+import org.junit.Test
+import org.junit.Assert.assertArrayEquals
+
+class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase {
+  @Test
+  def testNeedToCleanup(): Unit = {
+    val queryConfig = new StreamQueryConfig()
--- End diff --

It would be good if the test would check that the state is actually cleared.
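
One way to picture the check requested here is the following plain-Java model. It is only a sketch under stated assumptions: `CleanupStateModel`, its methods, and the retention parameters are hypothetical stand-ins and not Flink classes or the harness API used in the pull request. The point it illustrates is that a test can assert both that state exists before the retention time has passed and that it is gone afterwards.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for a keyed function that clears idle state; not a Flink class. */
class CleanupStateModel {
    private final long minRetention;
    private final long maxRetention;
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Long> cleanupTimes = new HashMap<>();
    private long now = 0L;

    CleanupStateModel(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Updates the per-key state and (re)schedules its cleanup time. */
    void process(String key) {
        counts.merge(key, 1L, Long::sum);
        Long cleanupTime = cleanupTimes.get(key);
        // Push the cleanup time out only when the pending one is too close.
        if (cleanupTime == null || now + minRetention > cleanupTime) {
            cleanupTimes.put(key, now + maxRetention);
        }
    }

    /** Moves the model clock forward and clears every key whose cleanup time has passed. */
    void advanceTime(long newTime) {
        now = newTime;
        Iterator<Map.Entry<String, Long>> it = cleanupTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= now) {
                counts.remove(entry.getKey());
                it.remove();
            }
        }
    }

    boolean hasState(String key) {
        return counts.containsKey(key);
    }

    int numStateEntries() {
        return counts.size();
    }
}
```

In a harness-based test the same idea would apply: advance the processing time past the configured retention and then assert on whatever view of the keyed state the harness exposes, rather than only on the emitted records.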


> Add KeyedProcessFunctionWIthCleanupState
> 
>
> Key: FLINK-8919
> URL: https://issues.apache.org/jira/browse/FLINK-8919
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Minor
> Fix For: 1.6.0
>
>
> ProcessFunctionWithCleanupState is a useful tool and I think we also need one 
> for the new KeyedProcessFunction api.





[GitHub] flink pull request #5680: [FLINK-8919] [Table API & SQL] Add KeyedProcessFun...

2018-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5680#discussion_r176499309
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import java.lang.{Boolean => JBool}
+import scala.collection.JavaConversions._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.util.Collector
+import org.junit.Test
+import org.junit.Assert.assertArrayEquals
+
+class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase {
+  @Test
+  def testNeedToCleanup(): Unit = {
+    val queryConfig = new StreamQueryConfig()
--- End diff --

It would be good if the test would check that the state is actually cleared.


---


[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409913#comment-16409913
 ] 

ASF GitHub Bot commented on FLINK-8943:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5746
  
Thanks for your reply ;)


> Jobs will not recover if DFS is temporarily unavailable
> ---
>
> Key: FLINK-8943
> URL: https://issues.apache.org/jira/browse/FLINK-8943
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> Job graphs will be recovered only once from the DFS. If the DFS is 
> unavailable at the recovery attempt, the jobs will simply not be running 
> until the master is restarted again.
> *Steps to reproduce*
> # Submit job on Flink Cluster with HDFS as HA storage dir.
> # Trigger job recovery by killing the master
> # Stop HDFS NameNode
> # Enable HDFS NameNode after job recovery is over
> # Verify that job is not running.
> *Expected behavior*
> The new master should fail fast and exit. The new master should re-attempt 
> the recovery.
> *Stacktrace*
> {noformat}
> 2018-03-14 14:01:37,704 ERROR 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Could not 
> recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph 
> from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates 
> that the retrieved state handle is broken. Try cleaning the state handle 
> store.
>   at 
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 
> to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection 
> exception: java.net.ConnectException: Connection refused; For more details 
> see:  http://wiki.apache.org/hadoop/ConnectionRefused
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>   at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>   at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>   at 

[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...

2018-03-22 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5746
  
Thanks for your reply ;)


---


[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409907#comment-16409907
 ] 

ASF GitHub Bot commented on FLINK-8943:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5746
  
please just ignore my question...


> Jobs will not recover if DFS is temporarily unavailable
> ---
>
> Key: FLINK-8943
> URL: https://issues.apache.org/jira/browse/FLINK-8943
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> Job graphs will be recovered only once from the DFS. If the DFS is 
> unavailable at the recovery attempt, the jobs will simply not be running 
> until the master is restarted again.
> *Steps to reproduce*
> # Submit job on Flink Cluster with HDFS as HA storage dir.
> # Trigger job recovery by killing the master
> # Stop HDFS NameNode
> # Enable HDFS NameNode after job recovery is over
> # Verify that job is not running.
> *Expected behavior*
> The new master should fail fast and exit. The new master should re-attempt 
> the recovery.

[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409908#comment-16409908
 ] 

ASF GitHub Bot commented on FLINK-8943:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5746
  
Yes @sihuazhou. In such a case the JM will terminate in order to let 
another JM try to recover the jobs.
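
The difference between the behaviour described in the issue and the fail-fast behaviour described in this reply can be sketched in plain Java. This is a minimal model under stated assumptions, not the Dispatcher code from the pull request: `RecoverySketch`, `FatalRecoveryException`, and the `loader` function are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of job recovery; not the Flink Dispatcher. */
class RecoverySketch {

    /** Signals that the whole process should give up so another master can take over. */
    static class FatalRecoveryException extends RuntimeException {
        FatalRecoveryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Lenient variant: a job that cannot be loaded is dropped and never retried. */
    static List<String> recoverLeniently(List<String> jobIds, Function<String, String> loader) {
        List<String> recovered = new ArrayList<>();
        for (String id : jobIds) {
            try {
                recovered.add(loader.apply(id));
            } catch (RuntimeException e) {
                // Only logged in this variant: the job stays down until the next restart.
            }
        }
        return recovered;
    }

    /** Fail-fast variant: the first failure aborts recovery for the whole process. */
    static List<String> recoverOrFail(List<String> jobIds, Function<String, String> loader) {
        List<String> recovered = new ArrayList<>();
        for (String id : jobIds) {
            try {
                recovered.add(loader.apply(id));
            } catch (RuntimeException e) {
                throw new FatalRecoveryException("Could not recover job " + id, e);
            }
        }
        return recovered;
    }
}
```

With the fail-fast variant a temporarily unavailable DFS no longer leaves jobs silently missing; the failure surfaces immediately and recovery can be attempted again by whichever process becomes the leader next.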


> Jobs will not recover if DFS is temporarily unavailable
> ---
>
> Key: FLINK-8943
> URL: https://issues.apache.org/jira/browse/FLINK-8943
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> Job graphs will be recovered only once from the DFS. If the DFS is 
> unavailable at the recovery attempt, the jobs will simply not be running 
> until the master is restarted again.
> *Steps to reproduce*
> # Submit job on Flink Cluster with HDFS as HA storage dir.
> # Trigger job recovery by killing the master
> # Stop HDFS NameNode
> # Enable HDFS NameNode after job recovery is over
> # Verify that job is not running.
> *Expected behavior*
> The new master should fail fast and exit. The new master should re-attempt 
> the recovery.

[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5746
  
Yes @sihuazhou. In such a case the JM will terminate in order to let 
another JM try to recover the jobs.


---


[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...

2018-03-22 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5746
  
please just ignore my question...


---


[jira] [Commented] (FLINK-9031) DataSet Job result changes when adding rebalance after union

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409899#comment-16409899
 ] 

ASF GitHub Bot commented on FLINK-9031:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5742
  
Updated the PR. 
Tests are passing (modulo an unrelated failure).


> DataSet Job result changes when adding rebalance after union
> 
>
> Key: FLINK-9031
> URL: https://issues.apache.org/jira/browse/FLINK-9031
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Local Runtime, Optimizer
>Affects Versions: 1.3.1
>Reporter: Fabian Hueske
>Priority: Critical
> Attachments: Person.java, RunAll.java, newplan.txt, oldplan.txt
>
>
> A user [reported this issue on the user mailing 
> list|https://lists.apache.org/thread.html/075f1a487b044079b5d61f199439cb77dd4174bd425bcb3327ed7dfc@%3Cuser.flink.apache.org%3E].
> {quote}I am using Flink 1.3.1 and I have found a strange behavior when running 
> the following logic:
>  # Read data from a file and store it in a DataSet.
>  # Split the dataset in two by checking whether "field1" of the POJOs is empty, so 
> that the first dataset contains only elements with non-empty "field1" and 
> the second dataset contains the other elements.
>  # Each dataset is then grouped, one by "field1" and the other by another 
> field, and subsequently reduced.
>  # The 2 datasets are merged together by union.
>  # The final dataset is written as JSON.
> What I expected from the output was to find only one element with a 
> specific value of "field1" because:
>  # Reducing the first dataset grouped by "field1" should generate only one 
> element with a specific value of "field1".
>  # The second dataset should contain only elements with empty "field1".
>  # Making a union of them should not duplicate any record.
> This does not happen. When I read the generated JSON files I see some duplicate 
> (non-empty) values of "field1".
>  Strangely, this does not happen when the union between the two datasets is 
> not computed. In this case the first dataset produces elements only with 
> distinct values of "field1", while the second dataset produces only records with 
> empty field "value1".
> {quote}
> The user has not enabled object reuse.
> Later he reported that the problem disappears when he injects a rebalance() 
> after the union. I had a look at the execution plans for 
> both cases (attached to this issue) but could not identify a problem.
> Hence I assume this might be an issue with the runtime code, but we need to 
> look deeper into this. The user also provided an example program consisting 
> of two classes, which are attached to the issue as well.
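
The expectation in the report can be restated as a small plain-Java model. This is a sketch only: it uses ordinary collections rather than the DataSet API, and `UnionInvariantSketch`, `Rec`, the `count` field, and the summing reduce are hypothetical, since the user's actual reduce logic lives in the attached classes. It shows why the union of the two reduced datasets should never contain two records with the same non-empty "field1".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical model of the reported pipeline; not the attached Person/RunAll classes. */
class UnionInvariantSketch {

    static final class Rec {
        final String field1;
        final String field2;
        final int count;

        Rec(String field1, String field2, int count) {
            this.field1 = field1;
            this.field2 = field2;
            this.count = count;
        }
    }

    /** Groups by the given key and reduces each group to one record by summing counts. */
    static List<Rec> reduceBy(List<Rec> input, Function<Rec, String> key) {
        Map<String, Rec> groups = new LinkedHashMap<>();
        for (Rec r : input) {
            groups.merge(key.apply(r), r, (a, b) -> new Rec(a.field1, a.field2, a.count + b.count));
        }
        return new ArrayList<>(groups.values());
    }

    /** Split on empty "field1", reduce each side on its own key, then union. */
    static List<Rec> pipeline(List<Rec> input) {
        List<Rec> withField1 = new ArrayList<>();
        List<Rec> withoutField1 = new ArrayList<>();
        for (Rec r : input) {
            (r.field1.isEmpty() ? withoutField1 : withField1).add(r);
        }
        List<Rec> union = new ArrayList<>(reduceBy(withField1, r -> r.field1));
        union.addAll(reduceBy(withoutField1, r -> r.field2));
        return union;
    }

    /** True if two records share the same non-empty "field1". */
    static boolean hasDuplicateField1(List<Rec> records) {
        Set<String> seen = new HashSet<>();
        for (Rec r : records) {
            if (!r.field1.isEmpty() && !seen.add(r.field1)) {
                return true;
            }
        }
        return false;
    }
}
```

A check like `hasDuplicateField1` over the job output would make the reported misbehaviour directly testable, independent of whether a rebalance() is present in the plan.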
>  
>  
>  





[GitHub] flink issue #5742: [FLINK-9031] Fix DataSet Union operator translation bug.

2018-03-22 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5742
  
Updated the PR. 
Tests are passing (modulo an unrelated failure).


---


[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409888#comment-16409888
 ] 

ASF GitHub Bot commented on FLINK-8943:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5746
  
Hi @tillrohrmann, there is one question I want to ask about this PR: does it 
mean that in HA mode we can't tolerate partially broken jobs?


> Jobs will not recover if DFS is temporarily unavailable
> ---
>
> Key: FLINK-8943
> URL: https://issues.apache.org/jira/browse/FLINK-8943
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> Job graphs will be recovered only once from the DFS. If the DFS is 
> unavailable at the recovery attempt, the jobs will simply not be running 
> until the master is restarted again.
> *Steps to reproduce*
> # Submit job on Flink Cluster with HDFS as HA storage dir.
> # Trigger job recovery by killing the master
> # Stop HDFS NameNode
> # Enable HDFS NameNode after job recovery is over
> # Verify that job is not running.
> *Expected behavior*
> The new master should fail fast and exit. The new master should re-attempt 
> the recovery.

[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...

2018-03-22 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5746
  
Hi @tillrohrmann, there is one question I want to ask about this PR: does it 
mean that in HA mode we can't tolerate partially broken jobs?


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-22 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5624
  
@StephanEwen unfortunately not, for example:
`org.apache.flink.runtime.fs.hdfs.HadoopFileSystem#create()` -> 
`org.apache.hadoop.fs.FileSystem#create()` -> 
`org.apache.hadoop.fs.s3a.S3AFileSystem#create()` and this (depending on the 
Hadoop version, of course) may call this:
```
  // get the status or throw an FNFE
  status = getFileStatus(f);

  // if the thread reaches here, there is something at the path
  if (status.isDirectory()) {
...
```


---


[jira] [Commented] (FLINK-8402) HadoopS3FileSystemITCase.testDirectoryListing fails on Travis

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409887#comment-16409887
 ] 

ASF GitHub Bot commented on FLINK-8402:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5624
  
@StephanEwen unfortunately not, for example:
`org.apache.flink.runtime.fs.hdfs.HadoopFileSystem#create()` -> 
`org.apache.hadoop.fs.FileSystem#create()` -> 
`org.apache.hadoop.fs.s3a.S3AFileSystem#create()` and this (depending on the 
Hadoop version, of course) may call this:
```
  // get the status or throw an FNFE
  status = getFileStatus(f);

  // if the thread reaches here, there is something at the path
  if (status.isDirectory()) {
...
```


> HadoopS3FileSystemITCase.testDirectoryListing fails on Travis
> -
>
> Key: FLINK-8402
> URL: https://issues.apache.org/jira/browse/FLINK-8402
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The test {{HadoopS3FileSystemITCase.testDirectoryListing}} fails on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/327021175





[jira] [Commented] (FLINK-8402) HadoopS3FileSystemITCase.testDirectoryListing fails on Travis

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409866#comment-16409866
 ] 

ASF GitHub Bot commented on FLINK-8402:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5624#discussion_r176492623
  
--- Diff: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
 ---
@@ -143,7 +157,20 @@ public void testDirectoryListing() throws Exception {
fs.delete(directory, true);
}
 
-   // now directory must be gone
-   assertFalse(fs.exists(directory));
+   // now directory must be gone (this is eventually-consistent, though!)
+   checkPathExists(fs, directory, false, deadline);
+   }
+
+   private static void checkPathExists(
--- End diff --

makes sense - I created `org.apache.flink.core.fs.FileSystemTestUtils` for 
this helper method
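
The diff above is cut off before the body of `checkPathExists`, so the following is only a sketch of the general technique, assuming the helper polls until a deadline. `EventualConsistencySketch` and `checkEventually` are hypothetical names and this is not the `FileSystemTestUtils` code from the pull request; the condition is passed in as a `BooleanSupplier` so that the example does not depend on any file system class.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper illustrating deadline-based polling; not the code from the pull request. */
class EventualConsistencySketch {

    /**
     * Polls until the condition reports the expected value or the deadline
     * (an absolute System.nanoTime() value) has passed.
     */
    static void checkEventually(BooleanSupplier condition, boolean expected, long deadlineNanos) {
        while (true) {
            if (condition.getAsBoolean() == expected) {
                return;
            }
            // Compare via subtraction so that nanoTime overflow is handled correctly.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new AssertionError("condition did not become " + expected + " before the deadline");
            }
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
    }
}
```

Compared with a single `assertFalse(fs.exists(directory))`, a polling check of this kind tolerates a store that still reports a deleted path for a short while, at the cost of a slower failure when the path really is never removed.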


> HadoopS3FileSystemITCase.testDirectoryListing fails on Travis
> -
>
> Key: FLINK-8402
> URL: https://issues.apache.org/jira/browse/FLINK-8402
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The test {{HadoopS3FileSystemITCase.testDirectoryListing}} fails on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/327021175





[GitHub] flink pull request #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT ca...

2018-03-22 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5624#discussion_r176492623
  
--- Diff: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
 ---
@@ -143,7 +157,20 @@ public void testDirectoryListing() throws Exception {
fs.delete(directory, true);
}
 
-   // now directory must be gone
-   assertFalse(fs.exists(directory));
+   // now directory must be gone (this is eventually-consistent, 
though!)
+   checkPathExists(fs, directory, false, deadline);
+   }
+
+   private static void checkPathExists(
--- End diff --

makes sense - I created `org.apache.flink.core.fs.FileSystemTestUtils` for 
this helper method


---


[jira] [Commented] (FLINK-8964) Port JobSubmissionFailsITCase to flip6

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409841#comment-16409841
 ] 

ASF GitHub Bot commented on FLINK-8964:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5727#discussion_r176489375
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
 ---
@@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
 
// 

 
-   private JobExecutionResult submitJob(JobGraph jobGraph) throws 
Exception {
-   if (detached) {
-   cluster.submitJobDetached(jobGraph);
-   return null;
-   }
-   else {
-   return cluster.submitJobAndWait(jobGraph, false, 
TestingUtils.TESTING_DURATION());
-   }
-   }
-
@Test
-   public void testExceptionInInitializeOnMaster() {
-   try {
-   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
-   failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-   final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
+   public void testExceptionInInitializeOnMaster() throws Exception {
+   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
+   failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
-   try {
-   submitJob(failingJobGraph);
-   fail("Expected JobExecutionException.");
-   }
-   catch (JobExecutionException e) {
-   assertEquals("Test exception.", 
e.getCause().getMessage());
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
+   final JobGraph failingJobGraph = new JobGraph("Failing testing 
job", failingJobVertex);
 
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
-   }
-   }
+   ClusterClient client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   client.setDetached(detached);
 
-   @Test
-   public void testSubmitEmptyJobGraph() {
try {
-   final JobGraph jobGraph = new JobGraph("Testing job");
-
-   try {
-   submitJob(jobGraph);
-   fail("Expected JobSubmissionException.");
-   }
-   catch (JobSubmissionException e) {
-   assertTrue(e.getMessage() != null && 
e.getMessage().contains("empty"));
+   client.submitJob(failingJobGraph, 
JobSubmissionFailsITCase.class.getClassLoader());
+   fail("Job submission should have thrown an exception.");
+   } catch (Exception e) {
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> candidate.getMessage() != null && 
candidate.getMessage().equals("Test exception."));
+   if (!expectedCause.isPresent()) {
+   throw e;
}
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
-
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
}
+
+   client.setDetached(false);
+   client.submitJob(getWorkingJobGraph(), 
JobSubmissionFailsITCase.class.getClassLoader());
--- End diff --

Alright, it's because of the `RunningJobsRegistry` which records that a 
previous job with the same `JobID` has already been executed.
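
The effect described here can be illustrated with a small model. This is a sketch under stated assumptions, not Flink's `RunningJobsRegistry` API: the class `JobRegistrySketch`, its methods, and the use of `UUID` as a stand-in for `JobID` are all hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of a registry that remembers job IDs; not Flink's implementation. */
final class JobRegistrySketch {

    enum Status { RUNNING, DONE }

    private final Map<UUID, Status> jobs = new ConcurrentHashMap<>();

    /** Accepts a job only if its ID has never been seen before. */
    boolean trySubmit(UUID jobId) {
        return jobs.putIfAbsent(jobId, Status.RUNNING) == null;
    }

    /** Marks a running job as finished; the ID stays recorded. */
    void markDone(UUID jobId) {
        jobs.replace(jobId, Status.RUNNING, Status.DONE);
    }

    public static void main(String[] args) {
        JobRegistrySketch registry = new JobRegistrySketch();
        UUID id = UUID.randomUUID();
        System.out.println(registry.trySubmit(id));                // true: first submission
        registry.markDone(id);
        System.out.println(registry.trySubmit(id));                // false: ID already recorded
        System.out.println(registry.trySubmit(UUID.randomUUID())); // true: fresh ID
    }
}
```

Under this model, resubmitting an identical job object is rejected even after it finished, which would be consistent with the test building a new working job graph for the second submission.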


> Port JobSubmissionFailsITCase to flip6
> --
>
> Key: FLINK-8964
> URL: https://issues.apache.org/jira/browse/FLINK-8964
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>  

[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5727#discussion_r176489375
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
 ---
@@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
 
// 

 
-   private JobExecutionResult submitJob(JobGraph jobGraph) throws 
Exception {
-   if (detached) {
-   cluster.submitJobDetached(jobGraph);
-   return null;
-   }
-   else {
-   return cluster.submitJobAndWait(jobGraph, false, 
TestingUtils.TESTING_DURATION());
-   }
-   }
-
@Test
-   public void testExceptionInInitializeOnMaster() {
-   try {
-   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
-   failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-   final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
+   public void testExceptionInInitializeOnMaster() throws Exception {
+   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
+   failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
-   try {
-   submitJob(failingJobGraph);
-   fail("Expected JobExecutionException.");
-   }
-   catch (JobExecutionException e) {
-   assertEquals("Test exception.", 
e.getCause().getMessage());
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
+   final JobGraph failingJobGraph = new JobGraph("Failing testing 
job", failingJobVertex);
 
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
-   }
-   }
+   ClusterClient client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   client.setDetached(detached);
 
-   @Test
-   public void testSubmitEmptyJobGraph() {
try {
-   final JobGraph jobGraph = new JobGraph("Testing job");
-
-   try {
-   submitJob(jobGraph);
-   fail("Expected JobSubmissionException.");
-   }
-   catch (JobSubmissionException e) {
-   assertTrue(e.getMessage() != null && 
e.getMessage().contains("empty"));
+   client.submitJob(failingJobGraph, 
JobSubmissionFailsITCase.class.getClassLoader());
+   fail("Job submission should have thrown an exception.");
+   } catch (Exception e) {
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> candidate.getMessage() != null && 
candidate.getMessage().equals("Test exception."));
+   if (!expectedCause.isPresent()) {
+   throw e;
}
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
-
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
}
+
+   client.setDetached(false);
+   client.submitJob(getWorkingJobGraph(), 
JobSubmissionFailsITCase.class.getClassLoader());
--- End diff --

Alright, it's because of the `RunningJobsRegistry` which records that a 
previous job with the same `JobID` has already been executed.


---


[jira] [Commented] (FLINK-8964) Port JobSubmissionFailsITCase to flip6

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409832#comment-16409832
 ] 

ASF GitHub Bot commented on FLINK-8964:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5727#discussion_r176487399
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
 ---
@@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
 
// 

 
-   private JobExecutionResult submitJob(JobGraph jobGraph) throws 
Exception {
-   if (detached) {
-   cluster.submitJobDetached(jobGraph);
-   return null;
-   }
-   else {
-   return cluster.submitJobAndWait(jobGraph, false, 
TestingUtils.TESTING_DURATION());
-   }
-   }
-
@Test
-   public void testExceptionInInitializeOnMaster() {
-   try {
-   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
-   failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-   final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
+   public void testExceptionInInitializeOnMaster() throws Exception {
+   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
+   failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
-   try {
-   submitJob(failingJobGraph);
-   fail("Expected JobExecutionException.");
-   }
-   catch (JobExecutionException e) {
-   assertEquals("Test exception.", 
e.getCause().getMessage());
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
+   final JobGraph failingJobGraph = new JobGraph("Failing testing 
job", failingJobVertex);
 
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
-   }
-   }
+   ClusterClient client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   client.setDetached(detached);
 
-   @Test
-   public void testSubmitEmptyJobGraph() {
try {
-   final JobGraph jobGraph = new JobGraph("Testing job");
-
-   try {
-   submitJob(jobGraph);
-   fail("Expected JobSubmissionException.");
-   }
-   catch (JobSubmissionException e) {
-   assertTrue(e.getMessage() != null && 
e.getMessage().contains("empty"));
+   client.submitJob(failingJobGraph, 
JobSubmissionFailsITCase.class.getClassLoader());
+   fail("Job submission should have thrown an exception.");
+   } catch (Exception e) {
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> candidate.getMessage() != null && 
candidate.getMessage().equals("Test exception."));
+   if (!expectedCause.isPresent()) {
+   throw e;
}
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
-
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
}
+
+   client.setDetached(false);
+   client.submitJob(getWorkingJobGraph(), 
JobSubmissionFailsITCase.class.getClassLoader());
--- End diff --

Why didn't it work to submit the same `JobGraph` twice?


> Port JobSubmissionFailsITCase to flip6
> --
>
> Key: FLINK-8964
> URL: https://issues.apache.org/jira/browse/FLINK-8964
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> 

[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5727#discussion_r176487399
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
 ---
@@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
 
// 

 
-   private JobExecutionResult submitJob(JobGraph jobGraph) throws 
Exception {
-   if (detached) {
-   cluster.submitJobDetached(jobGraph);
-   return null;
-   }
-   else {
-   return cluster.submitJobAndWait(jobGraph, false, 
TestingUtils.TESTING_DURATION());
-   }
-   }
-
@Test
-   public void testExceptionInInitializeOnMaster() {
-   try {
-   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
-   failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-   final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
+   public void testExceptionInInitializeOnMaster() throws Exception {
+   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
+   failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
-   try {
-   submitJob(failingJobGraph);
-   fail("Expected JobExecutionException.");
-   }
-   catch (JobExecutionException e) {
-   assertEquals("Test exception.", 
e.getCause().getMessage());
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
+   final JobGraph failingJobGraph = new JobGraph("Failing testing 
job", failingJobVertex);
 
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
-   }
-   }
+   ClusterClient client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   client.setDetached(detached);
 
-   @Test
-   public void testSubmitEmptyJobGraph() {
try {
-   final JobGraph jobGraph = new JobGraph("Testing job");
-
-   try {
-   submitJob(jobGraph);
-   fail("Expected JobSubmissionException.");
-   }
-   catch (JobSubmissionException e) {
-   assertTrue(e.getMessage() != null && 
e.getMessage().contains("empty"));
+   client.submitJob(failingJobGraph, 
JobSubmissionFailsITCase.class.getClassLoader());
+   fail("Job submission should have thrown an exception.");
+   } catch (Exception e) {
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> candidate.getMessage() != null && 
candidate.getMessage().equals("Test exception."));
+   if (!expectedCause.isPresent()) {
+   throw e;
}
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
-
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
}
+
+   client.setDetached(false);
+   client.submitJob(getWorkingJobGraph(), 
JobSubmissionFailsITCase.class.getClassLoader());
--- End diff --

Why didn't it work to submit the same `JobGraph` twice?


---


[jira] [Commented] (FLINK-8964) Port JobSubmissionFailsITCase to flip6

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409833#comment-16409833
 ] 

ASF GitHub Bot commented on FLINK-8964:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5727#discussion_r176487184
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
 ---
@@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
 
// 

 
-   private JobExecutionResult submitJob(JobGraph jobGraph) throws 
Exception {
-   if (detached) {
-   cluster.submitJobDetached(jobGraph);
-   return null;
-   }
-   else {
-   return cluster.submitJobAndWait(jobGraph, false, 
TestingUtils.TESTING_DURATION());
-   }
-   }
-
@Test
-   public void testExceptionInInitializeOnMaster() {
-   try {
-   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
-   failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-   final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
+   public void testExceptionInInitializeOnMaster() throws Exception {
+   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
+   failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
-   try {
-   submitJob(failingJobGraph);
-   fail("Expected JobExecutionException.");
-   }
-   catch (JobExecutionException e) {
-   assertEquals("Test exception.", 
e.getCause().getMessage());
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
+   final JobGraph failingJobGraph = new JobGraph("Failing testing 
job", failingJobVertex);
 
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
-   }
-   }
+   ClusterClient client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   client.setDetached(detached);
 
-   @Test
-   public void testSubmitEmptyJobGraph() {
try {
-   final JobGraph jobGraph = new JobGraph("Testing job");
-
-   try {
-   submitJob(jobGraph);
-   fail("Expected JobSubmissionException.");
-   }
-   catch (JobSubmissionException e) {
-   assertTrue(e.getMessage() != null && 
e.getMessage().contains("empty"));
+   client.submitJob(failingJobGraph, 
JobSubmissionFailsITCase.class.getClassLoader());
+   fail("Job submission should have thrown an exception.");
+   } catch (Exception e) {
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> candidate.getMessage() != null && 
candidate.getMessage().equals("Test exception."));
--- End diff --

could be simplified by `"Test exception.".equals(candidate.getMessage())`
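
A minimal sketch of why the shorter form is equivalent: calling `equals` on the string literal is null-safe, so the explicit null check becomes redundant. The class name `NullSafeMessageMatch` and the two predicate constants are illustrative only.

```java
import java.util.function.Predicate;

/** Illustrative comparison of the two predicate styles discussed in the review. */
final class NullSafeMessageMatch {

    // Form used in the diff: explicit null check before calling equals on the message.
    static final Predicate<Throwable> VERBOSE =
        candidate -> candidate.getMessage() != null
            && candidate.getMessage().equals("Test exception.");

    // Suggested form: equals on the literal returns false for a null argument.
    static final Predicate<Throwable> CONCISE =
        candidate -> "Test exception.".equals(candidate.getMessage());

    public static void main(String[] args) {
        Throwable[] samples = {
            new RuntimeException("Test exception."),
            new RuntimeException(),          // getMessage() returns null
            new RuntimeException("other")
        };
        for (Throwable t : samples) {
            System.out.println(VERBOSE.test(t) + " " + CONCISE.test(t));
        }
    }
}
```

Both predicates agree on all three samples, including the exception without a message.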


> Port JobSubmissionFailsITCase to flip6
> --
>
> Key: FLINK-8964
> URL: https://issues.apache.org/jira/browse/FLINK-8964
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5727#discussion_r176487184
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
 ---
@@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
 
// 

 
-   private JobExecutionResult submitJob(JobGraph jobGraph) throws 
Exception {
-   if (detached) {
-   cluster.submitJobDetached(jobGraph);
-   return null;
-   }
-   else {
-   return cluster.submitJobAndWait(jobGraph, false, 
TestingUtils.TESTING_DURATION());
-   }
-   }
-
@Test
-   public void testExceptionInInitializeOnMaster() {
-   try {
-   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
-   failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-   final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
+   public void testExceptionInInitializeOnMaster() throws Exception {
+   final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
+   failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
-   try {
-   submitJob(failingJobGraph);
-   fail("Expected JobExecutionException.");
-   }
-   catch (JobExecutionException e) {
-   assertEquals("Test exception.", 
e.getCause().getMessage());
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   fail("Caught wrong exception of type " + 
t.getClass() + ".");
-   }
+   final JobGraph failingJobGraph = new JobGraph("Failing testing 
job", failingJobVertex);
 
-   cluster.submitJobAndWait(workingJobGraph, false);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
-   }
-   }
+   ClusterClient client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   client.setDetached(detached);
 
-   @Test
-   public void testSubmitEmptyJobGraph() {
try {
-   final JobGraph jobGraph = new JobGraph("Testing job");
-
-   try {
-   submitJob(jobGraph);
-   fail("Expected JobSubmissionException.");
-   }
-   catch (JobSubmissionException e) {
-   assertTrue(e.getMessage() != null && 
e.getMessage().contains("empty"));
+   client.submitJob(failingJobGraph, 
JobSubmissionFailsITCase.class.getClassLoader());
+   fail("Job submission should have thrown an exception.");
+   } catch (Exception e) {
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> candidate.getMessage() != null && 
candidate.getMessage().equals("Test exception."));
--- End diff --

could be simplified by `"Test exception.".equals(candidate.getMessage())`


---


[GitHub] flink pull request #5696: [hotfix][javadoc] fix doc of SlotProvider.allocate...

2018-03-22 Thread sihuazhou
Github user sihuazhou closed the pull request at:

https://github.com/apache/flink/pull/5696


---


[jira] [Commented] (FLINK-9053) Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409803#comment-16409803
 ] 

ASF GitHub Bot commented on FLINK-9053:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5748

[FLINK-9053][runtime] only release outputs under the checkpoint lock

## What is the purpose of the change

Releasing an operator chain's outputs will call 
`RecordWriter#clearBuffers()`, which must not run in parallel with 
`RecordWriter#broadcastEvent()` which the asynchronous checkpoint barrier 
trigger inside `Task` may run via `StreamTask#triggerCheckpoint()`. Now, during 
the cleanup of `StreamTask#invoke()`, `StreamTask`'s asynchronous services are 
shut down but not those of the `Task` and also `operatorChain.releaseOutputs()` 
is not put under the checkpoint lock. Therefore, the following may run in 
parallel:
- `Task`'s checkpoint trigger execution
- `operatorChain.releaseOutputs()`

We may guard `operatorChain.releaseOutputs()` with the checkpoint lock, 
and it should be safe to do so since we have already closed all of `StreamTask`'s 
asynchronous executors and also disposed of the operators. Hence nothing should be 
blocking the cleanup by holding the checkpoint lock.
@StephanEwen can you please have a look to verify the safety of this?

## Brief change log

- add the checkpoint lock in the cleanup of `StreamTask#invoke()` around  
`operatorChain.releaseOutputs()`
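
The locking pattern described above can be sketched in isolation as follows. This is a simplified model, not the actual `StreamTask` code: the class `CheckpointLockSketch`, the `released` flag, and the string buffers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model: the trigger path and the cleanup path share one lock. */
final class CheckpointLockSketch {

    private final Object checkpointLock = new Object();
    private final List<String> buffers = new ArrayList<>();
    private boolean released; // assumption: lets the trigger path detect that cleanup ran

    /** Stand-in for the asynchronous checkpoint trigger that broadcasts a barrier. */
    boolean triggerCheckpoint(long checkpointId) {
        synchronized (checkpointLock) {
            if (released) {
                return false; // outputs are gone, so the checkpoint is declined
            }
            buffers.add("barrier-" + checkpointId);
            return true;
        }
    }

    /** Stand-in for the cleanup step; it never interleaves with a trigger in progress. */
    void releaseOutputs() {
        synchronized (checkpointLock) {
            buffers.clear();
            released = true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CheckpointLockSketch task = new CheckpointLockSketch();
        Thread trigger = new Thread(() -> {
            for (long id = 1; id <= 1000; id++) {
                task.triggerCheckpoint(id);
            }
        });
        trigger.start();
        task.releaseOutputs(); // may run while the trigger thread is still active
        trigger.join();
        System.out.println("accepted after release: " + task.triggerCheckpoint(1001));
    }
}
```

Because both paths synchronize on the same monitor, a barrier is either written completely before the buffers are cleared or rejected afterwards; the unguarded variant would allow the two operations to interleave.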

## Verifying this change

This is a very rare race condition that was uncovered by the 
`RescalingITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-9053

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5748


commit 9f0295aa3c02e4870b248241cb9094d14863a686
Author: Stefan Richter 
Date:   2018-02-26T17:03:14Z

[hotfix] Improved logging for task local recovery

(cherry picked from commit 56c7560)

commit e9e13dec10f1a1ee57c46719d758885c4f33dcf3
Author: Stephan Ewen 
Date:   2018-02-27T15:53:03Z

[hotfix] [core] Suppress unused warning config options only used in shell 
scripts and doc generation.

commit a269f8519305faff153e84d729873b6f9497bd36
Author: Stephan Ewen 
Date:   2018-02-27T16:04:29Z

[FLINK-8798] [core] Make force 'commons-logging' to be parent-first loaded.

commit 1d26062de130c05fdbe7701b55766b4a8d433418
Author: Xingcan Cui 
Date:   2018-02-12T10:11:36Z

[FLINK-8538][table]Add a Kafka table source factory with JSON format support

commit db2c510fb4f171c9e9940759e5fbaf466ec74474
Author: Timo Walther 
Date:   2018-02-19T12:35:45Z

[FLINK-8538] [table] Improve unified table sources

This closes #5564.

commit 23358ff87003fd6603c0ca19bc37f31944d2c494
Author: Stephan Ewen 
Date:   2018-02-26T15:41:24Z

[FLINK-8791] [docs] Fix documentation about configuring dependencies

commit acf114793c708f0ab207008c25195f6f65796e5f
Author: gyao 
Date:   2018-02-21T15:02:01Z

[FLINK-8730][REST] JSON serialize entire SerializedThrowable

Do not only serialize the serialized exception but the entire
SerializedThrowable object. This makes it possible to throw the
SerializedThrowable itself without deserializing it.

This closes #5546.

commit 2f6cb37c775106bb684ef9c608585e7a72056460
Author: gyao 
Date:   2018-02-27T15:58:53Z

[FLINK-8787][flip6] Do not copy flinkConfiguration in 
AbstractYarnClusterDescriptor

This closes #5591.

commit 51d5bc6c5151c2aed3f932f84c35da43689501ec
Author: vinoyang 
Date:   2018-02-27T06:43:52Z

[FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to 
convertValueToString

This closes #5587.

commit 08e615027acd426537dc580139a61bd4082b7c3f
Author: Till Rohrmann 
Date:   2018-02-28T09:11:44Z

[FLINK-8792] [rest] Change 

[GitHub] flink pull request #5748: [FLINK-9053][runtime] only release outputs under t...

2018-03-22 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5748

[FLINK-9053][runtime] only release outputs under the checkpoint lock

## What is the purpose of the change

Releasing an operator chain's outputs will call 
`RecordWriter#clearBuffers()`, which must not run in parallel with 
`RecordWriter#broadcastEvent()` which the asynchronous checkpoint barrier 
trigger inside `Task` may run via `StreamTask#triggerCheckpoint()`. Now, during 
the cleanup of `StreamTask#invoke()`, `StreamTask`'s asynchronous services are 
shut down but not those of the `Task` and also `operatorChain.releaseOutputs()` 
is not put under the checkpoint lock. Therefore, the following may run in 
parallel:
- `Task`'s checkpoint trigger execution
- `operatorChain.releaseOutputs()`

We may guard `operatorChain.releaseOutputs()` with the checkpoint lock, 
and it should be safe to do so since we have already closed all of `StreamTask`'s 
asynchronous executors and also disposed of the operators. Hence nothing should be 
blocking the cleanup by holding the checkpoint lock.
@StephanEwen can you please have a look to verify the safety of this?

## Brief change log

- add the checkpoint lock in the cleanup of `StreamTask#invoke()` around  
`operatorChain.releaseOutputs()`

## Verifying this change

This is a very rare race condition that was uncovered by the 
`RescalingITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-9053

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5748


commit 9f0295aa3c02e4870b248241cb9094d14863a686
Author: Stefan Richter 
Date:   2018-02-26T17:03:14Z

[hotfix] Improved logging for task local recovery

(cherry picked from commit 56c7560)

commit e9e13dec10f1a1ee57c46719d758885c4f33dcf3
Author: Stephan Ewen 
Date:   2018-02-27T15:53:03Z

[hotfix] [core] Suppress unused warning config options only used in shell 
scripts and doc generation.

commit a269f8519305faff153e84d729873b6f9497bd36
Author: Stephan Ewen 
Date:   2018-02-27T16:04:29Z

[FLINK-8798] [core] Make force 'commons-logging' to be parent-first loaded.

commit 1d26062de130c05fdbe7701b55766b4a8d433418
Author: Xingcan Cui 
Date:   2018-02-12T10:11:36Z

[FLINK-8538][table]Add a Kafka table source factory with JSON format support

commit db2c510fb4f171c9e9940759e5fbaf466ec74474
Author: Timo Walther 
Date:   2018-02-19T12:35:45Z

[FLINK-8538] [table] Improve unified table sources

This closes #5564.

commit 23358ff87003fd6603c0ca19bc37f31944d2c494
Author: Stephan Ewen 
Date:   2018-02-26T15:41:24Z

[FLINK-8791] [docs] Fix documentation about configuring dependencies

commit acf114793c708f0ab207008c25195f6f65796e5f
Author: gyao 
Date:   2018-02-21T15:02:01Z

[FLINK-8730][REST] JSON serialize entire SerializedThrowable

Do not only serialize the serialized exception but the entire
SerializedThrowable object. This makes it possible to throw the
SerializedThrowable itself without deserializing it.

This closes #5546.

commit 2f6cb37c775106bb684ef9c608585e7a72056460
Author: gyao 
Date:   2018-02-27T15:58:53Z

[FLINK-8787][flip6] Do not copy flinkConfiguration in 
AbstractYarnClusterDescriptor

This closes #5591.

commit 51d5bc6c5151c2aed3f932f84c35da43689501ec
Author: vinoyang 
Date:   2018-02-27T06:43:52Z

[FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to 
convertValueToString

This closes #5587.

commit 08e615027acd426537dc580139a61bd4082b7c3f
Author: Till Rohrmann 
Date:   2018-02-28T09:11:44Z

[FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to 
convertStringToValue

commit 302aaeb021bacf3f37cb9a3ee236304c94adbf30
Author: Timo Walther 
Date:   2018-02-22T16:22:54Z

[FLINK-8451] [serializers] Make Scala tuple 

[jira] [Commented] (FLINK-8957) Port JMXJobManagerMetricTest to flip6

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409800#comment-16409800
 ] 

ASF GitHub Bot commented on FLINK-8957:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5720#discussion_r176482195
  
--- Diff: 
flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 ---
@@ -92,28 +101,26 @@ public void testJobManagerJMXMetricAccess() throws 
Exception {
true),
null));
 
-   flink.waitForActorsToBeAlive();
-
-   flink.submitJobDetached(jobGraph);
+   ClusterClient client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   client.setDetached(true);
+   client.submitJob(jobGraph, 
JMXJobManagerMetricTest.class.getClassLoader());
 
-   Future jobRunning = 
flink.getLeaderGateway(deadline.timeLeft())
-   .ask(new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), 
deadline.timeLeft());
-   Await.ready(jobRunning, deadline.timeLeft());
+   FutureUtils.retrySuccesfulWithDelay(
+   () -> client.getJobStatus(jobGraph.getJobID()),
+   Time.milliseconds(10),
+   deadline,
+   status -> status == JobStatus.RUNNING,
--- End diff --

The check whether the job is running does not necessarily mean that all 
vertices are running. But I guess what we are waiting for is the initialization 
of the `CheckpointStatsTracker`.
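
The polling pattern in the diff (repeat an asynchronous query with a fixed delay until a predicate accepts the result or a deadline passes) can be sketched with plain JDK classes. This is not Flink's `FutureUtils` implementation; the class `RetryUntilAccepted`, the method `retryWithDelay`, and the string statuses are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical retry helper built only on java.util.concurrent. */
final class RetryUntilAccepted {

    /** Re-runs operation every delayMillis until accept holds or deadlineNanos is reached. */
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation,
            long delayMillis,
            long deadlineNanos,
            Predicate<T> accept,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, delayMillis, deadlineNanos, accept, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            long delayMillis,
            long deadlineNanos,
            Predicate<T> accept,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        operation.get().whenComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else if (accept.test(value)) {
                result.complete(value);
            } else if (System.nanoTime() - deadlineNanos >= 0) {
                result.completeExceptionally(
                    new TimeoutException("deadline passed before the value was accepted"));
            } else {
                // Not accepted yet: schedule the next poll after the delay.
                scheduler.schedule(
                    () -> attempt(operation, delayMillis, deadlineNanos, accept, scheduler, result),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger polls = new AtomicInteger();
            // Simulated status source: reports CREATED twice, then RUNNING.
            Supplier<CompletableFuture<String>> status = () ->
                CompletableFuture.completedFuture(polls.incrementAndGet() < 3 ? "CREATED" : "RUNNING");
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            String finalStatus =
                retryWithDelay(status, 10, deadline, "RUNNING"::equals, scheduler).get();
            System.out.println(finalStatus + " after " + polls.get() + " polls"); // RUNNING after 3 polls
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

As the comment notes, a predicate on the job status only establishes that the job as a whole reports `RUNNING`; waiting for every vertex would need a predicate over per-vertex state instead.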


> Port JMXJobManagerMetricTest to flip6
> -
>
> Key: FLINK-8957
> URL: https://issues.apache.org/jira/browse/FLINK-8957
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9053) Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel

2018-03-22 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber reassigned FLINK-9053:
--

Assignee: Nico Kruber

> Exception in RecordWriter during cleanup of StreamTask with the checkpoint 
> trigger running in parallel
> --
>
> Key: FLINK-9053
> URL: https://issues.apache.org/jira/browse/FLINK-9053
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Chesnay Schepler
> Assignee: Nico Kruber
> Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/356612100
> {code}
> testSavepointRescalingOutKeyedStateDerivedMaxParallelism[0](org.apache.flink.test.checkpointing.RescalingITCase)  Time elapsed: 2.021 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.NoSuchElementException: No value present
>   at java.util.Optional.get(Optional.java:135)
>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.closeBufferBuilder(RecordWriter.java:218)
>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:178)
>   at org.apache.flink.streaming.runtime.io.StreamRecordWriter.close(StreamRecordWriter.java:99)
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.close(RecordWriterOutput.java:161)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.releaseOutputs(OperatorChain.java:239)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:402)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Commented] (FLINK-9057) NPE in CreditBasedSequenceNumberingViewReader when cancelling before initilization was complete

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409786#comment-16409786
 ] 

ASF GitHub Bot commented on FLINK-9057:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5747

[FLINK-9057][network] fix an NPE when cleaning up before requesting a 
subpartition view

## What is the purpose of the change

In `PartitionRequestServerHandler`, the view reader is created and 
immediately added to the `PartitionRequestQueue`, which releases the view 
reader's subpartition view during cleanup. This view, however, is currently 
only created after the reader has been added to the `PartitionRequestQueue`, 
so the cleanup may hit a `NullPointerException` if it happens very early in 
the initialization phase, e.g. due to failures.

## Brief change log

- call `NetworkSequenceViewReader#requestSubpartitionView` before calling  
`PartitionRequestQueue#notifyReaderCreated()`
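
To illustrate the ordering problem described above, here is a minimal, self-contained sketch. The classes `View`, `Reader` and `RequestQueue` are hypothetical stand-ins and not the Flink classes named in this pull request; the sketch only assumes that the reader's view is null until it has been requested and that the queue releases every reader it knows about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; not the Flink classes.
final class ReaderInitOrderSketch {

    /** Stand-in for a subpartition view that must be released on cleanup. */
    static final class View {
        boolean released;

        void release() {
            released = true;
        }
    }

    /** Stand-in for a view reader whose view only exists after requestView(). */
    static final class Reader {
        private View view; // null until requestView() has run

        void requestView() {
            view = new View();
        }

        void releaseAllResources() {
            view.release(); // throws NullPointerException if the view was never requested
        }

        View view() {
            return view;
        }
    }

    /** Stand-in for the queue that cleans up every reader it has been told about. */
    static final class RequestQueue {
        private final List<Reader> readers = new ArrayList<>();

        void notifyReaderCreated(Reader reader) {
            readers.add(reader);
        }

        void releaseAllResources() {
            readers.forEach(Reader::releaseAllResources);
            readers.clear();
        }
    }

    /** Problematic order: the reader is visible to cleanup before its view exists. */
    static Reader registerThenRequest(RequestQueue queue, Runnable betweenSteps) {
        Reader reader = new Reader();
        queue.notifyReaderCreated(reader);
        betweenSteps.run(); // e.g. an early failure that triggers cleanup
        reader.requestView();
        return reader;
    }

    /** Order from the change log: request the view first, then register the reader. */
    static Reader requestThenRegister(RequestQueue queue, Runnable betweenSteps) {
        Reader reader = new Reader();
        reader.requestView();
        betweenSteps.run(); // cleanup at this point does not see the reader yet
        queue.notifyReaderCreated(reader);
        return reader;
    }
}
```

With the second order, a cleanup that runs between the two steps simply does not see the reader, and a later cleanup finds a non-null view to release.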

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-9057

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5747.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5747


commit 3a34531f8d5fd2c4e71102f2d9d66105e55eb697
Author: Nico Kruber 
Date:   2018-03-22T12:49:45Z

[hotfix][tests] add a name to the parameter of RescalingITCase

commit 2e909f085bbc90f406eeae16efda15254c296c0e
Author: Nico Kruber 
Date:   2018-03-22T12:50:07Z

[FLINK-9057][network] fix an NPE when cleaning up before requesting a 
subpartition view




> NPE in CreditBasedSequenceNumberingViewReader when cancelling before 
> initilization was complete
> ---
>
> Key: FLINK-9057
> URL: https://issues.apache.org/jira/browse/FLINK-9057
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
> Fix For: 1.5.0
>
>
> {{RescalingITCase}} unveiled an exception which may occur when shutting down 
> before completely initializing the network stack:
> https://travis-ci.org/apache/flink/jobs/356612100
> {code}
> 01:08:13,458 WARN  org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - An exception was thrown by a user handler's exceptionCaught() method while handling the following exception:
> java.lang.NullPointerException
>   at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.releaseAllResources(CreditBasedSequenceNumberingViewReader.java:192)
>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.releaseAllResources(PartitionRequestQueue.java:322)
>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelInactive(PartitionRequestQueue.java:298)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
>   at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
>   at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
> 

[jira] [Commented] (FLINK-8959) Port AccumulatorLiveITCase to flip6

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409777#comment-16409777
 ] 

ASF GitHub Bot commented on FLINK-8959:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5719
  
Good work @zentol. I guess only a rebase is missing. +1 for merging then.


> Port AccumulatorLiveITCase to flip6
> ---
>
> Key: FLINK-8959
> URL: https://issues.apache.org/jira/browse/FLINK-8959
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>






[jira] [Updated] (FLINK-9053) Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel

2018-03-22 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9053:
---
Summary: Exception in RecordWriter during cleanup of StreamTask with the 
checkpoint trigger running in parallel  (was: RescalingITCase failed on travis)

> Exception in RecordWriter during cleanup of StreamTask with the checkpoint 
> trigger running in parallel
> --
>
> Key: FLINK-9053
> URL: https://issues.apache.org/jira/browse/FLINK-9053
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>





[jira] [Updated] (FLINK-9053) RescalingITCase failed on travis

2018-03-22 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9053:
---
Affects Version/s: 1.6.0

> RescalingITCase failed on travis
> 
>
> Key: FLINK-9053
> URL: https://issues.apache.org/jira/browse/FLINK-9053
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>





[jira] [Updated] (FLINK-9053) RescalingITCase failed on travis

2018-03-22 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9053:
---
Component/s: (was: Tests)

> RescalingITCase failed on travis
> 
>
> Key: FLINK-9053
> URL: https://issues.apache.org/jira/browse/FLINK-9053
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>





[jira] [Updated] (FLINK-9053) RescalingITCase failed on travis

2018-03-22 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9053:
---
Labels:   (was: test-stability)

> RescalingITCase failed on travis
> 
>
> Key: FLINK-9053
> URL: https://issues.apache.org/jira/browse/FLINK-9053
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>





[jira] [Created] (FLINK-9057) NPE in CreditBasedSequenceNumberingViewReader when cancelling before initilization was complete

2018-03-22 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-9057:
--

Summary: NPE in CreditBasedSequenceNumberingViewReader when cancelling before initilization was complete
Key: FLINK-9057
URL: https://issues.apache.org/jira/browse/FLINK-9057
Project: Flink
Issue Type: Bug
Components: Network
Affects Versions: 1.5.0, 1.6.0
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.5.0


{{RescalingITCase}} unveiled an exception which may occur when shutting down 
before completely initializing the network stack:

https://travis-ci.org/apache/flink/jobs/356612100
{code}
01:08:13,458 WARN  org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - An exception was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.NullPointerException
  at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.releaseAllResources(CreditBasedSequenceNumberingViewReader.java:192)
  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.releaseAllResources(PartitionRequestQueue.java:322)
  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelInactive(PartitionRequestQueue.java:298)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
  at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
  at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
  at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  at java.lang.Thread.run(Thread.java:748)
{code}





[jira] [Commented] (FLINK-8956) Port RescalingITCase to flip6

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409766#comment-16409766
 ] 

ASF GitHub Bot commented on FLINK-8956:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5715#discussion_r176475682
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 ---
@@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
         }
 
         try {
-            jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
             JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
 
-            jobID = jobGraph.getJobID();
-
-            cluster.submitJobDetached(jobGraph);
+            final JobID jobID = jobGraph.getJobID();
 
-            Object savepointResponse = null;
+            client.setDetached(true);
+            client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
 
             // wait until the operator is started
             StateSourceBase.workStartedLatch.await();
 
-            while (deadline.hasTimeLeft()) {
-
-                Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.empty()), deadline.timeLeft());
-                FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
-                savepointResponse = Await.result(savepointPathFuture, waitingTime);
-
-                if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
-                    break;
-                }
-            }
-
-            assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
-
-            final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();
-
-            Future jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
-            Future cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
-            Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+            CompletableFuture savepointPathFuture = FutureUtils.retryWithDelay(
+                () -> {
+                    try {
+                        return client.triggerSavepoint(jobID, null);
+                    } catch (FlinkException e) {
+                        throw new RuntimeException(e);
--- End diff --

Shouldn't we return an exceptionally completed future here?
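
A minimal sketch of that suggestion, using JDK classes only. The interface `SavepointTrigger` and the method `asFutureSupplier` are hypothetical names introduced for illustration; the point is that a synchronous failure is turned into a failed future instead of being rethrown from the supplier, so callers that compose on the future see the error through the usual completion path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical names for illustration; JDK only, not the Flink API.
final class FailedFutureSketch {

    /** Stand-in for a client call that returns a future but may also throw a checked exception. */
    interface SavepointTrigger {
        CompletableFuture<String> trigger() throws Exception;
    }

    /** Wraps the call so that a thrown exception surfaces as an exceptionally completed future. */
    static Supplier<CompletableFuture<String>> asFutureSupplier(SavepointTrigger trigger) {
        return () -> {
            try {
                return trigger.trigger();
            } catch (Exception e) {
                // Java 8 compatible way to build an already-failed future.
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        };
    }
}
```

A retry helper that inspects the returned future can then treat thrown and asynchronous failures uniformly.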


> Port RescalingITCase to flip6
> -
>
> Key: FLINK-8956
> URL: https://issues.apache.org/jira/browse/FLINK-8956
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>






[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5715#discussion_r176475682
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---
@@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
}
 
try {
-   jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
 
-   jobID = jobGraph.getJobID();
-
-   cluster.submitJobDetached(jobGraph);
+   final JobID jobID = jobGraph.getJobID();
 
-   Object savepointResponse = null;
+   client.setDetached(true);
+   client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
 
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
 
-   while (deadline.hasTimeLeft()) {
-
-   Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.empty()), deadline.timeLeft());
-   FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
-   savepointResponse = Await.result(savepointPathFuture, waitingTime);
-
-   if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
-   break;
-   }
-   }
-
-   assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
-
-   final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();
-
-   Future jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
-   Future cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
-   Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+   CompletableFuture savepointPathFuture = FutureUtils.retryWithDelay(
+   () -> {
+   try {
+   return client.triggerSavepoint(jobID, null);
+   } catch (FlinkException e) {
+   throw new RuntimeException(e);
--- End diff --

Shouldn't we return an exceptionally completed future here?
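
For illustration, a minimal sketch of what the suggestion could look like, using only JDK classes. The names ExceptionalFutureSketch, triggerSavepoint, completedExceptionally and savepointOperation are hypothetical stand-ins and not the code from the pull request; the point is only that the supplier reports the failure through the returned future instead of throwing from the lambda.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ExceptionalFutureSketch {

    /** Hypothetical stand-in for a client call that declares a checked exception. */
    static CompletableFuture<String> triggerSavepoint(boolean fail) throws Exception {
        if (fail) {
            throw new Exception("savepoint could not be triggered");
        }
        return CompletableFuture.completedFuture("/tmp/savepoint-1");
    }

    /** Builds a future that is already completed with the given failure. */
    static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** Supplier that surfaces failures via the returned future rather than by throwing. */
    static Supplier<CompletableFuture<String>> savepointOperation(boolean fail) {
        return () -> {
            try {
                return triggerSavepoint(fail);
            } catch (Exception e) {
                // Assumption: callers (for example a retry helper) inspect the future,
                // so the failure is handed over inside it.
                return completedExceptionally(e);
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = savepointOperation(true).get();
        System.out.println(failed.isCompletedExceptionally());

        CompletableFuture<String> succeeded = savepointOperation(false).get();
        System.out.println(succeeded.join());
    }
}
```

With this shape, a caller that retries on failed futures sees every failure the same way, whether it happened synchronously in the supplier or asynchronously later.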


---


[jira] [Assigned] (FLINK-9055) WebUI shows job as Running although not enough resources are available

2018-03-22 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9055:
-

Assignee: Sihua Zhou

> WebUI shows job as Running although not enough resources are available
> --
>
> Key: FLINK-9055
> URL: https://issues.apache.org/jira/browse/FLINK-9055
> Project: Flink
> Issue Type: Bug
> Components: JobManager, Webfrontend
> Affects Versions: 1.5.0
> Environment: * FLIP-6 enabled
>  * Local Flink instance with fixed number of TMs
>  * Job parallelism exceeds available slots
> Reporter: Fabian Hueske
> Assignee: Sihua Zhou
> Priority: Major
>
> The WebUI shows a (batch) job as "Running" although not enough resources have 
> been allocated to actually run the job with the requested parallelism.





[jira] [Comment Edited] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException

2018-03-22 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409738#comment-16409738
 ] 

Till Rohrmann edited comment on FLINK-8887 at 3/22/18 3:41 PM:
---

When did you observe the problem [~gjy]? I think this problem can always occur 
if you send an RPC while a leader change happens. However, such an event should 
not happen if the leader component is not killed, if I'm not mistaken.


was (Author: till.rohrmann):
When did you observe the problem [~gjy]? I think this problem can always occur 
if you send an RPC while a leader change happens. However, such an event should 
not happen if you don't kill the leader component, if I'm not mistaken.

> ClusterClient.getJobStatus can throw FencingTokenException
> --
>
> Key: FLINK-8887
> URL: https://issues.apache.org/jira/browse/FLINK-8887
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Gary Yao
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
>
> *Description*
> Calling {{RestClusterClient.getJobStatus}} or 
> {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}. 
> *Analysis*
> {{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by 
> job id. If a reference is found, {{requestJobStatus}} is called on the 
> respective instance. If not, the {{ArchivedExecutionGraphStore}} is queried. 
> However, between the lookup and the method call, the {{JobMaster}} of the 
> respective job may have lost leadership already (job finished), and has set 
> the fencing token to {{null}}.
> *Stacktrace*
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
> Fencing token mismatch: Ignoring message LocalFencedMessage(null, 
> LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null 
> did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
> Fencing token not set: Ignoring message LocalFencedMessage(null, 
> LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null.
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
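
The race described in the analysis above can be reproduced in miniature with plain JDK futures. The sketch below is not Flink code and not necessarily how the issue was resolved: JobStatusLookupSketch, Runner, the two maps and the local FencingTokenException class are hypothetical stand-ins. It shows one possible way to handle the window between the lookup and the call, namely falling back to the archived status when the runner answers with a fencing failure.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class JobStatusLookupSketch {

    /** Hypothetical stand-in for Flink's FencingTokenException. */
    static class FencingTokenException extends RuntimeException {
        FencingTokenException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a job master that may already have lost leadership. */
    static class Runner {
        private final boolean hasLeadership;

        Runner(boolean hasLeadership) {
            this.hasLeadership = hasLeadership;
        }

        CompletableFuture<String> requestJobStatus() {
            CompletableFuture<String> result = new CompletableFuture<>();
            if (hasLeadership) {
                result.complete("RUNNING");
            } else {
                result.completeExceptionally(new FencingTokenException("Fencing token not set"));
            }
            return result;
        }
    }

    final Map<String, Runner> runners = new ConcurrentHashMap<>();
    final Map<String, String> archivedStatus = new ConcurrentHashMap<>();

    /** Answers from the archive, or fails if the job is unknown there as well. */
    CompletableFuture<String> archived(String jobId) {
        CompletableFuture<String> result = new CompletableFuture<>();
        String status = archivedStatus.get(jobId);
        if (status != null) {
            result.complete(status);
        } else {
            result.completeExceptionally(new IllegalStateException("Unknown job " + jobId));
        }
        return result;
    }

    /** Looks the runner up first; on a fencing failure it falls back to the archive. */
    CompletableFuture<String> requestJobStatus(String jobId) {
        Runner runner = runners.get(jobId);
        if (runner == null) {
            return archived(jobId);
        }
        return runner.requestJobStatus()
            .<CompletableFuture<String>>handle((status, failure) -> {
                if (failure == null) {
                    return CompletableFuture.completedFuture(status);
                }
                Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                    ? failure.getCause()
                    : failure;
                if (cause instanceof FencingTokenException) {
                    // The runner lost leadership between the lookup and the call.
                    return archived(jobId);
                }
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(cause);
                return failed;
            })
            .thenCompose(future -> future);
    }

    public static void main(String[] args) {
        JobStatusLookupSketch dispatcher = new JobStatusLookupSketch();
        dispatcher.runners.put("finished-job", new Runner(false));
        dispatcher.archivedStatus.put("finished-job", "FINISHED");
        System.out.println(dispatcher.requestJobStatus("finished-job").join());
    }
}
```

The fallback only helps if the finished job has already been written to the archive by the time the fencing failure is observed; that ordering is an assumption of this sketch.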





[jira] [Commented] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException

2018-03-22 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409738#comment-16409738
 ] 

Till Rohrmann commented on FLINK-8887:
--

When did you observe the problem [~gjy]? I think this problem can always occur 
if you send an RPC while a leader change happens. However, such an event should 
not happen if you don't kill the leader component, if I'm not mistaken.





