[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable
[ https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410823#comment-16410823 ] Sihua Zhou commented on FLINK-9058: --- Shall we also relax {{MapState.putAll()}} to take an {{Iterable}}? It is an existing interface, so could we deprecate the current method and introduce a new one? (If we will deprecate it later anyway, it is better to deprecate it earlier.) > Relax ListState.addAll() and ListState.update() to take Iterable > > > Key: FLINK-9058 > URL: https://issues.apache.org/jira/browse/FLINK-9058 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.5.0 > > > [~srichter] What do you think about this? None of the implementations require > the parameter to actually be a list and allowing an {{Iterable}} there allows > calling it in situations where all you have is an {{Iterable}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
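For illustration, a minimal sketch of what the relaxed signatures could look like. The {{ListState}} methods below reflect the change discussed in this ticket, while the {{MapState.putAll()}} variant is only the hypothetical relaxation raised in the comment above, not an agreed API:
{code}
// Sketch only: simplified interfaces for illustration, not the full Flink API.
public interface ListState<T> {
    // Previously declared with List<T> parameters; none of the backends
    // need an actual List, so an Iterable is sufficient.
    void update(Iterable<T> values) throws Exception;
    void addAll(Iterable<T> values) throws Exception;
}

// Hypothetical analogue for MapState (raised in the comment, not part of this ticket):
public interface MapState<UK, UV> {
    // The existing method takes a Map<UK, UV>; a relaxed variant could
    // accept any Iterable of entries instead.
    void putAll(Iterable<java.util.Map.Entry<UK, UV>> entries) throws Exception;
}
{code}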
[jira] [Commented] (FLINK-6473) Add OVER window support for batch tables
[ https://issues.apache.org/jira/browse/FLINK-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410799#comment-16410799 ] yinhua.dai commented on FLINK-6473: --- [~fhueske] Wondering, before this feature is implemented, how could I implement an aggregation function for an incremental window? For example, I want to calculate an average of price from the beginning up to each day for a batch table. SELECT productId, AVG(price) OVER( PARTITION BY productId ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) FROM product > Add OVER window support for batch tables > > > Key: FLINK-6473 > URL: https://issues.apache.org/jira/browse/FLINK-6473 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Fabian Hueske >Priority: Major > > Add support for OVER windows for batch tables. > Since OVER windows are supported for streaming tables, this issue is not > about the API (which is available) but about adding the execution strategies > and translation for OVER windows on batch tables. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8988) End-to-end test: Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-8988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-8988: - Assignee: Shuyi Chen > End-to-end test: Cassandra connector > > > Key: FLINK-8988 > URL: https://issues.apache.org/jira/browse/FLINK-8988 > Project: Flink > Issue Type: Sub-task > Components: Cassandra Connector, Tests >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Major > > In order to test the integration with Cassandra, we should add an end-to-end > test which tests the Cassandra connector. In order to do this, we need to add > a script/function which sets up a {{Cassandra}} cluster. Then we can run a > simple job writing information to {{Cassandra}} using the > {{CassandraRowWriteAheadSink}} and the {{CassandraTupleWriteAheadSink}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8988) End-to-end test: Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-8988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410784#comment-16410784 ] Shuyi Chen commented on FLINK-8988: --- [~till.rohrmann] that sounds like a good idea. I can help with it. > End-to-end test: Cassandra connector > > > Key: FLINK-8988 > URL: https://issues.apache.org/jira/browse/FLINK-8988 > Project: Flink > Issue Type: Sub-task > Components: Cassandra Connector, Tests >Reporter: Till Rohrmann >Priority: Major > > In order to test the integration with Cassandra, we should add an end-to-end > test which tests the Cassandra connector. In order to do this, we need to add > a script/function which sets up a {{Cassandra}} cluster. Then we can run a > simple job writing information to {{Cassandra}} using the > {{CassandraRowWriteAheadSink}} and the {{CassandraTupleWriteAheadSink}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8970) Add more automated end-to-end tests
[ https://issues.apache.org/jira/browse/FLINK-8970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410766#comment-16410766 ] ASF GitHub Bot commented on FLINK-8970: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5750 Based on the umbrella task link FLINK-8970, it seems like this e2e test should be attached to FLINK-8973 instead? > Add more automated end-to-end tests > --- > > Key: FLINK-8970 > URL: https://issues.apache.org/jira/browse/FLINK-8970 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Priority: Critical > > In order to improve Flink's test coverage and make releasing easier, we > should add more automated end-to-end tests which test Flink more like a user > would interact with the system. Additionally, these end-to-end tests should > test the integration of various other systems with Flink. > With FLINK-6539, we added a new module \{{flink-end-to-end-tests}} which > contains the set of currently available end-to-end tests. > With FLINK-8911, a script was added to trigger these tests. > > This issue is an umbrella issue collecting all different end-to-end tests > which we want to add to the Flink repository. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMachineEx...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5750 Based on the umbrella task link FLINK-8970, it seems like this e2e test should be attached to FLINK-8973 instead? ---
[jira] [Commented] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception
[ https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410755#comment-16410755 ] ASF GitHub Bot commented on FLINK-9060: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5751 CC: @aljoscha @StefanRRichter > Deleting state using KeyedStateBackend.getKeys() throws Exception > - > > Key: FLINK-9060 > URL: https://issues.apache.org/jira/browse/FLINK-9060 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > Adding this test to {{StateBackendTestBase}} showcases the problem: > {code} > @Test > public void testConcurrentModificationWithGetKeys() throws Exception { > AbstractKeyedStateBackend backend = > createKeyedBackend(IntSerializer.INSTANCE); > try { > ListStateDescriptor listStateDescriptor = > new ListStateDescriptor<>("foo", > StringSerializer.INSTANCE); > backend.setCurrentKey(1); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Hello"); > backend.setCurrentKey(2); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Ciao"); > Stream keys = backend > .getKeys(listStateDescriptor.getName(), > VoidNamespace.INSTANCE); > keys.forEach((key) -> { > backend.setCurrentKey(key); > try { > backend > .getPartitionedState( > VoidNamespace.INSTANCE, > > VoidNamespaceSerializer.INSTANCE, > listStateDescriptor) > .clear(); > } catch (Exception e) { > e.printStackTrace(); > } > }); > } > finally { > IOUtils.closeQuietly(backend); > backend.dispose(); > } > } > {code} > This should work because one of the use cases of {{getKeys()}} and > {{applyToAllKeys()}} is to do stuff for every key, which includes deleting > them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5751 CC: @aljoscha @StefanRRichter ---
[jira] [Commented] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception
[ https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410753#comment-16410753 ] ASF GitHub Bot commented on FLINK-9060: --- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5751 [FLINK-9060][state] Deleting state using KeyedStateBackend.getKeys() throws Exception ## What is the purpose of the change This PR fixes the problem when deleting state using `KeyedStateBackend.getKeys()` throws Exception. ## Brief change log - copy the result of `getKeys()` into `list` to avoid concurrency problem. ## Verifying this change - *add a unit test in `StateBackendTest#testConcurrentModificationWithGetKeys()` to verify this* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation no You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink deletingStateUsingKeyedStateBackendGetKeys Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5751.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5751 commit cba3a32f3af16ee92676b1e5b82b21af6fee610d Author: sihuazhouDate: 2018-03-23T03:20:42Z fix concurrency risk in HeapKeyedStateBackend#getKeys(). > Deleting state using KeyedStateBackend.getKeys() throws Exception > - > > Key: FLINK-9060 > URL: https://issues.apache.org/jira/browse/FLINK-9060 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > Adding this test to {{StateBackendTestBase}} showcases the problem: > {code} > @Test > public void testConcurrentModificationWithGetKeys() throws Exception { > AbstractKeyedStateBackend backend = > createKeyedBackend(IntSerializer.INSTANCE); > try { > ListStateDescriptor listStateDescriptor = > new ListStateDescriptor<>("foo", > StringSerializer.INSTANCE); > backend.setCurrentKey(1); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Hello"); > backend.setCurrentKey(2); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Ciao"); > Stream keys = backend > .getKeys(listStateDescriptor.getName(), > VoidNamespace.INSTANCE); > keys.forEach((key) -> { > backend.setCurrentKey(key); > try { > backend > .getPartitionedState( > VoidNamespace.INSTANCE, > > VoidNamespaceSerializer.INSTANCE, > listStateDescriptor) > .clear(); > } catch (Exception e) { > e.printStackTrace(); > } > }); > } > finally { > IOUtils.closeQuietly(backend); > backend.dispose(); > } > } > {code} > This should work because one of the use cases of {{getKeys()}} and > {{applyToAllKeys()}} is to do stuff for every key, which includes deleting > them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5751: [FLINK-9060][state] Deleting state using KeyedStat...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5751 [FLINK-9060][state] Deleting state using KeyedStateBackend.getKeys() throws Exception ## What is the purpose of the change This PR fixes the problem where deleting state using `KeyedStateBackend.getKeys()` throws an Exception. ## Brief change log - copy the result of `getKeys()` into a list to avoid the concurrency problem. ## Verifying this change - *add a unit test in `StateBackendTest#testConcurrentModificationWithGetKeys()` to verify this* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation no You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink deletingStateUsingKeyedStateBackendGetKeys Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5751.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5751 commit cba3a32f3af16ee92676b1e5b82b21af6fee610d Author: sihuazhou Date: 2018-03-23T03:20:42Z fix concurrency risk in HeapKeyedStateBackend#getKeys(). ---
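A rough sketch of the idea behind this fix: snapshotting the keys into a list before any state is mutated. The PR applies the copy inside `HeapKeyedStateBackend#getKeys()`; the snippet below shows the same pattern from the caller's side, assuming the `backend`, `listStateDescriptor`, and `VoidNamespace` setup from the test in the issue description and an enclosing method that declares `throws Exception`:
{code}
// Requires java.util.List, java.util.stream.Stream and java.util.stream.Collectors.
// Materialize the keys eagerly so that clearing state afterwards cannot
// invalidate the iterator that backs the stream returned by getKeys().
Stream<Integer> keys = backend.getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);
List<Integer> keyList = keys.collect(Collectors.toList());

for (Integer key : keyList) {
    backend.setCurrentKey(key);
    backend
        .getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            listStateDescriptor)
        .clear();
}
{code}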
[jira] [Assigned] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
[ https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-7775: --- Assignee: vinoyang > Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs > --- > > Key: FLINK-7775 > URL: https://issues.apache.org/jira/browse/FLINK-7775 > Project: Flink > Issue Type: Task > Components: Local Runtime >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > {code} > public int getNumberOfCachedJobs() { > return jobRefCounters.size(); > } > {code} > The method of PermanentBlobCache is not used. > We should remove it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9057) NPE in CreditBasedSequenceNumberingViewReader when cancelling before initialization was complete
[ https://issues.apache.org/jira/browse/FLINK-9057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410688#comment-16410688 ] ASF GitHub Bot commented on FLINK-9057: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/5747 Thanks for fixing this problem. The `notifyReaderCreated` should be called after both views are created correctly, otherwise it will cause inconsistent. LGTM > NPE in CreditBasedSequenceNumberingViewReader when cancelling before > initilization was complete > --- > > Key: FLINK-9057 > URL: https://issues.apache.org/jira/browse/FLINK-9057 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > {{RescalingITCase}} unveiled an exception which may occur when shutting down > before completely initializing the network stack: > https://travis-ci.org/apache/flink/jobs/356612100 > {code} > 01:08:13,458 WARN > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline - An > exception was thrown by a user handler's exceptionCaught() method while > handling the following exception: > java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.releaseAllResources(CreditBasedSequenceNumberingViewReader.java:192) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.releaseAllResources(PartitionRequestQueue.java:322) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelInactive(PartitionRequestQueue.java:298) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610) > at > 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5747: [FLINK-9057][network] fix an NPE when cleaning up before ...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/5747 Thanks for fixing this problem. The `notifyReaderCreated` should be called after both views are created correctly, otherwise it would cause an inconsistency. LGTM ---
[jira] [Commented] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception
[ https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410669#comment-16410669 ] Sihua Zhou commented on FLINK-9060: --- 1. For {{MemoryStateBackendTest}} this is because of a concurrency problem in {{HeapKeyedStateBackend}}. 2. For {{RocksDBStateBackendTest}} this is because of a bug in the test code. I'd like to take this ticket, if no one is already working on it. > Deleting state using KeyedStateBackend.getKeys() throws Exception > - > > Key: FLINK-9060 > URL: https://issues.apache.org/jira/browse/FLINK-9060 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > Adding this test to {{StateBackendTestBase}} showcases the problem: > {code} > @Test > public void testConcurrentModificationWithGetKeys() throws Exception { > AbstractKeyedStateBackend backend = > createKeyedBackend(IntSerializer.INSTANCE); > try { > ListStateDescriptor listStateDescriptor = > new ListStateDescriptor<>("foo", > StringSerializer.INSTANCE); > backend.setCurrentKey(1); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Hello"); > backend.setCurrentKey(2); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Ciao"); > Stream keys = backend > .getKeys(listStateDescriptor.getName(), > VoidNamespace.INSTANCE); > keys.forEach((key) -> { > backend.setCurrentKey(key); > try { > backend > .getPartitionedState( > VoidNamespace.INSTANCE, > > VoidNamespaceSerializer.INSTANCE, > listStateDescriptor) > .clear(); > } catch (Exception e) { > e.printStackTrace(); > } > }); > } > finally { > IOUtils.closeQuietly(backend); > backend.dispose(); > } > } > {code} > This should work because one of the use cases of {{getKeys()}} and > {{applyToAllKeys()}} is to do stuff for every key, which includes deleting > them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5580 @zentol What do you think we still need now? I think this makes things easier for users. And we also need this for work on the Beam Flink Runner. ---
[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410665#comment-16410665 ] ASF GitHub Bot commented on FLINK-8620: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5580 @zentol What do you think we still need now? I think this makes things easier for users. And we also need this for work on the Beam Flink Runner. > Enable shipping custom artifacts to BlobStore and accessing them through > DistributedCache > - > > Key: FLINK-8620 > URL: https://issues.apache.org/jira/browse/FLINK-8620 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > > We should be able to distribute custom files to taskmanagers. To do that we > can store those files in BlobStore and later on access them in TaskManagers > through DistributedCache. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover
[ https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410660#comment-16410660 ] vinoyang commented on FLINK-8946: - [~till.rohrmann] yes, I will try to fix it. > TaskManager stop sending metrics after JobManager failover > -- > > Key: FLINK-8946 > URL: https://issues.apache.org/jira/browse/FLINK-8946 > Project: Flink > Issue Type: Bug > Components: Metrics, TaskManager >Affects Versions: 1.4.2 >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Critical > Fix For: 1.5.0 > > > Running in Yarn-standalone mode, when the Job Manager performs a failover, > all TaskManagers that are inherited from the previous JobManager will not send > metrics to the new JobManager and other registered metric reporters. > > A cursory glance reveals that these lines of code might be the cause: > [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086] > Perhaps the TaskManager closes its metrics group when disassociating from the > JobManager, but does not create a new one on fail-over association? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception
[ https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9060: - Assignee: Sihua Zhou > Deleting state using KeyedStateBackend.getKeys() throws Exception > - > > Key: FLINK-9060 > URL: https://issues.apache.org/jira/browse/FLINK-9060 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > Adding this test to {{StateBackendTestBase}} showcases the problem: > {code} > @Test > public void testConcurrentModificationWithGetKeys() throws Exception { > AbstractKeyedStateBackend backend = > createKeyedBackend(IntSerializer.INSTANCE); > try { > ListStateDescriptor listStateDescriptor = > new ListStateDescriptor<>("foo", > StringSerializer.INSTANCE); > backend.setCurrentKey(1); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Hello"); > backend.setCurrentKey(2); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Ciao"); > Stream keys = backend > .getKeys(listStateDescriptor.getName(), > VoidNamespace.INSTANCE); > keys.forEach((key) -> { > backend.setCurrentKey(key); > try { > backend > .getPartitionedState( > VoidNamespace.INSTANCE, > > VoidNamespaceSerializer.INSTANCE, > listStateDescriptor) > .clear(); > } catch (Exception e) { > e.printStackTrace(); > } > }); > } > finally { > IOUtils.closeQuietly(backend); > backend.dispose(); > } > } > {code} > This should work because one of the use cases of {{getKeys()}} and > {{applyToAllKeys()}} is to do stuff for every key, which includes deleting > them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
Jamie Grier created FLINK-9061: -- Summary: S3 checkpoint data not partitioned well -- causes errors and poor performance Key: FLINK-9061 URL: https://issues.apache.org/jira/browse/FLINK-9061 Project: Flink Issue Type: Bug Components: FileSystem, State Backends, Checkpointing Affects Versions: 1.4.2 Reporter: Jamie Grier I think we need to modify the way we write checkpoints to S3 for high-scale jobs (those with many total tasks). The issue is that we are writing all the checkpoint data under a common key prefix. This is the worst case scenario for S3 performance since the key is used as a partition key. In the worst case checkpoints fail with a 500 status code coming back from S3 and an internal error type of TooBusyException. One possible solution would be to add a hook in the Flink filesystem code that allows me to "rewrite" paths. For example say I have the checkpoint directory set to: s3://bucket/flink/checkpoints I would hook that and rewrite that path to: s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original path This would distribute the checkpoint write load around the S3 cluster evenly. For reference: https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ Any other people hit this issue? Any other ideas for solutions? This is a pretty serious problem for people trying to checkpoint to S3. -Jamie -- This message was sent by Atlassian JIRA (v7.6.3#76005)
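A minimal sketch of the path-rewriting idea described above, prefixing the checkpoint path with a hash of the original path so S3 spreads the keys across partitions. The class and method names are hypothetical; Flink does not currently expose such a rewrite hook, which is exactly what the ticket asks for:
{code}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Illustration only: rewrite "s3://bucket/flink/checkpoints" to "s3://bucket/<HASH>/flink/checkpoints". */
public final class EntropyPrefixingPathRewriter {

    public static String rewrite(String bucket, String originalPath) {
        return "s3://" + bucket + "/" + shortHash(originalPath) + "/" + originalPath;
    }

    /** First 6 hex chars of SHA-256 of the original path, used as the partition-spreading prefix. */
    private static String shortHash(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                sb.append(String.format("%02x", digest[i] & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("SHA-256 not available", e);
        }
    }
}
{code}
For example, rewrite("bucket", "flink/checkpoints/job-42") would produce something like s3://bucket/3fa1c2/flink/checkpoints/job-42, so different original paths land under different S3 key prefixes.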
[jira] [Created] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception
Aljoscha Krettek created FLINK-9060: --- Summary: Deleting state using KeyedStateBackend.getKeys() throws Exception Key: FLINK-9060 URL: https://issues.apache.org/jira/browse/FLINK-9060 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Aljoscha Krettek Fix For: 1.5.0 Adding this test to {{StateBackendTestBase}} showcases the problem: {code} @Test public void testConcurrentModificationWithGetKeys() throws Exception { AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); try { ListStateDescriptor listStateDescriptor = new ListStateDescriptor<>("foo", StringSerializer.INSTANCE); backend.setCurrentKey(1); backend .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor) .add("Hello"); backend.setCurrentKey(2); backend .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor) .add("Ciao"); Stream keys = backend .getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE); keys.forEach((key) -> { backend.setCurrentKey(key); try { backend .getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor) .clear(); } catch (Exception e) { e.printStackTrace(); } }); } finally { IOUtils.closeQuietly(backend); backend.dispose(); } } {code} This should work because one of the use cases of {{getKeys()}} and {{applyToAllKeys()}} is to do stuff for every key, which includes deleting them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
[ https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7775: -- Description: {code} public int getNumberOfCachedJobs() { return jobRefCounters.size(); } {code} The method of PermanentBlobCache is not used. We should remove it. was: {code} public int getNumberOfCachedJobs() { return jobRefCounters.size(); } {code} The method of PermanentBlobCache is not used. We should remove it. > Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs > --- > > Key: FLINK-7775 > URL: https://issues.apache.org/jira/browse/FLINK-7775 > Project: Flink > Issue Type: Task > Components: Local Runtime >Reporter: Ted Yu >Priority: Minor > > {code} > public int getNumberOfCachedJobs() { > return jobRefCounters.size(); > } > {code} > The method of PermanentBlobCache is not used. > We should remove it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9049) Create unified interfaces to configure and instantiate TableSink
[ https://issues.apache.org/jira/browse/FLINK-9049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410237#comment-16410237 ] Shuyi Chen commented on FLINK-9049: --- Good point, [~fhueske]. Just created a JIRA for this and will get it in 1.5.0. > Create unified interfaces to configure and instatiate TableSink > --- > > Key: FLINK-9049 > URL: https://issues.apache.org/jira/browse/FLINK-9049 > Project: Flink > Issue Type: Task > Components: Table API SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > This is a similar effort to > [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240], we want to > create a unified interface for discovery and instantiation of TableSink, and > later support table DDL in flink. The proposed solution would use similar > approach in [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240], > and can re-use most of the implementations already done in > [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240]. > Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "tableType" with values (source, sink and > both) for both TableSource and TableSink. > 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
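To make the proposed symmetry with FLINK-8240 concrete, a rough sketch of what a {{TableSinkFactory}} mirroring the existing {{TableSourceFactory}} pattern might look like. The method names and the "tableType" property are assumptions drawn from this discussion, not a settled API:
{code}
import java.util.List;
import java.util.Map;
import org.apache.flink.table.sinks.TableSink;

// Hypothetical sketch of a factory for discovering and instantiating TableSinks
// from string properties (e.g. those declared in the SQL Client environment file).
public interface TableSinkFactory<T> {

    // Properties identifying this factory, e.g. "connector.type" -> "kafka",
    // plus the proposed shared property "tableType" -> "sink" (or "both").
    Map<String, String> requiredContext();

    // Additional property keys this factory understands.
    List<String> supportedProperties();

    // Creates and configures the sink from the normalized properties.
    TableSink<T> createTableSink(Map<String, String> properties);
}
{code}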
[jira] [Created] (FLINK-9059) Add support for unified table source and sink declaration in environment file
Shuyi Chen created FLINK-9059: - Summary: Add support for unified table source and sink declaration in environment file Key: FLINK-9059 URL: https://issues.apache.org/jira/browse/FLINK-9059 Project: Flink Issue Type: Task Components: Table API SQL Reporter: Shuyi Chen Assignee: Shuyi Chen Fix For: 1.5.0 1) Add a common property called "tableType" with single value 'source'. 2) in yaml file, replace "sources" with "tables". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMa...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5750 [FLINK-8970] [E2E] HA end-to-end test with StateMachineExample. Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. There is a single JM which gets killed and re-created and we check if the new JM picks up the job execution and if at the end the StateMachine has no ALERTs printed. ## Verifying this change It is a script that you can run independently. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink ha-end-to-end-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5750.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5750 commit 1fb36e02d79dd2299dbf7c6c6ff84b76226adf91 Author: kkloudasDate: 2018-03-15T12:13:46Z [FLINK-8970] [E2E] HA end-to-end test with StateMachineExample. Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. There is a single JM which gets killed and re-created and we check if the new JM picks up the job execution and if at the end the StateMachine has no ALERTs printed. ---
[jira] [Commented] (FLINK-8970) Add more automated end-to-end tests
[ https://issues.apache.org/jira/browse/FLINK-8970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410172#comment-16410172 ] ASF GitHub Bot commented on FLINK-8970: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5750 [FLINK-8970] [E2E] HA end-to-end test with StateMachineExample. Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. There is a single JM which gets killed and re-created and we check if the new JM picks up the job execution and if at the end the StateMachine has no ALERTs printed. ## Verifying this change It is a script that you can run independently. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink ha-end-to-end-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5750.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5750 commit 1fb36e02d79dd2299dbf7c6c6ff84b76226adf91 Author: kkloudasDate: 2018-03-15T12:13:46Z [FLINK-8970] [E2E] HA end-to-end test with StateMachineExample. Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. There is a single JM which gets killed and re-created and we check if the new JM picks up the job execution and if at the end the StateMachine has no ALERTs printed. > Add more automated end-to-end tests > --- > > Key: FLINK-8970 > URL: https://issues.apache.org/jira/browse/FLINK-8970 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Priority: Critical > > In order to improve Flink's test coverage and make releasing easier, we > should add more automated end-to-end tests which test Flink more like a user > would interact with the system. Additionally, these end-to-end tests should > test the integration of various other systems with Flink. > With FLINK-6539, we added a new module \{{flink-end-to-end-tests}} which > contains the set of currently available end-to-end tests. > With FLINK-8911, a script was added to trigger these tests. > > This issue is an umbrella issue collecting all different end-to-end tests > which we want to add to the Flink repository. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable
[ https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410031#comment-16410031 ] Aljoscha Krettek commented on FLINK-9058: - We could, but I would be more hesitant to change that, because {{ListCheckpointed}} is an existing interface while this ticket is about changing methods that we have not yet released in a stable release. Side note, I think {{ListCheckpointed}} will go away at some point because its functionality is subsumed by the various methods on {{OperatorStateStore}}. [~kkrugler] What do you think about that? > Relax ListState.addAll() and ListState.update() to take Iterable > > > Key: FLINK-9058 > URL: https://issues.apache.org/jira/browse/FLINK-9058 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.5.0 > > > [~srichter] What do you think about this? None of the implementations require > the parameter to actually be a list and allowing an {{Iterable}} there allows > calling it in situations where all you have is an {{Iterable}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization
[ https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410027#comment-16410027 ] ASF GitHub Bot commented on FLINK-9034: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5732 @StephanEwen Yes, I think this should go into 1.5.0 because it fixes potential (and real) problems. And yes, I wasn't suggesting to remove `initializeSerializerUnlessSet(ExecutionConfig)` now, but it seemed like a good place to mention it. > State Descriptors drop TypeInformation on serialization > --- > > Key: FLINK-9034 > URL: https://issues.apache.org/jira/browse/FLINK-9034 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0 > > > The following code currently causes problems > {code} > public class MyFunction extends RichMapFunction { > private final ValueStateDescriptor descr = new > ValueStateDescriptor<>("state name", MyType.class); > private ValueState state; > @Override > public void open() { > state = getRuntimeContext().getValueState(descr); > } > } > {code} > The problem is that the state descriptor drops the type information and > creates a serializer before serialization as part of shipping the function in > the cluster. To do that, it initializes the serializer with an empty > execution config, making serialization inconsistent. > This is mainly an artifact from the days when dropping the type information > before shipping was necessary, because the type info was not serializable. It > now is, and we can fix that bug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9054) IllegalStateException: Buffer pool is destroyed
[ https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-9054: Priority: Blocker (was: Critical) > IllegalStateException: Buffer pool is destroyed > --- > > Key: FLINK-9054 > URL: https://issues.apache.org/jira/browse/FLINK-9054 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Configuration, Core >Affects Versions: 1.4.2 >Reporter: dhiraj prajapati >Priority: Blocker > Fix For: 1.5.0 > > Attachments: flink-conf.yaml > > > Hi, > I have a flink cluster running on 2 machines, say A and B. > Job manager is running on A. There are 2 TaksManagers, one on each node. > So effectively, A has a job manager and a task manager, while B has a task > manager. > When I submit a job to the cluster, I see below exception and the job fails: > 2018-03-22 17:16:52,205 WARN > org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while > emitting latency marker. > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486) > ... 10 more > Caused by: java.lang.RuntimeException: Buffer pool is destroyed. 
> at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486) > ... 14 more > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138) > ... 19 more > > The exception does not come when I run only one JobManager (only on machine > B). > > I am attaching flink-conf.yaml -- This message was
[jira] [Updated] (FLINK-9054) IllegalStateException: Buffer pool is destroyed
[ https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-9054: Fix Version/s: 1.5.0 > IllegalStateException: Buffer pool is destroyed > --- > > Key: FLINK-9054 > URL: https://issues.apache.org/jira/browse/FLINK-9054 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Configuration, Core >Affects Versions: 1.4.2 >Reporter: dhiraj prajapati >Priority: Blocker > Fix For: 1.5.0 > > Attachments: flink-conf.yaml > > > Hi, > I have a flink cluster running on 2 machines, say A and B. > Job manager is running on A. There are 2 TaksManagers, one on each node. > So effectively, A has a job manager and a task manager, while B has a task > manager. > When I submit a job to the cluster, I see below exception and the job fails: > 2018-03-22 17:16:52,205 WARN > org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while > emitting latency marker. > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486) > ... 10 more > Caused by: java.lang.RuntimeException: Buffer pool is destroyed. 
> at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486) > ... 14 more > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138) > ... 19 more > > The exception does not come when I run only one JobManager (only on machine > B). > > I am attaching flink-conf.yaml -- This message was sent by
[GitHub] flink issue #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descriptors
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5732 @StephanEwen Yes, I think this should go into 1.5.0 because it fixes potential (and real) problems. And yes, I wasn't suggesting to remove `initializeSerializerUnlessSet(ExecutionConfig)` now, but it seemed like a good place to mention it. ---
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410004#comment-16410004 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515090 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static OptionalFailure of(T value) { + return new OptionalFailure<>(value, null); + } + + public static OptionalFailure ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static OptionalFailure createFrom(Supplier valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { --- End diff -- Not sure whether we should capture the `RuntimeException` here. To me a `supplier` should not throw `RuntimeExceptions` and if so, then it should not produce a `OptionalFailure` but instead fail with a `RuntimeException`. > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. 
> {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. > org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at > org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.(ArchivedExecutionJobVertex.java:49) > at >
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410009#comment-16410009 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176516910 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static OptionalFailure of(T value) { + return new OptionalFailure<>(value, null); + } + + public static OptionalFailure ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static OptionalFailure createFrom(Supplier valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); --- End diff -- The message indicates that `OptionalFailure` was implemented for the accumulators in mind, but I think it should be more generic. I guess that `AccumulatorHelper#67` is also the reason why we catch the `RuntimeException` to make the merge supplier as smooth as possible. > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. 
> {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. > org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at > org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599) > at >
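As a side note on the review comment above: a minimal sketch of a more generic wording for the factory (this is an assumption about the direction only, not the merged code; {{LOG}}, {{Supplier}} and the other imports are those already present in the class under review):

{code}
// Hedged sketch: same control flow as the diff above, but the log message no
// longer assumes that the caller is archiving accumulators.
public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
	try {
		return OptionalFailure.of(valueSupplier.get());
	}
	catch (RuntimeException ex) {
		LOG.error("Supplier threw an exception while creating an OptionalFailure.", ex);
		return OptionalFailure.ofFailure(ex);
	}
}
{code}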
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410010#comment-16410010 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515638 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static OptionalFailure of(T value) { + return new OptionalFailure<>(value, null); + } + + public static OptionalFailure ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static OptionalFailure createFrom(Supplier valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); + return OptionalFailure.ofFailure(ex); + } + } + + /** +* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}. +*/ + public T get() throws FlinkRuntimeException { + if (value != null) { + return value; + } + checkNotNull(failureCause); + throw new FlinkRuntimeException(failureCause); + } + + public Throwable getFailureCause() { + return checkNotNull(failureCause); + } + + public boolean isFailure() { + return failureCause != null; + } + + @Override + public int hashCode() { + return Objects.hash(value, failureCause); + } + + @Override + public boolean equals(Object object) { --- End diff -- why deviating from the super class' parameter name `obj`? 
> Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. > {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR >
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410005#comment-16410005 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515453 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static OptionalFailure of(T value) { + return new OptionalFailure<>(value, null); + } + + public static OptionalFailure ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static OptionalFailure createFrom(Supplier valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); + return OptionalFailure.ofFailure(ex); + } + } + + /** +* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}. +*/ + public T get() throws FlinkRuntimeException { --- End diff -- I think `get` should throw a checked exception and not an unchecked exception. Otherwise users won't be aware of it. We could provide a method `getUnchecked` where we throw an unchecked exception. 
> Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. > {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. > org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at >
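A sketch of the checked/unchecked split suggested above (using {{org.apache.flink.util.FlinkException}} as the checked type is an assumption; only the shape of the two accessors is the point):

{code}
// Checked accessor: callers are forced to handle the failure case explicitly.
public T get() throws FlinkException {
	if (value != null) {
		return value;
	}
	checkNotNull(failureCause);
	throw new FlinkException(failureCause);
}

// Unchecked convenience variant for call sites that cannot recover anyway.
public T getUnchecked() throws FlinkRuntimeException {
	if (value != null) {
		return value;
	}
	checkNotNull(failureCause);
	throw new FlinkRuntimeException(failureCause);
}
{code}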
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410007#comment-16410007 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176514312 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; --- End diff -- This type is not serializable. I think you should mark it `transient` and then override `readObject` and `writeObject` similar to how `ArrayList` does it. > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. > {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at > org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.(ArchivedExecutionJobVertex.java:49) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.archive(ExecutionJobVertex.java:612) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:313) > at > org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:985) > at > org.apache.flink.runtime.jobmaster.JobMaster.access$1400(JobMaster.java:136) > at > org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1181) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:292) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:147) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at
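One way the {{transient}}/custom-serialization suggestion could look, sketched under the assumption that writing the fields via {{writeObject}} is acceptable (the same approach {{ArrayList}} takes for its element array); the fix that was actually merged may differ:

{code}
// Fields are excluded from default serialization...
@Nullable
private transient T value;

@Nullable
private transient Throwable failureCause;

// ...and are written/read explicitly instead (java.io.ObjectOutputStream and
// java.io.ObjectInputStream imports assumed).
private void writeObject(ObjectOutputStream out) throws IOException {
	out.defaultWriteObject();
	out.writeObject(value);
	out.writeObject(failureCause);
}

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
	in.defaultReadObject();
	value = (T) in.readObject();
	failureCause = (Throwable) in.readObject();
}
{code}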
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410011#comment-16410011 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176517725 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java --- @@ -50,13 +51,13 @@ @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) @JsonSerialize(contentUsing = SerializedValueSerializer.class) - private MapserializedUserAccumulators; + private Map > serializedUserAccumulators; @JsonCreator public JobAccumulatorsInfo( - @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List jobAccumulators, - @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List userAccumulators, - @JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) { + @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List jobAccumulators, + @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List userAccumulators, + @JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map > serializedUserAccumulators) { --- End diff -- indentation is wrong > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. > {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at > org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.(ArchivedExecutionJobVertex.java:49) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.archive(ExecutionJobVertex.java:612) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:313) > at > org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:985) > at > org.apache.flink.runtime.jobmaster.JobMaster.access$1400(JobMaster.java:136) > at > org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1181) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:292) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:147) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at >
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410008#comment-16410008 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515734 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static OptionalFailure of(T value) { + return new OptionalFailure<>(value, null); + } + + public static OptionalFailure ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static OptionalFailure createFrom(Supplier valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); + return OptionalFailure.ofFailure(ex); + } + } + + /** +* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}. 
+*/ + public T get() throws FlinkRuntimeException { + if (value != null) { + return value; + } + checkNotNull(failureCause); + throw new FlinkRuntimeException(failureCause); + } + + public Throwable getFailureCause() { + return checkNotNull(failureCause); + } + + public boolean isFailure() { + return failureCause != null; + } + + @Override + public int hashCode() { + return Objects.hash(value, failureCause); + } + + @Override + public boolean equals(Object object) { + if (object == null) { + return false; + } + if (object == this) { + return true; + } + if (!(object instanceof OptionalFailure)) { + return false; + } + OptionalFailure other = (OptionalFailure) object; --- End diff -- Let's cast to `OptionalFailure` > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix
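Combining this comment with the earlier one about the parameter name, the {{equals}} method could be sketched as follows (the field comparison via {{Objects.equals}} is assumed from the rest of the class under review):

{code}
@Override
public boolean equals(Object obj) {
	if (obj == this) {
		return true;
	}
	// instanceof already rejects null, so no separate null check is needed.
	if (!(obj instanceof OptionalFailure)) {
		return false;
	}
	OptionalFailure<?> other = (OptionalFailure<?>) obj;
	return Objects.equals(value, other.value)
		&& Objects.equals(failureCause, other.failureCause);
}
{code}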
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410013#comment-16410013 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176518316 --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java --- @@ -45,84 +47,87 @@ * b) are not compatible with existing accumulator. */ public class AccumulatorErrorITCase extends TestLogger { - - private static LocalFlinkMiniCluster cluster; - - private static ExecutionEnvironment env; - - @BeforeClass - public static void startCluster() { + private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone"; + private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge"; + private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators"; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + 2, + 3)); + + public static Configuration getConfiguration() { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); - cluster = new LocalFlinkMiniCluster(config, false); - - cluster.start(); - - env = new TestEnvironment(cluster, 6, false); - } - - @AfterClass - public static void shutdownCluster() { - cluster.stop(); - cluster = null; + return config; } @Test public void testFaultyAccumulator() throws Exception { - + TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment(); --- End diff -- a no-op sink? > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. > {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at > org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.(ArchivedExecutionJobVertex.java:49) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.archive(ExecutionJobVertex.java:612) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:313) > at > org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:985) > at > org.apache.flink.runtime.jobmaster.JobMaster.access$1400(JobMaster.java:136) > at > org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1181) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:292) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:147) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) >
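For the "no-op sink" suggestion, a sketch in the DataSet API used by this test could rely on Flink's {{org.apache.flink.api.java.io.DiscardingOutputFormat}} (the mapper name below is hypothetical; the point is only that the job materializes without collecting results on the client):

{code}
// Runs the pipeline for its side effects (the faulty accumulator) and throws
// the produced records away instead of collecting them.
env.generateSequence(0, 10000)
	.map(new FaultyAccumulatorUsingMapper())
	.output(new DiscardingOutputFormat<>());

env.execute();
{code}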
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410006#comment-16410006 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176517187 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java --- @@ -48,8 +49,9 @@ * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds * @param accumulators A map of all accumulator results produced by the job, in serialized form */ - public SerializedJobExecutionResult(JobID jobID, long netRuntime, - Mapaccumulators) { + public SerializedJobExecutionResult(JobID jobID, + long netRuntime, + Map > accumulators) { --- End diff -- Something is with the indentation off here. > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. > {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. > org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at > org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.(ArchivedExecutionJobVertex.java:49) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.archive(ExecutionJobVertex.java:612) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:313) > at > org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:985) > at > org.apache.flink.runtime.jobmaster.JobMaster.access$1400(JobMaster.java:136) > at > org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1181) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:292) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:147) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8721) Client blocks indefinitely if job archiving fails
[ https://issues.apache.org/jira/browse/FLINK-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410012#comment-16410012 ] ASF GitHub Bot commented on FLINK-8721: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176517944 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java --- @@ -21,83 +21,98 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.SerializedValue; + import org.junit.Test; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the SerializedJobExecutionResult */ public class SerializedJobExecutionResultTest { --- End diff -- `extends TestLogger` missing > Client blocks indefinitely if job archiving fails > - > > Key: FLINK-8721 > URL: https://issues.apache.org/jira/browse/FLINK-8721 > Project: Flink > Issue Type: Improvement > Components: Client, JobManager >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > While porting the {{AccumulatorErrorITCase}} i noticed that, for FLIP-6, if > the job archiving fails (in this case due to a custom accumulator throwing an > exception in #merge) no response is sent to the client. > {code} > 3547 [flink-akka.actor.default-dispatcher-2] ERROR > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - Caught exception > while executing runnable in main thread. 
> org.apache.flink.test.accumulators.AccumulatorErrorITCase$CustomException > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:113) > at > org.apache.flink.test.accumulators.AccumulatorErrorITCase$FaultyAccumulator.clone(AccumulatorErrorITCase.java:107) > at > org.apache.flink.api.common.accumulators.AccumulatorHelper.mergeInto(AccumulatorHelper.java:52) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregatedUserAccumulatorsStringified(ExecutionJobVertex.java:599) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex.(ArchivedExecutionJobVertex.java:49) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.archive(ExecutionJobVertex.java:612) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:313) > at > org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:985) > at > org.apache.flink.runtime.jobmaster.JobMaster.access$1400(JobMaster.java:136) > at > org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1181) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:292) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:147) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at >
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515090 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static OptionalFailure of(T value) { + return new OptionalFailure<>(value, null); + } + + public static OptionalFailure ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static OptionalFailure createFrom(Supplier valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { --- End diff -- Not sure whether we should capture the `RuntimeException` here. To me a `supplier` should not throw `RuntimeExceptions` and if so, then it should not produce a `OptionalFailure` but instead fail with a `RuntimeException`. ---
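The alternative hinted at above, where the supplier is not guarded at all, would reduce the factory to a sketch like this (again an assumption about the intent, not the merged code):

{code}
// RuntimeExceptions from the supplier propagate to the caller; OptionalFailure
// then only ever wraps failures handed to ofFailure(...) explicitly.
public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
	return OptionalFailure.of(valueSupplier.get());
}
{code}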
[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable
[ https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1640#comment-1640 ] Ken Krugler commented on FLINK-9058: Would it make sense to file a related enhancement issue, for the {{ListCheckpointed}} methods to take/return iterables to avoid wasteful memory allocations, for the case where the state isn't also an in-memory list? > Relax ListState.addAll() and ListState.update() to take Iterable > > > Key: FLINK-9058 > URL: https://issues.apache.org/jira/browse/FLINK-9058 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.5.0 > > > [~srichter] What do you think about this. None of the implementations require > the parameter to actually be a list and allowing an {{Iterable}} there allows > calling it in situations where all you have is an {{Iterable}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
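To illustrate the suggestion, a hedged sketch contrasting the current `List`-based shape of `ListCheckpointed` with a hypothetical `Iterable`-based variant (the second interface does not exist in Flink; it only shows what the proposed relaxation could look like):
```
import java.io.Serializable;
import java.util.List;

// Current shape of ListCheckpointed (simplified): snapshots must be materialized as a List.
interface ListCheckpointed<T extends Serializable> {
	List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
	void restoreState(List<T> state) throws Exception;
}

// Hypothetical Iterable-based variant along the lines of the suggestion above;
// state could be streamed out without building an intermediate in-memory list.
interface IterableCheckpointed<T extends Serializable> {
	Iterable<T> snapshotState(long checkpointId, long timestamp) throws Exception;
	void restoreState(Iterable<T> state) throws Exception;
}
```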
[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable
[ https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409985#comment-16409985 ] ASF GitHub Bot commented on FLINK-9058: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5749 [FLINK-9058] Relax ListState.addAll() and ListState.update() to take Iterable If we do this, we should do it before 1.5.0 because we are introducing the methods for the first time there. R: @StefanRRichter You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-9058-list-state-iterable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5749.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5749 commit 673cb2c0304ceb3cb9cc95c107bd00d37c59394b Author: Aljoscha Krettek Date: 2018-03-22T17:53:10Z [FLINK-9058] Relax ListState.addAll() and ListState.update() to take Iterable > Relax ListState.addAll() and ListState.update() to take Iterable > > > Key: FLINK-9058 > URL: https://issues.apache.org/jira/browse/FLINK-9058 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.5.0 > > > [~srichter] What do you think about this. None of the implementations require > the parameter to actually be a list and allowing an {{Iterable}} there allows > calling it in situations where all you have is an {{Iterable}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5749: [FLINK-9058] Relax ListState.addAll() and ListStat...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5749 [FLINK-9058] Relax ListState.addAll() and ListState.update() to take Iterable If we do this, we should do it before 1.5.0 because we are introducing the methods for the first time there. R: @StefanRRichter You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-9058-list-state-iterable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5749.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5749 commit 673cb2c0304ceb3cb9cc95c107bd00d37c59394b Author: Aljoscha Krettek Date: 2018-03-22T17:53:10Z [FLINK-9058] Relax ListState.addAll() and ListState.update() to take Iterable ---
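A short sketch of the call-site motivation behind the change; the `updateFromIterable` helper is hypothetical, and the commented-out line shows what becomes possible once `update()` accepts an `Iterable`:
```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;

class IterableUpdateExample {

	// Hypothetical helper: with the current List-typed signature, a caller that only
	// holds an Iterable has to copy it into a List before calling update().
	static void updateFromIterable(ListState<String> listState, Iterable<String> values) throws Exception {
		List<String> copy = new ArrayList<>();
		for (String value : values) {
			copy.add(value);
		}
		listState.update(copy);
		// With the relaxed signature proposed in this PR the copy would be unnecessary:
		// listState.update(values);
	}
}
```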
[jira] [Commented] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable
[ https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409966#comment-16409966 ] Aljoscha Krettek commented on FLINK-9058: - And I would like to fix this before we release those new methods with 1.5.0. > Relax ListState.addAll() and ListState.update() to take Iterable > > > Key: FLINK-9058 > URL: https://issues.apache.org/jira/browse/FLINK-9058 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.5.0 > > > [~srichter] What do you think about this. None of the implementations require > the parameter to actually be a list and allowing an {{Iterable}} there allows > calling it in situations where all you have is an {{Iterable}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable
Aljoscha Krettek created FLINK-9058: --- Summary: Relax ListState.addAll() and ListState.update() to take Iterable Key: FLINK-9058 URL: https://issues.apache.org/jira/browse/FLINK-9058 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Fix For: 1.5.0 [~srichter] What do you think about this. None of the implementations require the parameter to actually be a list and allowing an {{Iterable}} there allows calling it in situations where all you have is an {{Iterable}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase
[ https://issues.apache.org/jira/browse/FLINK-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8562. -- Resolution: Fixed Fixed via master: 37a114875afb9352e6f7b10e2729a94d0eeb72ee b550ac67fbf525863d5812d9d2a1010672a0169b 1.5.0: 4b66514f2c31d5ea29493baf9d022a0115faf82d 9c105f2c982e511bc1274a86d629e7fa26cf7ac8 > Fix YARNSessionFIFOSecuredITCase > > > Key: FLINK-8562 > URL: https://issues.apache.org/jira/browse/FLINK-8562 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.5.0, 1.6.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Fix For: 1.5.0 > > > Currently, YARNSessionFIFOSecuredITCase will not fail even if the current > Flink YARN Kerberos integration is failing in production. Please see > FLINK-8275. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase
[ https://issues.apache.org/jira/browse/FLINK-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409949#comment-16409949 ] ASF GitHub Bot commented on FLINK-8562: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5416 > Fix YARNSessionFIFOSecuredITCase > > > Key: FLINK-8562 > URL: https://issues.apache.org/jira/browse/FLINK-8562 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.5.0, 1.6.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Fix For: 1.5.0 > > > Currently, YARNSessionFIFOSecuredITCase will not fail even if the current > Flink YARN Kerberos integration is failing in production. Please see > FLINK-8275. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5416: [FLINK-8562] [Security] Fix YARNSessionFIFOSecured...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5416 ---
[jira] [Commented] (FLINK-8919) Add KeyedProcessFunctionWIthCleanupState
[ https://issues.apache.org/jira/browse/FLINK-8919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409918#comment-16409918 ] ASF GitHub Bot commented on FLINK-8919: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5680#discussion_r176499309 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators + +import java.lang.{Boolean => JBool} +import scala.collection.JavaConversions._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState +import org.apache.flink.table.runtime.harness.HarnessTestBase +import org.apache.flink.util.Collector +import org.junit.Test +import org.junit.Assert.assertArrayEquals + +class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase { + @Test + def testNeedToCleanup(): Unit = { +val queryConfig = new StreamQueryConfig() --- End diff -- It would be good if the test would check that the state is actually cleared. > Add KeyedProcessFunctionWIthCleanupState > > > Key: FLINK-8919 > URL: https://issues.apache.org/jira/browse/FLINK-8919 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Minor > Fix For: 1.6.0 > > > ProcessFunctionWithCleanupState is a useful tool and I think we also need one > for the new KeyedProcessFunction api. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5680: [FLINK-8919] [Table API & SQL] Add KeyedProcessFun...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5680#discussion_r176499309 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators + +import java.lang.{Boolean => JBool} +import scala.collection.JavaConversions._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState +import org.apache.flink.table.runtime.harness.HarnessTestBase +import org.apache.flink.util.Collector +import org.junit.Test +import org.junit.Assert.assertArrayEquals + +class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase { + @Test + def testNeedToCleanup(): Unit = { +val queryConfig = new StreamQueryConfig() --- End diff -- It would be good if the test would check that the state is actually cleared. ---
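A hedged sketch of the kind of assertion being asked for, written in Java against the keyed operator test harness rather than the Scala `HarnessTestBase` above; `CleanupFunction` is a hypothetical stand-in for the function under test, and it assumes the harness exposes `numKeyedStateEntries()` as the window-operator tests use it:
```
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Test;

public class CleanupStateAssertionSketch {

	@Test
	public void stateIsActuallyCleared() throws Exception {
		// CleanupFunction stands in for the KeyedProcessFunctionWithCleanupState
		// subclass under test and is hypothetical here.
		KeyedProcessOperator<String, String, String> operator =
			new KeyedProcessOperator<>(new CleanupFunction());

		KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
			new KeyedOneInputStreamOperatorTestHarness<>(
				operator, value -> value, BasicTypeInfo.STRING_TYPE_INFO);
		harness.open();

		harness.processElement(new StreamRecord<>("a", 1L));
		assertTrue(harness.numKeyedStateEntries() > 0); // state was registered for key "a"

		// Advance processing time beyond the configured cleanup interval so the
		// registered cleanup timer fires (the exact timestamp depends on the query config).
		harness.setProcessingTime(10_000L);
		assertEquals(0, harness.numKeyedStateEntries()); // state was actually cleared

		harness.close();
	}
}
```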
[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable
[ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409913#comment-16409913 ] ASF GitHub Bot commented on FLINK-8943: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5746 Thanks for your reply ;) > Jobs will not recover if DFS is temporarily unavailable > --- > > Key: FLINK-8943 > URL: https://issues.apache.org/jira/browse/FLINK-8943 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip6 > Fix For: 1.5.0 > > > *Description* > Job graphs will be recovered only once from the DFS. If the DFS is > unavailable at the recovery attempt, the jobs will simply be not running > until the master is restarted again. > *Steps to reproduce* > # Submit job on Flink Cluster with HDFS as HA storage dir. > # Trigger job recovery by killing the master > # Stop HDFS NameNode > # Enable HDFS NameNode after job recovery is over > # Verify that job is not running. > *Expected behavior* > The new master should fail fast and exit. The new master should re-attempt > the recovery. > *Stacktrace* > {noformat} > 2018-03-14 14:01:37,704 ERROR > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Could not > recover the job graph for a41d50b6f3ac16a730dd12792a847c97. > org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph > from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates > that the retrieved state handle is broken. Try cleaning the state handle > store. > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 > to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection > exception: java.net.ConnectException: Connection refused; For more details > see: http://wiki.apache.org/hadoop/ConnectionRefused > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801) > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732) > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493) > at org.apache.hadoop.ipc.Client.call(Client.java:1435) > at org.apache.hadoop.ipc.Client.call(Client.java:1345) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at 
com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346) > at
[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5746 Thanks for your reply ;) ---
[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable
[ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409907#comment-16409907 ] ASF GitHub Bot commented on FLINK-8943: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5746 please just ignore my question... > Jobs will not recover if DFS is temporarily unavailable > --- > > Key: FLINK-8943 > URL: https://issues.apache.org/jira/browse/FLINK-8943 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip6 > Fix For: 1.5.0 > > > *Description* > Job graphs will be recovered only once from the DFS. If the DFS is > unavailable at the recovery attempt, the jobs will simply be not running > until the master is restarted again. > *Steps to reproduce* > # Submit job on Flink Cluster with HDFS as HA storage dir. > # Trigger job recovery by killing the master > # Stop HDFS NameNode > # Enable HDFS NameNode after job recovery is over > # Verify that job is not running. > *Expected behavior* > The new master should fail fast and exit. The new master should re-attempt > the recovery. > *Stacktrace* > {noformat} > 2018-03-14 14:01:37,704 ERROR > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Could not > recover the job graph for a41d50b6f3ac16a730dd12792a847c97. > org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph > from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates > that the retrieved state handle is broken. Try cleaning the state handle > store. > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 > to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection > exception: java.net.ConnectException: Connection refused; For more details > see: http://wiki.apache.org/hadoop/ConnectionRefused > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801) > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732) > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493) > at org.apache.hadoop.ipc.Client.call(Client.java:1435) > at org.apache.hadoop.ipc.Client.call(Client.java:1345) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at 
com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346) >
[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable
[ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409908#comment-16409908 ] ASF GitHub Bot commented on FLINK-8943: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5746 Yes @sihuazhou. In such a case the JM will terminate in order to let another JM try to recover the jobs. > Jobs will not recover if DFS is temporarily unavailable > --- > > Key: FLINK-8943 > URL: https://issues.apache.org/jira/browse/FLINK-8943 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip6 > Fix For: 1.5.0 > > > *Description* > Job graphs will be recovered only once from the DFS. If the DFS is > unavailable at the recovery attempt, the jobs will simply be not running > until the master is restarted again. > *Steps to reproduce* > # Submit job on Flink Cluster with HDFS as HA storage dir. > # Trigger job recovery by killing the master > # Stop HDFS NameNode > # Enable HDFS NameNode after job recovery is over > # Verify that job is not running. > *Expected behavior* > The new master should fail fast and exit. The new master should re-attempt > the recovery. > *Stacktrace* > {noformat} > 2018-03-14 14:01:37,704 ERROR > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Could not > recover the job graph for a41d50b6f3ac16a730dd12792a847c97. > org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph > from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates > that the retrieved state handle is broken. Try cleaning the state handle > store. > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 > to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection > exception: java.net.ConnectException: Connection refused; For more details > see: http://wiki.apache.org/hadoop/ConnectionRefused > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801) > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732) > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493) > at org.apache.hadoop.ipc.Client.call(Client.java:1435) > at org.apache.hadoop.ipc.Client.call(Client.java:1345) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) > at > 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at >
[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5746 Yes @sihuazhou. In such a case the JM will terminate in order to let another JM try to recover the jobs. ---
[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5746 please just ignore my question... ---
[jira] [Commented] (FLINK-9031) DataSet Job result changes when adding rebalance after union
[ https://issues.apache.org/jira/browse/FLINK-9031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409899#comment-16409899 ] ASF GitHub Bot commented on FLINK-9031: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5742 Updated the PR. Tests are passing (mod an unrelated failure). > DataSet Job result changes when adding rebalance after union > > > Key: FLINK-9031 > URL: https://issues.apache.org/jira/browse/FLINK-9031 > Project: Flink > Issue Type: Bug > Components: DataSet API, Local Runtime, Optimizer >Affects Versions: 1.3.1 >Reporter: Fabian Hueske >Priority: Critical > Attachments: Person.java, RunAll.java, newplan.txt, oldplan.txt > > > A user [reported this issue on the user mailing > list|https://lists.apache.org/thread.html/075f1a487b044079b5d61f199439cb77dd4174bd425bcb3327ed7dfc@%3Cuser.flink.apache.org%3E]. > {quote}I am using Flink 1.3.1 and I have found a strange behavior on running > the following logic: > # Read data from file and store into DataSet > # Split dataset in two, by checking if "field1" of POJOs is empty or not, so > that the first dataset contains only elements with non empty "field1", and > the second dataset will contain the other elements. > # Each dataset is then grouped by, one by "field1" and other by another > field, and subsequently reduced. > # The 2 datasets are merged together by union. > # The final dataset is written as json. > What I was expected, from output, was to find only one element with a > specific value of "field1" because: > # Reducing the first dataset grouped by "field1" should generate only one > element with a specific value of "field1". > # The second dataset should contain only elements with empty "field1". > # Making an union of them should not duplicate any record. > This does not happen. When i read the generated jsons i see some duplicate > (non empty) values of "field1". > Strangely this does not happen when the union between the two datasets is > not computed. In this case the first dataset produces elements only with > distinct values of "field1", while second dataset produces only records with > empty field "value1". > {quote} > The user has not enable object reuse. > Later he reports that the problem disappears when he injects a rebalance() > after a union resolves the problem. I had a look at the execution plans for > both cases (attached to this issue) but could not identify a problem. > Hence I assume, this might be an issue with the runtime code but we need to > look deeper into this. The user also provided an example program consisting > of two classes which are attached to the issue as well. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
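For readers following along, a rough reconstruction of the reported pipeline shape (the `Person` POJO and field names here are hypothetical stand-ins for the classes attached to the issue):
```
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class UnionDuplicatesSketch {

	/** Hypothetical POJO standing in for the Person class attached to the issue. */
	public static class Person {
		public String field1;
		public String field2;

		public Person() {}

		public Person(String field1, String field2) {
			this.field1 = field1;
			this.field2 = field2;
		}
	}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Person> input = env.fromElements(
			new Person("a", "x"), new Person("a", "y"), new Person("", "z"));

		// 1) elements with a non-empty field1, reduced to one element per field1 value
		DataSet<Person> nonEmpty = input
			.filter(p -> p.field1 != null && !p.field1.isEmpty())
			.groupBy("field1")
			.reduce((a, b) -> a);

		// 2) elements with an empty field1, grouped and reduced by another field
		DataSet<Person> empty = input
			.filter(p -> p.field1 == null || p.field1.isEmpty())
			.groupBy("field2")
			.reduce((a, b) -> a);

		// 3) the union should therefore contain at most one element per non-empty field1;
		//    the report is that duplicates appear unless .rebalance() is inserted here.
		nonEmpty.union(empty).print();
	}
}
```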
[GitHub] flink issue #5742: [FLINK-9031] Fix DataSet Union operator translation bug.
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5742 Updated the PR. Tests are passing (mod an unrelated failure). ---
[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable
[ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409888#comment-16409888 ] ASF GitHub Bot commented on FLINK-8943: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5746 Hi @tillrohrmann there is one question I want ask about this PR, is it means that in HA mode we can't tolerant jobs partial broken? > Jobs will not recover if DFS is temporarily unavailable > --- > > Key: FLINK-8943 > URL: https://issues.apache.org/jira/browse/FLINK-8943 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip6 > Fix For: 1.5.0 > > > *Description* > Job graphs will be recovered only once from the DFS. If the DFS is > unavailable at the recovery attempt, the jobs will simply be not running > until the master is restarted again. > *Steps to reproduce* > # Submit job on Flink Cluster with HDFS as HA storage dir. > # Trigger job recovery by killing the master > # Stop HDFS NameNode > # Enable HDFS NameNode after job recovery is over > # Verify that job is not running. > *Expected behavior* > The new master should fail fast and exit. The new master should re-attempt > the recovery. > *Stacktrace* > {noformat} > 2018-03-14 14:01:37,704 ERROR > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Could not > recover the job graph for a41d50b6f3ac16a730dd12792a847c97. > org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph > from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates > that the retrieved state handle is broken. Try cleaning the state handle > store. > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 > to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection > exception: java.net.ConnectException: Connection refused; For more details > see: http://wiki.apache.org/hadoop/ConnectionRefused > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801) > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732) > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493) > at org.apache.hadoop.ipc.Client.call(Client.java:1435) > at org.apache.hadoop.ipc.Client.call(Client.java:1345) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) > at > 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) >
[GitHub] flink issue #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recov...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5746 Hi @tillrohrmann, there is one question I want to ask about this PR: does it mean that in HA mode we can't tolerate partially broken jobs? ---
[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5624 @StephanEwen unfortunately not, for example: `org.apache.flink.runtime.fs.hdfs.HadoopFileSystem#create()` -> `org.apache.hadoop.fs.FileSystem#create()` -> `org.apache.hadoop.fs.s3a.S3AFileSystem#create()` and this (depending on the Hadoop version, of course) may call this:
```
// get the status or throw an FNFE
status = getFileStatus(f);
// if the thread reaches here, there is something at the path
if (status.isDirectory()) {
    ...
```
---
[jira] [Commented] (FLINK-8402) HadoopS3FileSystemITCase.testDirectoryListing fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-8402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409887#comment-16409887 ] ASF GitHub Bot commented on FLINK-8402: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5624 @StephanEwen unfortunately not, for example: `org.apache.flink.runtime.fs.hdfs.HadoopFileSystem#create()` -> `org.apache.hadoop.fs.FileSystem#create()` -> `org.apache.hadoop.fs.s3a.S3AFileSystem#create()` and this (depending on the Hadoop version, of course) may call this:
```
// get the status or throw an FNFE
status = getFileStatus(f);
// if the thread reaches here, there is something at the path
if (status.isDirectory()) {
    ...
```
> HadoopS3FileSystemITCase.testDirectoryListing fails on Travis > - > > Key: FLINK-8402 > URL: https://issues.apache.org/jira/browse/FLINK-8402 > Project: Flink > Issue Type: Bug > Components: FileSystem, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.3 > > > The test {{HadoopS3FileSystemITCase.testDirectoryListing}} fails on Travis. > https://travis-ci.org/tillrohrmann/flink/jobs/327021175 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8402) HadoopS3FileSystemITCase.testDirectoryListing fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-8402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409866#comment-16409866 ] ASF GitHub Bot commented on FLINK-8402: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5624#discussion_r176492623 --- Diff: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java --- @@ -143,7 +157,20 @@ public void testDirectoryListing() throws Exception { fs.delete(directory, true); } - // now directory must be gone - assertFalse(fs.exists(directory)); + // now directory must be gone (this is eventually-consistent, though!) + checkPathExists(fs, directory, false, deadline); + } + + private static void checkPathExists( --- End diff -- makes sense - I created `org.apache.flink.core.fs.FileSystemTestUtils` for this helper method > HadoopS3FileSystemITCase.testDirectoryListing fails on Travis > - > > Key: FLINK-8402 > URL: https://issues.apache.org/jira/browse/FLINK-8402 > Project: Flink > Issue Type: Bug > Components: FileSystem, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.3 > > > The test {{HadoopS3FileSystemITCase.testDirectoryListing}} fails on Travis. > https://travis-ci.org/tillrohrmann/flink/jobs/327021175 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT ca...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5624#discussion_r176492623 --- Diff: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java --- @@ -143,7 +157,20 @@ public void testDirectoryListing() throws Exception { fs.delete(directory, true); } - // now directory must be gone - assertFalse(fs.exists(directory)); + // now directory must be gone (this is eventually-consistent, though!) + checkPathExists(fs, directory, false, deadline); + } + + private static void checkPathExists( --- End diff -- makes sense - I created `org.apache.flink.core.fs.FileSystemTestUtils` for this helper method ---
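A sketch of the kind of polling helper being discussed, assuming the goal is to retry `fs.exists()` until a deadline because S3 listings are only eventually consistent; the real `FileSystemTestUtils` method name and signature may differ:
```
import java.util.concurrent.TimeUnit;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class FileSystemTestUtilsSketch {

	/**
	 * Polls fs.exists(path) until it reports the expected value or the deadline passes.
	 * Only a sketch of the helper discussed above, not the actual FileSystemTestUtils code.
	 */
	static void checkPathEventually(FileSystem fs, Path path, boolean shouldExist, long deadlineNanos)
			throws Exception {
		while (fs.exists(path) != shouldExist) {
			if (System.nanoTime() > deadlineNanos) {
				throw new AssertionError(
					"Path " + path + " should " + (shouldExist ? "" : "not ") + "exist by now.");
			}
			// S3 listings are eventually consistent, so back off briefly and retry.
			TimeUnit.MILLISECONDS.sleep(50);
		}
	}
}
```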
[jira] [Commented] (FLINK-8964) Port JobSubmissionFailsITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409841#comment-16409841 ] ASF GitHub Bot commented on FLINK-8964: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5727#discussion_r176489375 --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java --- @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) { // - private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception { - if (detached) { - cluster.submitJobDetached(jobGraph); - return null; - } - else { - return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); - } - } - @Test - public void testExceptionInInitializeOnMaster() { - try { - final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); - failingJobVertex.setInvokableClass(NoOpInvokable.class); - - final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + public void testExceptionInInitializeOnMaster() throws Exception { + final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); + failingJobVertex.setInvokableClass(NoOpInvokable.class); - try { - submitJob(failingJobGraph); - fail("Expected JobExecutionException."); - } - catch (JobExecutionException e) { - assertEquals("Test exception.", e.getCause().getMessage()); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); + client.setDetached(detached); - @Test - public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job"); - - try { - submitJob(jobGraph); - fail("Expected JobSubmissionException."); - } - catch (JobSubmissionException e) { - assertTrue(e.getMessage() != null && e.getMessage().contains("empty")); + client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader()); + fail("Job submission should have thrown an exception."); + } catch (Exception e) { + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception.")); + if (!expectedCause.isPresent()) { + throw e; } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } - - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } + + client.setDetached(false); + client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader()); --- End diff -- Alright, it's because of the `RunningJobsRegistry` which records that a previous job with the same `JobID` has already been executed. > Port JobSubmissionFailsITCase to flip6 > -- > > Key: FLINK-8964 > URL: https://issues.apache.org/jira/browse/FLINK-8964 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >
[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5727#discussion_r176489375 --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java --- @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) { // - private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception { - if (detached) { - cluster.submitJobDetached(jobGraph); - return null; - } - else { - return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); - } - } - @Test - public void testExceptionInInitializeOnMaster() { - try { - final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); - failingJobVertex.setInvokableClass(NoOpInvokable.class); - - final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + public void testExceptionInInitializeOnMaster() throws Exception { + final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); + failingJobVertex.setInvokableClass(NoOpInvokable.class); - try { - submitJob(failingJobGraph); - fail("Expected JobExecutionException."); - } - catch (JobExecutionException e) { - assertEquals("Test exception.", e.getCause().getMessage()); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); + client.setDetached(detached); - @Test - public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job"); - - try { - submitJob(jobGraph); - fail("Expected JobSubmissionException."); - } - catch (JobSubmissionException e) { - assertTrue(e.getMessage() != null && e.getMessage().contains("empty")); + client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader()); + fail("Job submission should have thrown an exception."); + } catch (Exception e) { + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception.")); + if (!expectedCause.isPresent()) { + throw e; } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } - - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } + + client.setDetached(false); + client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader()); --- End diff -- Alright, it's because of the `RunningJobsRegistry` which records that a previous job with the same `JobID` has already been executed. ---
[jira] [Commented] (FLINK-8964) Port JobSubmissionFailsITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409832#comment-16409832 ] ASF GitHub Bot commented on FLINK-8964: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5727#discussion_r176487399 --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java --- @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) { // - private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception { - if (detached) { - cluster.submitJobDetached(jobGraph); - return null; - } - else { - return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); - } - } - @Test - public void testExceptionInInitializeOnMaster() { - try { - final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); - failingJobVertex.setInvokableClass(NoOpInvokable.class); - - final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + public void testExceptionInInitializeOnMaster() throws Exception { + final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); + failingJobVertex.setInvokableClass(NoOpInvokable.class); - try { - submitJob(failingJobGraph); - fail("Expected JobExecutionException."); - } - catch (JobExecutionException e) { - assertEquals("Test exception.", e.getCause().getMessage()); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); + client.setDetached(detached); - @Test - public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job"); - - try { - submitJob(jobGraph); - fail("Expected JobSubmissionException."); - } - catch (JobSubmissionException e) { - assertTrue(e.getMessage() != null && e.getMessage().contains("empty")); + client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader()); + fail("Job submission should have thrown an exception."); + } catch (Exception e) { + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception.")); + if (!expectedCause.isPresent()) { + throw e; } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } - - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } + + client.setDetached(false); + client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader()); --- End diff -- Why didn't it work to submit twice the same `JobGraph`? > Port JobSubmissionFailsITCase to flip6 > -- > > Key: FLINK-8964 > URL: https://issues.apache.org/jira/browse/FLINK-8964 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >
[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5727#discussion_r176487399 --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java --- @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) { // - private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception { - if (detached) { - cluster.submitJobDetached(jobGraph); - return null; - } - else { - return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); - } - } - @Test - public void testExceptionInInitializeOnMaster() { - try { - final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); - failingJobVertex.setInvokableClass(NoOpInvokable.class); - - final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + public void testExceptionInInitializeOnMaster() throws Exception { + final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); + failingJobVertex.setInvokableClass(NoOpInvokable.class); - try { - submitJob(failingJobGraph); - fail("Expected JobExecutionException."); - } - catch (JobExecutionException e) { - assertEquals("Test exception.", e.getCause().getMessage()); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); + client.setDetached(detached); - @Test - public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job"); - - try { - submitJob(jobGraph); - fail("Expected JobSubmissionException."); - } - catch (JobSubmissionException e) { - assertTrue(e.getMessage() != null && e.getMessage().contains("empty")); + client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader()); + fail("Job submission should have thrown an exception."); + } catch (Exception e) { + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception.")); + if (!expectedCause.isPresent()) { + throw e; } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } - - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } + + client.setDetached(false); + client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader()); --- End diff -- Why didn't it work to submit twice the same `JobGraph`? ---
[jira] [Commented] (FLINK-8964) Port JobSubmissionFailsITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409833#comment-16409833 ] ASF GitHub Bot commented on FLINK-8964: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5727#discussion_r176487184 --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java --- @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) { // - private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception { - if (detached) { - cluster.submitJobDetached(jobGraph); - return null; - } - else { - return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); - } - } - @Test - public void testExceptionInInitializeOnMaster() { - try { - final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); - failingJobVertex.setInvokableClass(NoOpInvokable.class); - - final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + public void testExceptionInInitializeOnMaster() throws Exception { + final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); + failingJobVertex.setInvokableClass(NoOpInvokable.class); - try { - submitJob(failingJobGraph); - fail("Expected JobExecutionException."); - } - catch (JobExecutionException e) { - assertEquals("Test exception.", e.getCause().getMessage()); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); + client.setDetached(detached); - @Test - public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job"); - - try { - submitJob(jobGraph); - fail("Expected JobSubmissionException."); - } - catch (JobSubmissionException e) { - assertTrue(e.getMessage() != null && e.getMessage().contains("empty")); + client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader()); + fail("Job submission should have thrown an exception."); + } catch (Exception e) { + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception.")); --- End diff -- could be simplified by `"Test exception.".equals(candidate.getMessage)` > Port JobSubmissionFailsITCase to flip6 > -- > > Key: FLINK-8964 > URL: https://issues.apache.org/jira/browse/FLINK-8964 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5727#discussion_r176487184 --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java --- @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) { // - private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception { - if (detached) { - cluster.submitJobDetached(jobGraph); - return null; - } - else { - return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); - } - } - @Test - public void testExceptionInInitializeOnMaster() { - try { - final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); - failingJobVertex.setInvokableClass(NoOpInvokable.class); - - final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + public void testExceptionInInitializeOnMaster() throws Exception { + final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); + failingJobVertex.setInvokableClass(NoOpInvokable.class); - try { - submitJob(failingJobGraph); - fail("Expected JobExecutionException."); - } - catch (JobExecutionException e) { - assertEquals("Test exception.", e.getCause().getMessage()); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Caught wrong exception of type " + t.getClass() + "."); - } + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); - cluster.submitJobAndWait(workingJobGraph, false); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); + client.setDetached(detached); - @Test - public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job"); - - try { - submitJob(jobGraph); - fail("Expected JobSubmissionException."); - } - catch (JobSubmissionException e) { - assertTrue(e.getMessage() != null && e.getMessage().contains("empty")); + client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader()); + fail("Job submission should have thrown an exception."); + } catch (Exception e) { + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception.")); --- End diff -- could be simplified by `"Test exception.".equals(candidate.getMessage)` ---
[GitHub] flink pull request #5696: [hotfix][javadoc] fix doc of SlotProvider.allocate...
Github user sihuazhou closed the pull request at: https://github.com/apache/flink/pull/5696 ---
[jira] [Commented] (FLINK-9053) Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel
[ https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409803#comment-16409803 ] ASF GitHub Bot commented on FLINK-9053: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5748 [FLINK-9053][runtime] only release outputs under the checkpoint lock ## What is the purpose of the change Releasing an operator chain's outputs will call `RecordWriter#clearBuffers()` and this may not be run in parallel with `RecordWriter#broadcastEvent()` which the asynchronous checkpoint barrier trigger inside `Task` may run via `StreamTask#triggerCheckpoint()`. Now, during the cleanup of `StreamTask#invoke()`, `StreamTask`'s asynchronous services are shut down but not those of the `Task` and also `operatorChain.releaseOutputs()` is not put under the checkpoint lock. Therefore, the following may run in parallel: - `Task`'s checkpoint trigger execution - `operatorChain.releaseOutputs()` We may guard `operatorChain.releaseOutputs()` with the the checkpoint lock and should be safe to do so since we already closed all of `StreamTask`'s asynchronous executors and also disposed the operators. Hence nothing should be blocking the cleanup by holding the checkpoint lock. @StephanEwen can you please have a look to verify the safety of this? ## Brief change log - add the checkpoint lock in the cleanup of `StreamTask#invoke()` around `operatorChain.releaseOutputs()` ## Verifying this change This is a very rare race condition that was uncovered by the `RescalingITCase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-9053 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5748.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5748 commit 9f0295aa3c02e4870b248241cb9094d14863a686 Author: Stefan RichterDate: 2018-02-26T17:03:14Z [hotfix] Improved logging for task local recovery (cherry picked from commit 56c7560) commit e9e13dec10f1a1ee57c46719d758885c4f33dcf3 Author: Stephan Ewen Date: 2018-02-27T15:53:03Z [hotfix] [core] Suppress unused warning config options only used in shell scripts and doc generation. commit a269f8519305faff153e84d729873b6f9497bd36 Author: Stephan Ewen Date: 2018-02-27T16:04:29Z [FLINK-8798] [core] Make force 'commons-logging' to be parent-first loaded. commit 1d26062de130c05fdbe7701b55766b4a8d433418 Author: Xingcan Cui Date: 2018-02-12T10:11:36Z [FLINK-8538][table]Add a Kafka table source factory with JSON format support commit db2c510fb4f171c9e9940759e5fbaf466ec74474 Author: Timo Walther Date: 2018-02-19T12:35:45Z [FLINK-8538] [table] Improve unified table sources This closes #5564. 
commit 23358ff87003fd6603c0ca19bc37f31944d2c494 Author: Stephan Ewen Date: 2018-02-26T15:41:24Z [FLINK-8791] [docs] Fix documentation about configuring dependencies commit acf114793c708f0ab207008c25195f6f65796e5f Author: gyao Date: 2018-02-21T15:02:01Z [FLINK-8730][REST] JSON serialize entire SerializedThrowable Do not only serialize the serialized exception but the entire SerializedThrowable object. This makes it possible to throw the SerializedThrowable itself without deserializing it. This closes #5546. commit 2f6cb37c775106bb684ef9c608585e7a72056460 Author: gyao Date: 2018-02-27T15:58:53Z [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor This closes #5591. commit 51d5bc6c5151c2aed3f932f84c35da43689501ec Author: vinoyang Date: 2018-02-27T06:43:52Z [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString This closes #5587. commit 08e615027acd426537dc580139a61bd4082b7c3f Author: Till Rohrmann Date: 2018-02-28T09:11:44Z [FLINK-8792] [rest] Change
[GitHub] flink pull request #5748: [FLINK-9053][runtime] only release outputs under t...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5748 [FLINK-9053][runtime] only release outputs under the checkpoint lock ## What is the purpose of the change Releasing an operator chain's outputs will call `RecordWriter#clearBuffers()` and this may not be run in parallel with `RecordWriter#broadcastEvent()` which the asynchronous checkpoint barrier trigger inside `Task` may run via `StreamTask#triggerCheckpoint()`. Now, during the cleanup of `StreamTask#invoke()`, `StreamTask`'s asynchronous services are shut down but not those of the `Task` and also `operatorChain.releaseOutputs()` is not put under the checkpoint lock. Therefore, the following may run in parallel: - `Task`'s checkpoint trigger execution - `operatorChain.releaseOutputs()` We may guard `operatorChain.releaseOutputs()` with the the checkpoint lock and should be safe to do so since we already closed all of `StreamTask`'s asynchronous executors and also disposed the operators. Hence nothing should be blocking the cleanup by holding the checkpoint lock. @StephanEwen can you please have a look to verify the safety of this? ## Brief change log - add the checkpoint lock in the cleanup of `StreamTask#invoke()` around `operatorChain.releaseOutputs()` ## Verifying this change This is a very rare race condition that was uncovered by the `RescalingITCase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-9053 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5748.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5748 commit 9f0295aa3c02e4870b248241cb9094d14863a686 Author: Stefan RichterDate: 2018-02-26T17:03:14Z [hotfix] Improved logging for task local recovery (cherry picked from commit 56c7560) commit e9e13dec10f1a1ee57c46719d758885c4f33dcf3 Author: Stephan Ewen Date: 2018-02-27T15:53:03Z [hotfix] [core] Suppress unused warning config options only used in shell scripts and doc generation. commit a269f8519305faff153e84d729873b6f9497bd36 Author: Stephan Ewen Date: 2018-02-27T16:04:29Z [FLINK-8798] [core] Make force 'commons-logging' to be parent-first loaded. commit 1d26062de130c05fdbe7701b55766b4a8d433418 Author: Xingcan Cui Date: 2018-02-12T10:11:36Z [FLINK-8538][table]Add a Kafka table source factory with JSON format support commit db2c510fb4f171c9e9940759e5fbaf466ec74474 Author: Timo Walther Date: 2018-02-19T12:35:45Z [FLINK-8538] [table] Improve unified table sources This closes #5564. 
commit 23358ff87003fd6603c0ca19bc37f31944d2c494 Author: Stephan Ewen Date: 2018-02-26T15:41:24Z [FLINK-8791] [docs] Fix documentation about configuring dependencies commit acf114793c708f0ab207008c25195f6f65796e5f Author: gyao Date: 2018-02-21T15:02:01Z [FLINK-8730][REST] JSON serialize entire SerializedThrowable Do not only serialize the serialized exception but the entire SerializedThrowable object. This makes it possible to throw the SerializedThrowable itself without deserializing it. This closes #5546. commit 2f6cb37c775106bb684ef9c608585e7a72056460 Author: gyao Date: 2018-02-27T15:58:53Z [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor This closes #5591. commit 51d5bc6c5151c2aed3f932f84c35da43689501ec Author: vinoyang Date: 2018-02-27T06:43:52Z [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString This closes #5587. commit 08e615027acd426537dc580139a61bd4082b7c3f Author: Till Rohrmann Date: 2018-02-28T09:11:44Z [FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue commit 302aaeb021bacf3f37cb9a3ee236304c94adbf30 Author: Timo Walther Date: 2018-02-22T16:22:54Z [FLINK-8451] [serializers] Make Scala tuple
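A minimal sketch of the locking pattern the pull request describes, not the actual StreamTask code: the hypothetical CheckpointLockSketch and Writer classes stand in for StreamTask and RecordWriter, and only illustrate why guarding the cleanup with the checkpoint lock removes the race between clearBuffers() and broadcastEvent():

{code}
public class CheckpointLockSketch {

    /** Stand-in for RecordWriter: broadcastEvent() and clearBuffers() must not interleave. */
    static class Writer {
        void broadcastEvent() { /* emits a checkpoint barrier downstream */ }
        void clearBuffers()   { /* releases partially filled buffers during cleanup */ }
    }

    private final Object checkpointLock = new Object();
    private final Writer writer = new Writer();

    /** Called from the Task's asynchronous checkpoint trigger thread. */
    void triggerCheckpoint() {
        synchronized (checkpointLock) {
            writer.broadcastEvent();
        }
    }

    /** Cleanup at the end of invoke(): now guarded by the same lock. */
    void cleanup() {
        synchronized (checkpointLock) {
            // Safe: triggerCheckpoint() cannot run concurrently with this block,
            // so clearBuffers() never races with broadcastEvent().
            writer.clearBuffers();
        }
    }
}
{code}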
[jira] [Commented] (FLINK-8957) Port JMXJobManagerMetricTest to flip6
[ https://issues.apache.org/jira/browse/FLINK-8957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409800#comment-16409800 ] ASF GitHub Bot commented on FLINK-8957: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5720#discussion_r176482195 --- Diff: flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java --- @@ -92,28 +101,26 @@ public void testJobManagerJMXMetricAccess() throws Exception { true), null)); - flink.waitForActorsToBeAlive(); - - flink.submitJobDetached(jobGraph); + ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); + client.setDetached(true); + client.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader()); - Future jobRunning = flink.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), deadline.timeLeft()); - Await.ready(jobRunning, deadline.timeLeft()); + FutureUtils.retrySuccesfulWithDelay( + () -> client.getJobStatus(jobGraph.getJobID()), + Time.milliseconds(10), + deadline, + status -> status == JobStatus.RUNNING, --- End diff -- The check whether the job is running does not necessarily mean that all vertices are running. But I guess what we are waiting for is the initialization of the `CheckpointStatsTracker`. > Port JMXJobManagerMetricTest to flip6 > - > > Key: FLINK-8957 > URL: https://issues.apache.org/jira/browse/FLINK-8957 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
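The retry call in this diff polls the client until the job reports RUNNING (with the reviewer's caveat that RUNNING does not imply all vertices are running yet). Below is a generic, standalone sketch of that poll-until-predicate pattern; the PollUntil helper is hypothetical and ignores the asynchronous CompletableFuture plumbing of the real FutureUtils utility:

{code}
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class PollUntil {

    /** Polls the supplier every {@code delay} until the predicate accepts a value or the deadline passes. */
    public static <T> T poll(Supplier<T> supplier,
                             Predicate<T> accept,
                             Duration delay,
                             Instant deadline) throws InterruptedException, TimeoutException {
        while (Instant.now().isBefore(deadline)) {
            T value = supplier.get();
            if (accept.test(value)) {
                return value;
            }
            Thread.sleep(delay.toMillis());
        }
        throw new TimeoutException("Condition not met before deadline");
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        // Toy condition: the "job" becomes RUNNING after roughly 50 ms.
        String status = poll(
            () -> (System.nanoTime() - start) > 50_000_000L ? "RUNNING" : "CREATED",
            s -> s.equals("RUNNING"),
            Duration.ofMillis(10),
            Instant.now().plusSeconds(5));
        System.out.println(status); // RUNNING
    }
}
{code}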
[jira] [Assigned] (FLINK-9053) Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel
[ https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber reassigned FLINK-9053: -- Assignee: Nico Kruber > Exception in RecordWriter during cleanup of StreamTask with the checkpoint > trigger running in parallel > -- > > Key: FLINK-9053 > URL: https://issues.apache.org/jira/browse/FLINK-9053 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/356612100 > {code} > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[0](org.apache.flink.test.checkpointing.RescalingITCase) > Time elapsed: 2.021 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.NoSuchElementException: No value present > at java.util.Optional.get(Optional.java:135) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.closeBufferBuilder(RecordWriter.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:178) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.close(StreamRecordWriter.java:99) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.close(RecordWriterOutput.java:161) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.releaseOutputs(OperatorChain.java:239) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:402) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
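The root-cause line in this trace is java.util.Optional.get() being called on an empty Optional, which always throws NoSuchElementException("No value present"). A trivial standalone reproduction of just that JDK behaviour, independent of the Flink race itself (variable names are hypothetical):

{code}
import java.util.Optional;

public class EmptyOptionalGet {
    public static void main(String[] args) {
        Optional<byte[]> bufferBuilder = Optional.empty(); // mirrors a buffer slot that was already cleared
        try {
            bufferBuilder.get(); // throws java.util.NoSuchElementException
        } catch (java.util.NoSuchElementException e) {
            System.out.println(e.getMessage()); // "No value present"
        }
        // Safer access patterns when the value may legitimately be absent:
        bufferBuilder.ifPresent(b -> System.out.println("length " + b.length));
        System.out.println(bufferBuilder.isPresent()); // false
    }
}
{code}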
[jira] [Commented] (FLINK-9057) NPE in CreditBasedSequenceNumberingViewReader when cancelling before initilization was complete
[ https://issues.apache.org/jira/browse/FLINK-9057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409786#comment-16409786 ] ASF GitHub Bot commented on FLINK-9057: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5747 [FLINK-9057][network] fix an NPE when cleaning up before requesting a subpartition view ## What is the purpose of the change In `PartitionRequestServerHandler`, the view reader is created and immediately afterwards added to the `PartitionRequestQueue` which would attempt a cleanup of the view reader's subpartition view. This view, however, is currently only created after adding the reader to the `PartitionRequestQueue` and may thus result in a `NullPointerException` if the cleanup happens very early in the initialization phase, e.g. due to failures. ## Brief change log - call `NetworkSequenceViewReader#requestSubpartitionView` before calling `PartitionRequestQueue#notifyReaderCreated()` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-9057 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5747.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5747 commit 3a34531f8d5fd2c4e71102f2d9d66105e55eb697 Author: Nico KruberDate: 2018-03-22T12:49:45Z [hotfix][tests] add a name to the parameter of RescalingITCase commit 2e909f085bbc90f406eeae16efda15254c296c0e Author: Nico Kruber Date: 2018-03-22T12:50:07Z [FLINK-9057][network] fix an NPE when cleaning up before requesting a subpartition view > NPE in CreditBasedSequenceNumberingViewReader when cancelling before > initilization was complete > --- > > Key: FLINK-9057 > URL: https://issues.apache.org/jira/browse/FLINK-9057 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > {{RescalingITCase}} unveiled an exception which may occur when shutting down > before completely initializing the network stack: > https://travis-ci.org/apache/flink/jobs/356612100 > {code} > 01:08:13,458 WARN > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline - An > exception was thrown by a user handler's exceptionCaught() method while > handling the following exception: > java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.releaseAllResources(CreditBasedSequenceNumberingViewReader.java:192) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.releaseAllResources(PartitionRequestQueue.java:322) > at > 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelInactive(PartitionRequestQueue.java:298) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) >
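A sketch of the initialize-before-publish ordering described in the pull request above, with hypothetical Reader, View, and Queue classes standing in for NetworkSequenceViewReader, the subpartition view, and PartitionRequestQueue:

{code}
public class ReaderRegistrationSketch {

    static class View { void release() { /* free buffers */ } }

    static class Reader {
        private View view;
        void requestSubpartitionView() { this.view = new View(); }
        void releaseAllResources() {
            // Without the reordering, this could run while 'view' is still null -> NPE.
            view.release();
        }
    }

    static class Queue {
        void notifyReaderCreated(Reader reader) {
            // From this point on the queue may call reader.releaseAllResources() at any time,
            // e.g. during an early cancellation.
        }
    }

    void createReader(Queue queue) {
        Reader reader = new Reader();
        // Fixed order: initialize the view first ...
        reader.requestSubpartitionView();
        // ... and only then publish the reader to the queue.
        queue.notifyReaderCreated(reader);
    }
}
{code}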
[jira] [Commented] (FLINK-8959) Port AccumulatorLiveITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409777#comment-16409777 ] ASF GitHub Bot commented on FLINK-8959: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5719 Good work @zentol. I guess only a rebase is missing. +1 for merging then. > Port AccumulatorLiveITCase to flip6 > --- > > Key: FLINK-8959 > URL: https://issues.apache.org/jira/browse/FLINK-8959 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9053) Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel
[ https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9053: --- Summary: Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel (was: RescalingITCase failed on travis) > Exception in RecordWriter during cleanup of StreamTask with the checkpoint > trigger running in parallel > -- > > Key: FLINK-9053 > URL: https://issues.apache.org/jira/browse/FLINK-9053 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/356612100 > {code} > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[0](org.apache.flink.test.checkpointing.RescalingITCase) > Time elapsed: 2.021 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.NoSuchElementException: No value present > at java.util.Optional.get(Optional.java:135) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.closeBufferBuilder(RecordWriter.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:178) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.close(StreamRecordWriter.java:99) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.close(RecordWriterOutput.java:161) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.releaseOutputs(OperatorChain.java:239) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:402) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9053) RescalingITCase failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9053: --- Affects Version/s: 1.6.0 > RescalingITCase failed on travis > > > Key: FLINK-9053 > URL: https://issues.apache.org/jira/browse/FLINK-9053 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/356612100 > {code} > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[0](org.apache.flink.test.checkpointing.RescalingITCase) > Time elapsed: 2.021 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.NoSuchElementException: No value present > at java.util.Optional.get(Optional.java:135) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.closeBufferBuilder(RecordWriter.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:178) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.close(StreamRecordWriter.java:99) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.close(RecordWriterOutput.java:161) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.releaseOutputs(OperatorChain.java:239) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:402) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9053) RescalingITCase failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9053: --- Component/s: (was: Tests) > RescalingITCase failed on travis > > > Key: FLINK-9053 > URL: https://issues.apache.org/jira/browse/FLINK-9053 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/356612100 > {code} > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[0](org.apache.flink.test.checkpointing.RescalingITCase) > Time elapsed: 2.021 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.NoSuchElementException: No value present > at java.util.Optional.get(Optional.java:135) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.closeBufferBuilder(RecordWriter.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:178) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.close(StreamRecordWriter.java:99) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.close(RecordWriterOutput.java:161) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.releaseOutputs(OperatorChain.java:239) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:402) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9053) RescalingITCase failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9053: --- Labels: (was: test-stability) > RescalingITCase failed on travis > > > Key: FLINK-9053 > URL: https://issues.apache.org/jira/browse/FLINK-9053 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/356612100 > {code} > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[0](org.apache.flink.test.checkpointing.RescalingITCase) > Time elapsed: 2.021 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.NoSuchElementException: No value present > at java.util.Optional.get(Optional.java:135) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.closeBufferBuilder(RecordWriter.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:178) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.close(StreamRecordWriter.java:99) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.close(RecordWriterOutput.java:161) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.releaseOutputs(OperatorChain.java:239) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:402) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9057) NPE in CreditBasedSequenceNumberingViewReader when cancelling before initilization was complete
Nico Kruber created FLINK-9057: -- Summary: NPE in CreditBasedSequenceNumberingViewReader when cancelling before initilization was complete Key: FLINK-9057 URL: https://issues.apache.org/jira/browse/FLINK-9057 Project: Flink Issue Type: Bug Components: Network Affects Versions: 1.5.0, 1.6.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.5.0 {{RescalingITCase}} unveiled an exception which may occur when shutting down before completely initializing the network stack: https://travis-ci.org/apache/flink/jobs/356612100 {code} 01:08:13,458 WARN org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline - An exception was thrown by a user handler's exceptionCaught() method while handling the following exception: java.lang.NullPointerException at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.releaseAllResources(CreditBasedSequenceNumberingViewReader.java:192) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.releaseAllResources(PartitionRequestQueue.java:322) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelInactive(PartitionRequestQueue.java:298) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8956) Port RescalingITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409766#comment-16409766 ] ASF GitHub Bot commented on FLINK-8956: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5715#discussion_r176475682 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java --- @@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope } try { - jobManager = cluster.getLeaderGateway(deadline.timeLeft()); - JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod); - jobID = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); + final JobID jobID = jobGraph.getJobID(); - Object savepointResponse = null; + client.setDetached(true); + client.submitJob(jobGraph, RescalingITCase.class.getClassLoader()); // wait until the operator is started StateSourceBase.workStartedLatch.await(); - while (deadline.hasTimeLeft()) { - - Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.empty()), deadline.timeLeft()); - FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); - savepointResponse = Await.result(savepointPathFuture, waitingTime); - - if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { - break; - } - } - - assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); - - final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath(); - - Future jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft()); - - Future cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft()); - - Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft()); + CompletableFuture savepointPathFuture = FutureUtils.retryWithDelay( + () -> { + try { + return client.triggerSavepoint(jobID, null); + } catch (FlinkException e) { + throw new RuntimeException(e); --- End diff -- Shouldn't we return a exceptionally completed future here? > Port RescalingITCase to flip6 > - > > Key: FLINK-8956 > URL: https://issues.apache.org/jira/browse/FLINK-8956 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
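The reviewer's suggestion, illustrated with plain java.util.concurrent.CompletableFuture instead of Flink's FutureUtils: a failure surfaced through an exceptionally completed future stays on the future pipeline, where a retry utility can observe it and schedule another attempt, whereas a RuntimeException thrown from the supplier escapes it. The triggerSavepoint stub and class name below are hypothetical:

{code}
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ExceptionallyCompletedFutureExample {

    static CompletableFuture<String> triggerSavepoint() throws Exception {
        throw new Exception("savepoint trigger failed"); // stand-in for the checked FlinkException
    }

    public static void main(String[] args) {
        // Variant from the diff: wrapping the checked exception in a RuntimeException
        // makes the failure escape the supplier instead of flowing through the future.
        Supplier<CompletableFuture<String>> throwing = () -> {
            try {
                return triggerSavepoint();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        // Suggested variant: report the failure through the returned future.
        Supplier<CompletableFuture<String>> exceptional = () -> {
            try {
                return triggerSavepoint();
            } catch (Exception e) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        };

        System.out.println(exceptional.get().isCompletedExceptionally()); // true
        try {
            throwing.get();
        } catch (RuntimeException e) {
            System.out.println("escaped the supplier: " + e.getCause().getMessage());
        }
    }
}
{code}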
[jira] [Assigned] (FLINK-9055) WebUI shows job as Running although not enough resources are available
[ https://issues.apache.org/jira/browse/FLINK-9055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9055: - Assignee: Sihua Zhou > WebUI shows job as Running although not enough resources are available > -- > > Key: FLINK-9055 > URL: https://issues.apache.org/jira/browse/FLINK-9055 > Project: Flink > Issue Type: Bug > Components: JobManager, Webfrontend >Affects Versions: 1.5.0 > Environment: * FLIP-6 enabled > * Local Flink instance with fixed number of TMs > * Job parallelism exceeds available slots >Reporter: Fabian Hueske >Assignee: Sihua Zhou >Priority: Major > > The WebUI shows a (batch) job as "Running" although not enough resources have > been allocated to actually run the job with the requested parallelism. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException
[ https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409738#comment-16409738 ] Till Rohrmann edited comment on FLINK-8887 at 3/22/18 3:41 PM: --- When did you observe the problem [~gjy]? I think this problem can always occur if you send a rpc while a leader change happens. However, such an event should not happen if the leader component is not killed, if I'm not mistaken. was (Author: till.rohrmann): When did you observe the problem [~gjy]? I think this problem can always occur if you send a rpc while a leader change happens. However, such an event should not happen if you don't kill the leader component, if I'm not mistaken. > ClusterClient.getJobStatus can throw FencingTokenException > -- > > Key: FLINK-8887 > URL: https://issues.apache.org/jira/browse/FLINK-8887 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > *Description* > Calling {{RestClusterClient.getJobStatus}} or > {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}. > *Analysis* > {{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by > job id. If a reference is found, {{requestJobStatus}} is called on the > respective instance. If not, the {{ArchivedExecutionGraphStore}} is queried. > However, between the lookup and the method call, the {{JobMaster}} of the > respective job may have lost leadership already (job finished), and has set > the fencing token to {{null}}. > *Stacktrace* > {noformat} > Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: > Fencing token mismatch: Ignoring message LocalFencedMessage(null, > LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null > did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51. > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {noformat} > {noformat} > Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: > Fencing token not set: Ignoring message LocalFencedMessage(null, > LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null. 
> at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException
[ https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409738#comment-16409738 ] Till Rohrmann commented on FLINK-8887: -- When did you observe the problem [~gjy]? I think this problem can always occur if you send a rpc while a leader change happens. However, such an event should not happen if you don't kill the leader component, if I'm not mistaken. > ClusterClient.getJobStatus can throw FencingTokenException > -- > > Key: FLINK-8887 > URL: https://issues.apache.org/jira/browse/FLINK-8887 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > *Description* > Calling {{RestClusterClient.getJobStatus}} or > {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}. > *Analysis* > {{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by > job id. If a reference is found, {{requestJobStatus}} is called on the > respective instance. If not, the {{ArchivedExecutionGraphStore}} is queried. > However, between the lookup and the method call, the {{JobMaster}} of the > respective job may have lost leadership already (job finished), and has set > the fencing token to {{null}}. > *Stacktrace* > {noformat} > Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: > Fencing token mismatch: Ignoring message LocalFencedMessage(null, > LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null > did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51. > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {noformat} > {noformat} > Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: > Fencing token not set: Ignoring message LocalFencedMessage(null, > LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null. 
> at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
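A simplified sketch of the fencing-token check that produces the two exceptions quoted above; the types here are hypothetical stand-ins, while the real check lives in FencedAkkaRpcActor and fences each RPC with the current leader session id:

{code}
import java.util.Objects;
import java.util.UUID;

public class FencingTokenSketch {

    static class FencingTokenException extends RuntimeException {
        FencingTokenException(String message) { super(message); }
    }

    /** Current token of the component; null means it currently holds no leadership. */
    private UUID currentToken;

    void grantLeadership(UUID newToken) { this.currentToken = newToken; }
    void revokeLeadership()             { this.currentToken = null; }

    /** A message is only handled if it carries the token the receiver currently expects. */
    void handleMessage(UUID messageToken, Runnable handler) {
        if (currentToken == null) {
            throw new FencingTokenException("Fencing token not set: the receiver holds no leadership.");
        }
        if (!Objects.equals(currentToken, messageToken)) {
            throw new FencingTokenException("Fencing token mismatch: expected " + currentToken
                + " but message carried " + messageToken);
        }
        handler.run();
    }

    public static void main(String[] args) {
        FencingTokenSketch jobMaster = new FencingTokenSketch();
        UUID leaderSession = UUID.randomUUID();
        jobMaster.grantLeadership(leaderSession);
        jobMaster.handleMessage(leaderSession, () -> System.out.println("requestJobStatus handled"));

        // After the job finishes, leadership is revoked; a late getJobStatus call now fails,
        // which is the race the ticket describes.
        jobMaster.revokeLeadership();
        try {
            jobMaster.handleMessage(leaderSession, () -> {});
        } catch (FencingTokenException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}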