[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723766#comment-16723766 ] Alexander Lehmann commented on FLINK-10429: --- Like [~zhuzh] mentioned, a pluggable scheduling mechanism would be great, e.g. in cases where one expects large operator states and therefore the latter shouldn't be randomly scheduled (worst case would be that all end up on a single TaskManager), but could instead follow a certain heuristic or scheduling hints or ... > Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > This epic tracks the redesign of scheduling in Flink. Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on > the granularity of individual tasks, which makes holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use-cases like auto-scaling, > local-recovery, and resource-optimized batch. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
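A pluggable strategy of the kind Alexander sketches could, for example, place state-heavy tasks greedily onto the least-loaded TaskManager instead of at random. The sketch below is purely illustrative: the `SpreadByStateStrategy` name and its inputs are invented for this example and are not part of Flink's scheduler API.

```java
import java.util.*;

// Illustrative sketch only: this class and its inputs are hypothetical, not
// Flink's scheduler API. The idea is that a pluggable strategy could spread
// tasks with large operator state across TaskManagers instead of letting
// random placement pile them all onto a single one.
class SpreadByStateStrategy {

    /** Assigns each task to the TaskManager currently carrying the least state. */
    static Map<String, String> assign(Map<String, Long> taskStateBytes,
                                      List<String> taskManagers) {
        Map<String, Long> load = new LinkedHashMap<>();
        for (String tm : taskManagers) {
            load.put(tm, 0L);
        }
        Map<String, String> placement = new LinkedHashMap<>();
        // Place the heaviest tasks first (greedy longest-processing-time heuristic).
        taskStateBytes.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .forEach(e -> {
                String leastLoaded = Collections.min(load.entrySet(),
                        Map.Entry.comparingByValue()).getKey();
                placement.put(e.getKey(), leastLoaded);
                load.merge(leastLoaded, e.getValue(), Long::sum);
            });
        return placement;
    }

    public static void main(String[] args) {
        Map<String, Long> tasks = new LinkedHashMap<>();
        tasks.put("map-1", 800L);   // hypothetical task -> estimated state size
        tasks.put("map-2", 700L);
        tasks.put("sink-1", 100L);
        System.out.println(assign(tasks, Arrays.asList("tm-1", "tm-2")));
    }
}
```

With two TaskManagers, the two heavy tasks land on different TaskManagers rather than, in the worst case, on the same one.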
[GitHub] KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend URL: https://github.com/apache/flink/pull/7188#discussion_r242114883 ## File path: docs/dev/stream/state/state.md ## @@ -394,6 +396,47 @@ val ttlConfig = StateTtlConfig This option is not applicable for the incremental checkpointing in the RocksDB state backend. +# Incremental cleanup + +Another option is to trigger cleanup of some state entries incrementally. +The trigger can be a callback from each state access or/and each record processing. +If this cleanup strategy is active for certain state, +The storage backend keeps a lazy global iterator for this state over all its entries. +Every time incremental cleanup is triggered, the iterator is advanced. +The traversed state entries are checked and expired ones are cleaned up. + +This feature can be activated in `StateTtlConfig`: + + + +{% highlight java %} +import org.apache.flink.api.common.state.StateTtlConfig; + StateTtlConfig ttlConfig = StateTtlConfig +.newBuilder(Time.seconds(1)) +.cleanupInRocksdbCompactFilter() +.build(); +{% endhighlight %} + + +{% highlight scala %} +import org.apache.flink.api.common.state.StateTtlConfig + val ttlConfig = StateTtlConfig +.newBuilder(Time.seconds(1)) +.cleanupInRocksdbCompactFilter Review comment: +1, cleanupIncrementally This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend URL: https://github.com/apache/flink/pull/7188#discussion_r242114799 ## File path: docs/dev/stream/state/state.md ## @@ -394,6 +396,47 @@ val ttlConfig = StateTtlConfig This option is not applicable for the incremental checkpointing in the RocksDB state backend. +# Incremental cleanup + +Another option is to trigger cleanup of some state entries incrementally. +The trigger can be a callback from each state access or/and each record processing. +If this cleanup strategy is active for certain state, +The storage backend keeps a lazy global iterator for this state over all its entries. +Every time incremental cleanup is triggered, the iterator is advanced. +The traversed state entries are checked and expired ones are cleaned up. + +This feature can be activated in `StateTtlConfig`: + + + +{% highlight java %} +import org.apache.flink.api.common.state.StateTtlConfig; + StateTtlConfig ttlConfig = StateTtlConfig +.newBuilder(Time.seconds(1)) +.cleanupInRocksdbCompactFilter() Review comment: It should be cleanupIncrementally.
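For context, the incremental cleanup mechanism the docs hunk describes — a lazy global iterator over all entries of a state table, advanced a few steps on each state access, expiring stale entries as it passes them — can be sketched in plain Java. This is an illustration of the idea only; the class below is hypothetical and not Flink's heap-backend implementation (whose builder method, as the reviewers note, should be named `cleanupIncrementally`).

```java
import java.util.*;

// Plain-Java sketch of incremental TTL cleanup: a lazy iterator over all
// entries is advanced by a bounded number of steps on every state access,
// removing expired entries as it passes them. Illustrative only; not
// Flink's actual heap state backend. In this sketch a put() invalidates
// the iterator; the real backend tolerates concurrent modification.
class IncrementalTtlCleanup {

    private final long ttlMillis;
    private final int cleanupSize; // entries checked per access
    private final LinkedHashMap<String, long[]> entries = new LinkedHashMap<>(); // key -> {value, timestamp}
    private Iterator<Map.Entry<String, long[]>> cursor;

    IncrementalTtlCleanup(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    void put(String key, long value, long now) {
        incrementalCleanup(now);
        entries.put(key, new long[]{value, now});
        cursor = null; // structural change invalidates this sketch's iterator
    }

    /** Returns the value, or null if absent or expired (expired reads are lazy). */
    Long get(String key, long now) {
        incrementalCleanup(now);
        long[] e = entries.get(key);
        return (e == null || now - e[1] > ttlMillis) ? null : e[0];
    }

    int size() {
        return entries.size();
    }

    // Advance the global iterator by up to cleanupSize entries, dropping expired ones.
    private void incrementalCleanup(long now) {
        if (cursor == null || !cursor.hasNext()) {
            cursor = entries.entrySet().iterator(); // restart the lazy traversal
        }
        for (int i = 0; i < cleanupSize && cursor.hasNext(); i++) {
            if (now - cursor.next().getValue()[1] > ttlMillis) {
                cursor.remove();
            }
        }
    }
}
```

Each access pays for a small, bounded amount of cleanup work, so expired entries are eventually removed without ever scanning the whole state in one go.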
[jira] [Comment Edited] (FLINK-11180) ProcessFailureCancelingITCase#testCancelingOnProcessFailure
[ https://issues.apache.org/jira/browse/FLINK-11180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723762#comment-16723762 ] sunjincheng edited comment on FLINK-11180 at 12/18/18 7:02 AM: --- Hi [~hequn8128] Thanks for taking this JIRA. I checked the issue again and found that the root cause is that port 8081 was already in use on my local machine. Thanks again for helping me with this JIRA. So this is not a blocker for release-1.7.1. was (Author: sunjincheng121): Hi [~hequn8128] Thanks for taking this JIRA. I checked the issue again and found that the root cause is that port 8081 was already in use on my local machine. So this issue does not need a fix! Thanks again for helping me with this JIRA. So this is not a blocker for release-1.7.1. > ProcessFailureCancelingITCase#testCancelingOnProcessFailure > --- > > Key: FLINK-11180 > URL: https://issues.apache.org/jira/browse/FLINK-11180 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > > tag: release-1.7.1-rc2 > org.apache.flink.util.FlinkException: Could not create the > DispatcherResourceManagerComponent.
> at > org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:242) > at > org.apache.flink.test.recovery.ProcessFailureCancelingITCase.testCancelingOnProcessFailure(ProcessFailureCancelingITCase.java:148) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > Caused by: java.net.BindException: Address already in use > at sun.nio.ch.Net.bind0(Native Method) > at sun.nio.ch.Net.bind(Net.java:433) > at sun.nio.ch.Net.bind(Net.java:425) > at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1358) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:1019) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
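The decisive part of the trace above is the `java.net.BindException: Address already in use` at the bottom: the test tried to bind its REST endpoint to a port (8081) that another local process already held. The self-contained sketch below reproduces that failure mode and shows the usual remedy for tests, binding to port 0 so the OS picks a free ephemeral port; the class name is invented for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Reproduces the failure mode from the stack trace: binding a second server
// socket to a port that is already in use raises java.net.BindException
// ("Address already in use"). Binding to port 0 instead lets the OS pick a
// free ephemeral port, which keeps tests immune to a busy port like 8081.
class BindConflictDemo {

    /** Returns true if a second bind to an already-bound port fails, as expected. */
    static boolean secondBindFails() {
        try (ServerSocketChannel first = ServerSocketChannel.open()) {
            first.bind(new InetSocketAddress("localhost", 0)); // OS picks a free port
            int busyPort = ((InetSocketAddress) first.getLocalAddress()).getPort();
            try (ServerSocketChannel second = ServerSocketChannel.open()) {
                second.bind(new InetSocketAddress("localhost", busyPort));
                return false; // unexpectedly succeeded
            } catch (IOException alreadyInUse) {
                return true;  // BindException: Address already in use
            }
        } catch (IOException unexpected) {
            return false;     // setup failure, not the conflict we demonstrate
        }
    }

    public static void main(String[] args) {
        System.out.println("second bind failed: " + secondBindFails());
    }
}
```

This is also why the reporter's follow-up (a locally busy port, not a Flink bug) resolves the ticket without a code change.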
[jira] [Commented] (FLINK-11180) ProcessFailureCancelingITCase#testCancelingOnProcessFailure
[ https://issues.apache.org/jira/browse/FLINK-11180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723762#comment-16723762 ] sunjincheng commented on FLINK-11180: - Hi [~hequn8128] Thanks for taking this JIRA. I checked the issue again and found that the root cause is that port 8081 was already in use on my local machine. So this issue does not need a fix! Thanks again for helping me with this JIRA. So this is not a blocker for release-1.7.1. > ProcessFailureCancelingITCase#testCancelingOnProcessFailure > --- > > Key: FLINK-11180 > URL: https://issues.apache.org/jira/browse/FLINK-11180 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > > tag: release-1.7.1-rc2 > org.apache.flink.util.FlinkException: Could not create the > DispatcherResourceManagerComponent. > at > org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:242) > at > org.apache.flink.test.recovery.ProcessFailureCancelingITCase.testCancelingOnProcessFailure(ProcessFailureCancelingITCase.java:148) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > Caused by: java.net.BindException: Address already in use > at sun.nio.ch.Net.bind0(Native Method) > at sun.nio.ch.Net.bind(Net.java:433) > at sun.nio.ch.Net.bind(Net.java:425) > at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1358) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:1019) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254) > at > org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:366) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThread
[GitHub] QiLuo-BD commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions
QiLuo-BD commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/7186#issuecomment-448115581 Hi Zhijiang/Till, Could you kindly share any further review comments on this? If there are no more comments, can this be merged? Thanks, Qi
[GitHub] xueyumusic commented on issue #7318: [FLINK-11099][Table API&SQL] Migrate flink-table runtime CRow Types classes
xueyumusic commented on issue #7318: [FLINK-11099][Table API&SQL] Migrate flink-table runtime CRow Types classes URL: https://github.com/apache/flink/pull/7318#issuecomment-448114794 Hi @XuQianJin-Stars, according to FLIP-28 it looks like the CRow classes may need to migrate into the flink-table-runtime module. This module will be set up, is that right? @twalthr
[jira] [Commented] (FLINK-11181) SimpleRecoveryITCaseBase test error
[ https://issues.apache.org/jira/browse/FLINK-11181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723753#comment-16723753 ] Hequn Cheng commented on FLINK-11181: - This is not a release blocker as it is a bug in the test. > SimpleRecoveryITCaseBase test error > --- > > Key: FLINK-11181 > URL: https://issues.apache.org/jira/browse/FLINK-11181 > Project: Flink > Issue Type: Sub-task >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Fails consistently across many runs. > at > org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.executeAndRunAssertions(SimpleRecoveryITCaseBase.java:124) > at > org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.testRestart(SimpleRecoveryITCaseBase.java:150) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11181) SimpleRecoveryITCaseBase test error
[ https://issues.apache.org/jira/browse/FLINK-11181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723758#comment-16723758 ] sunjincheng commented on FLINK-11181: - Thanks for helping me check this issue, [~hequn8128]. Glad to hear that the issue is not a blocker for release-1.7.1. > SimpleRecoveryITCaseBase test error > --- > > Key: FLINK-11181 > URL: https://issues.apache.org/jira/browse/FLINK-11181 > Project: Flink > Issue Type: Sub-task >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Fails consistently across many runs. > at > org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.executeAndRunAssertions(SimpleRecoveryITCaseBase.java:124) > at > org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.testRestart(SimpleRecoveryITCaseBase.java:150) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) >
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 commented on issue #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor…
sunjincheng121 commented on issue #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor… URL: https://github.com/apache/flink/pull/7315#issuecomment-448112450 Hi @hequn8128, I have updated the PR. I'd appreciate it if you could take another look! Thanks, Jincheng
[jira] [Assigned] (FLINK-11102) Enable check for previous data in SpanningRecordSerializer
[ https://issues.apache.org/jira/browse/FLINK-11102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] boshu Zheng reassigned FLINK-11102: --- Assignee: boshu Zheng (was: vinoyang) > Enable check for previous data in SpanningRecordSerializer > -- > > Key: FLINK-11102 > URL: https://issues.apache.org/jira/browse/FLINK-11102 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.8.0 >Reporter: Nico Kruber >Assignee: boshu Zheng >Priority: Major > > {{SpanningRecordSerializer}} only verifies that there is no left-over data > from a previous serialization call if {{SpanningRecordSerializer#CHECKED}} is > {{true}} but this is hard-coded as {{false}}. Now if there was previous data, > we would silently drop this and continue with our data. The deserializer > would probably notice and fail but identifying the root cause may not be as > easy anymore. > -> We should enable that check by default since the only thing it does is to > verify {{!java.nio.Buffer#hasRemaining()}} which cannot be too expensive. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
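The check this ticket wants enabled by default boils down to a single `java.nio.Buffer#hasRemaining()` call before reusing the serialization buffer, which is why it is described as cheap. The sketch below illustrates the check and what it catches; the class and method names are invented for this example and are not `SpanningRecordSerializer` internals.

```java
import java.nio.ByteBuffer;

// Sketch of the cheap safety check FLINK-11102 asks to enable by default:
// before serializing a new record into a reused buffer, verify that the
// previous record was fully consumed (hasRemaining() == false). Without the
// check, leftover bytes would be silently dropped and the failure would only
// surface later, at the deserializer. Names here are illustrative.
class LeftoverCheckDemo {

    /** Throws if the buffer still holds unconsumed data from a previous record. */
    static void checkNoLeftoverData(ByteBuffer serializedRecord) {
        if (serializedRecord.hasRemaining()) {
            throw new IllegalStateException(
                "Pending serialization of previous record; would silently drop "
                + serializedRecord.remaining() + " bytes");
        }
    }

    /** Returns true if the check fires exactly when data is left unconsumed. */
    static boolean detectsLeftover() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(42);
        buf.flip();                  // 4 bytes now readable
        buf.getInt();                // consume them fully -> check passes
        checkNoLeftoverData(buf);

        buf.clear();
        buf.putInt(7);
        buf.flip();                  // leave the 4 bytes unconsumed
        try {
            checkNoLeftoverData(buf);
            return false;            // check failed to fire
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("leftover detected: " + detectsLeftover());
    }
}
```

Since the check is a single boolean test on buffer positions, enabling it unconditionally costs essentially nothing compared to the debugging time it saves.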
[jira] [Comment Edited] (FLINK-11182) JobManagerHACheckpointRecoveryITCase need be improved
[ https://issues.apache.org/jira/browse/FLINK-11182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723748#comment-16723748 ] sunjincheng edited comment on FLINK-11182 at 12/18/18 6:28 AM: --- Sounds good. It's great that FLINK-10392 will solve this issue, and I have added the link to FLINK-10392. I agree that it's not a blocker for release-1.7.1. was (Author: sunjincheng121): Yes, it's great that FLINK-10392 will solve this issue, and I have added the link to FLINK-10392. I agree that it's not a blocker for release-1.7.1. > JobManagerHACheckpointRecoveryITCase need be improved > - > > Key: FLINK-11182 > URL: https://issues.apache.org/jira/browse/FLINK-11182 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Priority: Major > > Warning: An exception was thrown by an exception handler. > java.util.concurrent.RejectedExecutionException: Worker has already been > shutdown > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72) > at > org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56) > at > org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) > at > org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34) > at > org.jboss.netty.channel.DefaultChannelPipeline.execute(DefaultChannelPipeline.java:636) > at > org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496) > at > org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46) > at > org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658) > at >
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:781) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784) > at > org.jboss.netty.channel.SimpleChannelHandler.disconnectRequested(SimpleChannelHandler.java:320) > at > org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:274) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) > at org.jboss.netty.channel.Channels.disconnect(Channels.java:781) > at > org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:219) > at > akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:241) > at > akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:240) > at scala.util.Success.foreach(Try.scala:236) > at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206) > at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11182) JobManagerHACheckpointRecoveryITCase need be improved
[ https://issues.apache.org/jira/browse/FLINK-11182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723748#comment-16723748 ] sunjincheng commented on FLINK-11182: - Yes, it's great that FLINK-10392 will solve this issue, and I have added the link to FLINK-10392. I agree that it's not a blocker for release-1.7.1. > JobManagerHACheckpointRecoveryITCase need be improved > - > > Key: FLINK-11182 > URL: https://issues.apache.org/jira/browse/FLINK-11182 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Priority: Major > > Warning: An exception was thrown by an exception handler. > java.util.concurrent.RejectedExecutionException: Worker has already been > shutdown > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72) > at > org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56) > at > org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) > at > org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34) > at > org.jboss.netty.channel.DefaultChannelPipeline.execute(DefaultChannelPipeline.java:636) > at > org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496) > at > org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46) > at > org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:781) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) > at >
org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784) > at > org.jboss.netty.channel.SimpleChannelHandler.disconnectRequested(SimpleChannelHandler.java:320) > at > org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:274) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) > at org.jboss.netty.channel.Channels.disconnect(Channels.java:781) > at > org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:219) > at > akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:241) > at > akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:240) > at scala.util.Success.foreach(Try.scala:236) > at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206) > at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
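The failure mode in the trace above (a RejectedExecutionException with "Worker has already been shutdown") can be reproduced with a plain JDK executor; Netty's NIO worker is a specialized executor, so the sketch below is only an illustration of the same class of race, not the Netty code path:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ShutdownRejectDemo {

    // Submits a task to a pool that has already been shut down and
    // reports whether the submission was rejected.
    static boolean submitAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false; // not reached: a shut-down pool rejects new tasks
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdown()); // true
    }
}
```

In the test above the same race happens during teardown: the Akka/Netty transport is closed while an exception handler still tries to schedule work on its worker thread.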
[GitHub] leesf removed a comment on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID
leesf removed a comment on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID URL: https://github.com/apache/flink/pull/7262#issuecomment-447745066 @tillrohrmann hi till, could you please merge this pr when you are free. Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] leesf commented on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID
leesf commented on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID URL: https://github.com/apache/flink/pull/7262#issuecomment-448110778 @tzulitai hi tzu, could you please merge this pr when you are free. Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
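The bug class named in the PR title, a transactional ID containing '%' being "wrongly formatted", is what happens when a user-supplied string is passed as the format string itself. The exact Flink call site is not shown here; this is a minimal, hypothetical sketch of the behavior:

```java
import java.util.MissingFormatArgumentException;

public class PercentFormatDemo {

    // Unsafe: treats the ID itself as the format string, so any '%'
    // in it is interpreted as a conversion specifier.
    static String unsafe(String transactionalId) {
        return String.format(transactionalId);
    }

    // Safe: the ID is a format *argument*, so '%' passes through literally.
    static String safe(String transactionalId) {
        return String.format("%s", transactionalId);
    }

    public static void main(String[] args) {
        String id = "sink-%s-0"; // an ID that happens to contain '%'
        try {
            unsafe(id);
        } catch (MissingFormatArgumentException e) {
            System.out.println("unsafe threw: " + e.getClass().getSimpleName());
        }
        System.out.println(safe(id)); // prints sink-%s-0 unchanged
    }
}
```

The usual fix is exactly this switch: never feed externally controlled strings into `String.format` as the pattern, only as arguments.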
[jira] [Commented] (FLINK-11179) JoinCancelingITCase#testCancelSortMatchWhileDoingHeavySorting test error
[ https://issues.apache.org/jira/browse/FLINK-11179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723742#comment-16723742 ] sunjincheng commented on FLINK-11179: - This issue is not a blocker for release-1.7.1. > JoinCancelingITCase#testCancelSortMatchWhileDoingHeavySorting test error > - > > Key: FLINK-11179 > URL: https://issues.apache.org/jira/browse/FLINK-11179 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > tag: release-1.7.1-rc2 > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find > Flink job (f8abcfa2bf2f9bf13024075e51891d2e) > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.client.program.MiniClusterClient.cancel(MiniClusterClient.java:118) > at > org.apache.flink.test.cancelling.CancelingTestBase.runAndCancelJob(CancelingTestBase.java:109) > at > org.apache.flink.test.cancelling.JoinCancelingITCase.executeTaskWithGenerator(JoinCancelingITCase.java:94) > at > org.apache.flink.test.cancelling.JoinCancelingITCase.testCancelSortMatchWhileDoingHeavySorting(JoinCancelingITCase.java:99) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could > not find Flink job (f8abcfa2bf2f9bf13024075e51891d2e) > at > org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kisimple opened a new pull request #7321: [hotfix] Fix typo in AbstractStreamOperator
kisimple opened a new pull request #7321: [hotfix] Fix typo in AbstractStreamOperator URL: https://github.com/apache/flink/pull/7321 retrieven -> retrieved This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure in Windows 7 environment.
kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure in Windows 7 environment. URL: https://github.com/apache/flink/pull/7192#issuecomment-448099846 This would be an annoying problem when working on Windows. cc @kl0u This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable
[ https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723649#comment-16723649 ] Tzu-Li (Gordon) Tai commented on FLINK-11083: - Merged. master (1.8.0) - 45f9cccbe2f2aefe392b2c2df2a464eaed2e0323 release-1.7 - 5ce9f228959469b9e7c8e585aaffadc7eff5d028 release-1.6 - 26b182c3ce21968118b483bfa8aca38ecd54a8a3 > CRowSerializerConfigSnapshot is not instantiable > > > Key: FLINK-11083 > URL: https://issues.apache.org/jira/browse/FLINK-11083 > Project: Flink > Issue Type: Bug > Components: Table API & SQL, Type Serialization System >Reporter: boshu Zheng >Assignee: boshu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > An exception was encountered when restarting a job with savepoint in our > production env, > {code:java} > 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task > :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> > Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING > to FAILED. > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.util.FlinkException: Could not restore operator > state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) > from any of the 1 provided restore options. 
> at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) > ... 5 more > Caused by: java.lang.RuntimeException: The class > 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' > is not instantiable: The class has no (implicit) public nullary constructor, > i.e. a constructor without arguments. > at > org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) > at > org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:15
[jira] [Resolved] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable
[ https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-11083. - Resolution: Fixed > CRowSerializerConfigSnapshot is not instantiable > > > Key: FLINK-11083 > URL: https://issues.apache.org/jira/browse/FLINK-11083 > Project: Flink > Issue Type: Bug > Components: Table API & SQL, Type Serialization System >Reporter: boshu Zheng >Assignee: boshu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > An exception was encountered when restarting a job with savepoint in our > production env, > {code:java} > 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task > :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> > Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING > to FAILED. > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.util.FlinkException: Could not restore operator > state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) > from any of the 1 provided restore options. 
> at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) > ... 5 more > Caused by: java.lang.RuntimeException: The class > 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' > is not instantiable: The class has no (implicit) public nullary constructor, > i.e. a constructor without arguments. > at > org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) > at > org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) > ... 7 more > {code} > I add tests to CRowSerializerTest to make sure
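The root cause in the trace above is that Flink instantiates serializer config snapshots reflectively, which requires a public or accessible no-argument constructor. The fixed class delegates from a nullary constructor to the array-based one. The classes below are illustrative stand-ins, not Flink's actual types; only the reflection behavior is being demonstrated:

```java
import java.lang.reflect.Constructor;

// Stand-in for the broken class: only an array-based constructor,
// so reflective nullary instantiation fails.
class SnapshotWithoutNullaryCtor {
    SnapshotWithoutNullaryCtor(Object[] serializers) { }
}

// Stand-in for the fixed class: a nullary constructor delegating to the
// array-based one with an empty array, mirroring the pattern in the PR.
class SnapshotWithNullaryCtor {
    private final Object[] serializers;
    SnapshotWithNullaryCtor(Object[] serializers) { this.serializers = serializers; }
    SnapshotWithNullaryCtor() { this(new Object[0]); }
}

public class NullaryCtorDemo {

    // Mimics what a reflective instantiation utility must be able to do:
    // find and invoke a no-arg constructor.
    static boolean instantiable(Class<?> clazz) {
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            ctor.setAccessible(true);
            ctor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(instantiable(SnapshotWithoutNullaryCtor.class)); // false
        System.out.println(instantiable(SnapshotWithNullaryCtor.class));    // true
    }
}
```

This is why the merged patch both changes the primary constructor to take an `Array` and adds `def this() { this(Array.empty) }`.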
[GitHub] asfgit closed pull request #7267: [FLINK-11083][Table&SQL] CRowSerializerConfigSnapshot is not instantiable
asfgit closed pull request #7267: [FLINK-11083][Table&SQL] CRowSerializerConfigSnapshot is not instantiable URL: https://github.com/apache/flink/pull/7267 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 0ce3aee3739..b3fe5085151 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -81,7 +81,7 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   //
   override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = {
-    new CRowSerializer.CRowSerializerConfigSnapshot(rowSerializer)
+    new CRowSerializer.CRowSerializerConfigSnapshot(Array(rowSerializer))
   }
 
   override def ensureCompatibility(
@@ -115,9 +115,13 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
 
 object CRowSerializer {
 
-  class CRowSerializerConfigSnapshot(rowSerializers: TypeSerializer[Row]*)
+  class CRowSerializerConfigSnapshot(rowSerializers: Array[TypeSerializer[Row]])
     extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) {
 
+    def this() {
+      this(Array.empty)
+    }
+
     override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
   }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
index 7483b04d9ca..055501a6a01 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
@@ -18,8 +18,20 @@
 
 package org.apache.flink.table.runtime.types
 
-import org.apache.flink.util.TestLogger
-import org.junit.Test
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, KeyedProcessOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, InstantiationUtil, TestLogger}
+
+import org.junit.{Assert, Test}
 
 class CRowSerializerTest extends TestLogger {
 
@@ -29,6 +41,70 @@ class CRowSerializerTest extends TestLogger {
   @Test
   def testDefaultConstructor(): Unit = {
     new CRowSerializer.CRowSerializerConfigSnapshot()
+
+    InstantiationUtil.instantiate(classOf[CRowSerializer.CRowSerializerConfigSnapshot])
+  }
+
+  @Test
+  def testStateRestore(): Unit = {
+
+    class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] {
+      var state: ListState[CRow] = _
+      override def open(parameters: Configuration): Unit = {
+        val stateDesc = new ListStateDescriptor[CRow]("CRow",
+          new CRowTypeInfo(new RowTypeInfo(Types.INT)))
+        state = getRuntimeContext.getListState(stateDesc)
+      }
+      override def processElement(value: Integer,
+          ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
+          out: Collector[Integer]): Unit = {
+        state.add(new CRow(Row.of(value), true))
+      }
+    }
+
+    val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction)
+
+    var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
+      operator,
+      new KeySelector[Integer, Integer] {
+        override def getKey(value: Integer): Integer = -1
+      },
+      Types.INT, 1, 1, 0)
+    testHarness.setup()
+    testHarness.open()
+
+    testHarness.processElement(new StreamRecord[Integer](1, 1L))
+    testHarness.processElement(new StreamRecord[Integer](2, 1L))
+    testHarness.processElement(new StreamRecord[Integer](3, 1L))
+
+    Assert.assertEquals(1, numKeyedStateEntries(operator))
+
+    val snapshot = testHarnes
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723617#comment-16723617 ] lining edited comment on FLINK-11162 at 12/18/18 3:28 AM: -- [~yanghua] In which API should we add this information? Should we create a new one, or reuse an existing API like jobs/:jobid/plan? I prefer the existing API, with a result like this: { "jid": "ccee3dc6ac63c532feea3e8cdbf05eab", "name": "Socket Window WordCount", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: Socket Stream -> Flat Map", "optimizer_properties": {} }, { "id": "90bea66de1c231edf33913ecd54406c1", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out", "inputs": [ { "num": 0, "id": "cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy": "HASH", "exchange": "pipelined" } ], "optimizer_properties": {} } ], "operators": [ { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "7df19f87deec5680128845fd9a6ca18d", "name": "Flat Map", "inputs": [ { "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "partitioner": "FORWARD", "type_number": 0 }, ... ] } ] } was (Author: lining): [~yanghua] we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this { "jid": "ccee3dc6ac63c532feea3e8cdbf05eab", "name": "Socket Window WordCount", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: Socket Stream -> Flat Map", "optimizer_properties": {} }, { "id": "90bea66de1c231edf33913ecd54406c1", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out", "inputs": [ { "num": 0, "id": "cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy": "HASH", "exchange": "pipelined" } ], "optimizer_properties": {} } ], "operators": [ { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "7df19f87deec5680128845fd9a6ca18d", "name": "Flat Map", "inputs": [ { "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "partitioner": "FORWARD", "type_number": 0 } , ... ] } ] } > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Clarkkkkk commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation
Clarkkkkk commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation URL: https://github.com/apache/flink/pull/7258#issuecomment-448084237 @walterddr Good point, I will tag it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723629#comment-16723629 ] lining commented on FLINK-11162: [~till.rohrmann] [~Zentol], what is your point? > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723629#comment-16723629 ] lining edited comment on FLINK-11162 at 12/18/18 3:29 AM: -- [~till.rohrmann] [~Zentol], what are your thoughts? was (Author: lining): [~till.rohrmann] [~Zentol], what is your point? > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723617#comment-16723617 ] lining edited comment on FLINK-11162 at 12/18/18 3:25 AM: -- [~yanghua] we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this vinoyang we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this { "jid": "ccee3dc6ac63c532feea3e8cdbf05eab", "name": "Socket Window WordCount", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: Socket Stream -> Flat Map", "optimizer_properties": {} }, { "id": "90bea66de1c231edf33913ecd54406c1", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out", "inputs": [ { "num": 0, "id": "cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy": "HASH", "exchange": "pipelined" } ], "optimizer_properties": {} } ], "operators": [ { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "7df19f87deec5680128845fd9a6ca18d", "name": "Flat Map", "inputs": [ { "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "partitioner": "FORWARD", "type_number": 0 } , ... ] } ] } was (Author: lining): [~yanghua] we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this vinoyang we need add this information in which api, create new or use old api like jobs:/jobid/plan? 
I prefer in old api, result like this { "jid": "ccee3dc6ac63c532feea3e8cdbf05eab", "name": "Socket Window WordCount", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: Socket Stream -> Flat Map", "optimizer_properties": {} }, { "id": "90bea66de1c231edf33913ecd54406c1", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out", "inputs": [ { "num": 0, "id": "cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy": "HASH", "exchange": "pipelined" } ], "optimizer_properties": {} } ], "operators": [ { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "7df19f87deec5680128845fd9a6ca18d", "name": "Flat Map", "inputs": [ { "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "partitioner": "FORWARD", "type_number": 0 }, ... ] } ] } > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723617#comment-16723617 ] lining edited comment on FLINK-11162 at 12/18/18 3:24 AM: -- [~yanghua] we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this vinoyang we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this { "jid": "ccee3dc6ac63c532feea3e8cdbf05eab", "name": "Socket Window WordCount", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: Socket Stream -> Flat Map", "optimizer_properties": {} }, { "id": "90bea66de1c231edf33913ecd54406c1", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out", "inputs": [ { "num": 0, "id": "cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy": "HASH", "exchange": "pipelined" } ], "optimizer_properties": {} } ], "operators": [ { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "7df19f87deec5680128845fd9a6ca18d", "name": "Flat Map", "inputs": [ { "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "partitioner": "FORWARD", "type_number": 0 }, ... ] } ] } was (Author: lining): [~yanghua] we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this vinoyang we need add this information in which api, create new or use old api like jobs:/jobid/plan? 
I prefer in old api, result like this { "jid": "ccee3dc6ac63c532feea3e8cdbf05eab", "name": "Socket Window WordCount", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: Socket Stream -> Flat Map", "optimizer_properties": {} }, { "id": "90bea66de1c231edf33913ecd54406c1", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out", "inputs": [ { "num": 0, "id": "cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy": "HASH", "exchange": "pipelined" } ], "optimizer_properties": {} } ], "operators": [ { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "7df19f87deec5680128845fd9a6ca18d", "name": "Flat Map", "inputs": [ { "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "partitioner": "FORWARD", "type_number": 0 } ] }, { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "name": "Source: Socket Stream", "inputs": [], "metric_name": "Source__Socket_Stream" }, { "vertex_id": "90bea66de1c231edf33913ecd54406c1", "operator_id": "17fbfcaabad45985bbdf4da0490487e3", "name": "Sink: Print to Std. 
Out", "inputs": [ { "operator_id": "90bea66de1c231edf33913ecd54406c1", "partitioner": "FORWARD", "type_number": 0 } ] }, { "vertex_id": "90bea66de1c231edf33913ecd54406c1", "operator_id": "90bea66de1c231edf33913ecd54406c1", "name": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)", "inputs": [ { "operator_id": "7df19f87deec5680128845fd9a6ca18d", "partitioner": "HASH", "type_number": 0 } ] } ] } > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723623#comment-16723623 ] vinoyang commented on FLINK-11162: -- My initial thought was to add it to jobs:/jobid/plan, but this may cause incompatibilities with existing interfaces. Since subsequent versions of the REST API will carry a version number, though, maybe compatibility will no longer be a hindrance? With a separate interface we would not need to worry about compatibility at all, but I am not sure it is necessary. Maybe you can get the opinions of [~till.rohrmann] and [~Zentol] before you start. > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
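The payloads in this thread suggest extending the existing jobs:/jobid/plan response with an `operators` array keyed by `vertex_id`. As a rough illustration of how a client could consume such a response to list the logical operators per chained vertex, here is a minimal Python sketch; the field names follow the sample payload above, but the endpoint shape and the short ids (`v1`, `op1`, ...) are hypothetical placeholders, since this API is only a proposal:

```python
import json
from collections import defaultdict

# Response shaped like the proposed extension of /jobs/:jobid/plan.
# Field names come from the sample payload in the discussion; the ids
# ("v1", "op1", ...) are shortened placeholders, not real Flink ids.
payload = """
{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "operators": [
    {"vertex_id": "v1", "operator_id": "op1", "name": "Source: Socket Stream", "inputs": []},
    {"vertex_id": "v1", "operator_id": "op2", "name": "Flat Map",
     "inputs": [{"operator_id": "op1", "partitioner": "FORWARD", "type_number": 0}]},
    {"vertex_id": "v2", "operator_id": "op3", "name": "Window -> Sink",
     "inputs": [{"operator_id": "op2", "partitioner": "HASH", "type_number": 0}]}
  ]
}
"""

plan = json.loads(payload)

# Group the logical operators by the chained vertex they execute in,
# recovering the per-vertex operator list the current plan API lacks.
operators_by_vertex = defaultdict(list)
for op in plan["operators"]:
    operators_by_vertex[op["vertex_id"]].append(op["name"])
```

Whether this lands in the existing plan endpoint or a separate one, grouping by `vertex_id` is what lets a client recover the operator-to-chained-node mapping that the current REST API does not expose.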
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723617#comment-16723617 ] lining edited comment on FLINK-11162 at 12/18/18 3:19 AM: -- [~yanghua] we need add this information in which api, create new or use old api like jobs:/jobid/plan? I prefer in old api, result like this { "jid": "ccee3dc6ac63c532feea3e8cdbf05eab", "name": "Socket Window WordCount", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: Socket Stream -> Flat Map", "optimizer_properties": {} }, { "id": "90bea66de1c231edf33913ecd54406c1", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out", "inputs": [ { "num": 0, "id": "cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy": "HASH", "exchange": "pipelined" } ], "optimizer_properties": {} } ], "operators": [ { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "7df19f87deec5680128845fd9a6ca18d", "name": "Flat Map", "inputs": [ { "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "partitioner": "FORWARD", "type_number": 0 } ] }, { "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2", "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2", "name": "Source: Socket Stream", "inputs": [], "metric_name": "Source__Socket_Stream" }, { "vertex_id": "90bea66de1c231edf33913ecd54406c1", "operator_id": "17fbfcaabad45985bbdf4da0490487e3", "name": "Sink: Print to Std. 
Out", "inputs": [ { "operator_id": "90bea66de1c231edf33913ecd54406c1", "partitioner": "FORWARD", "type_number": 0 } ] }, { "vertex_id": "90bea66de1c231edf33913ecd54406c1", "operator_id": "90bea66de1c231edf33913ecd54406c1", "name": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)", "inputs": [ { "operator_id": "7df19f87deec5680128845fd9a6ca18d", "partitioner": "HASH", "type_number": 0 } ] } ] } was (Author: lining): [~yanghua] we need add this information in which api, create new or use old api like jobs:/jobid/plan? > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction URL: https://github.com/apache/flink/pull/7237#discussion_r242392587 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ## @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils { ) } +def decorateDataViewTypeInfo( +fieldTypeInfo: TypeInformation[_], +fieldInstance: AnyRef, +field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match { + case ct: CompositeType[_] if includesDataView(ct) => +throw new TableException( + "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " + +"and Case Class type.") + case map: MapViewTypeInfo[_, _] => +val mapView = fieldInstance.asInstanceOf[MapView[_, _]] +val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null && + mapView.valueTypeInfo != null) { + new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo) +} else { + map +} + +if (isStateBackedDataViews) { + newTypeInfo.nullSerializer = true + + // create map view specs with unique id (used as state name) + val fieldName = field.getName + var spec = MapViewSpec( +"agg" + index + "$" + fieldName, +field, +newTypeInfo) + + (newTypeInfo, Some(spec)) +} else { + (newTypeInfo, None) +} + + case list: ListViewTypeInfo[_] => Review comment: Maybe add a ListView test case? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction URL: https://github.com/apache/flink/pull/7237#discussion_r242392503 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ## @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils { ) } +def decorateDataViewTypeInfo( +fieldTypeInfo: TypeInformation[_], +fieldInstance: AnyRef, +field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match { + case ct: CompositeType[_] if includesDataView(ct) => +throw new TableException( + "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " + +"and Case Class type.") + case map: MapViewTypeInfo[_, _] => +val mapView = fieldInstance.asInstanceOf[MapView[_, _]] +val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null && + mapView.valueTypeInfo != null) { + new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo) +} else { + map +} + +if (isStateBackedDataViews) { Review comment: I was wondering if this is ever needed, since the test case added in this PR doesn't cover the else branch of this code path. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction URL: https://github.com/apache/flink/pull/7237#discussion_r242393559 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton +import org.apache.flink.core.memory.{DataInputView, DataOutputView} + +class NullSerializer extends TypeSerializerSingleton[Any] { Review comment: I am assuming `NullSerializer` is needed because the actual map is handled by the StateBackend and because we removed those fields through `removeStateViewFieldsFromAccTypeInfo`. I was wondering why it is needed, since it was not introduced in the previous approach; neither is it reflected in the test case (it still passes without the NullSerializer setting). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723617#comment-16723617 ] lining commented on FLINK-11162: [~yanghua] in which API should we add this information: should we create a new one, or extend an existing API like jobs:/jobid/plan? > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11184) Rework TableSource and TableSink interfaces
[ https://issues.apache.org/jira/browse/FLINK-11184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723603#comment-16723603 ] Wenlong Lyu commented on FLINK-11184: - Hi Timo, I think there is one more issue that may need to be considered: adding an explicit way to define the conversion from a user-defined type to a structured type in a table, which would make the conversion from a DataStream to a Dynamic Table clearer. > Rework TableSource and TableSink interfaces > --- > > Key: FLINK-11184 > URL: https://issues.apache.org/jira/browse/FLINK-11184 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > There are a couple of shortcomings with the current {{TableSource}} and > {{TableSink}} interface design. Some of the issues are covered in a [basic > design > document|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf] > that was published a while ago. > The design document has not been updated for some time and partially overlaps > with the [current SQL DDL > discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-SQL-DDL-Design-td25006.html] > for the {{CREATE TABLE}} statement on the ML. > What needs to be solved: > - How to unify sources and sinks in regard to schema and time attributes? > - How to define watermarks, timestamp extractors or timestamp ingestion? > - How to define primary keys and partitioning keys? > - How to differentiate between update modes for tables (i.e. how to read from > an append, retraction, or update table)? > - How to express all of the above without pulling in too many dependencies on > other Flink modules if source and sink interfaces are located in > {{flink-table-spi}} package? 
> As of the current state of the discussion, it seems that we might extend > {{TableSchema}} to allow for returning the information above and remove > current interfaces such as {{DefinedRowtimeAttribute}} or > {{DefinedFieldMapping}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] XiaoZYang closed pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector
XiaoZYang closed pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector URL: https://github.com/apache/flink/pull/5845 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml new file mode 100644 index 000..a0ac2c1facf --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -0,0 +1,121 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.6-SNAPSHOT + .. + + + flink-connector-pulsar_${scala.binary.version} + flink-connector-pulsar + + 1.20.0-incubating + + + jar + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + org.apache.flink + flink-table_${scala.binary.version} + ${project.version} + provided + + true + + + + org.apache.pulsar + pulsar-client + shaded + ${pulsar.version} + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-tests_${scala.binary.version} + ${project.version} + test + test-jar + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${project.version} + test + + + + org.javassist + javassist + 3.20.0-GA + test + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java new file mode 100644 index 000..64b1397c217 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.com
[GitHub] XiaoZYang commented on issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector
XiaoZYang commented on issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector URL: https://github.com/apache/flink/pull/5845#issuecomment-448070113 @sijie @tzulitai Thank you for your work and suggestions. I'm going to close this PR; any other advice? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11172) Remove the max retention time in StreamQueryConfig
[ https://issues.apache.org/jira/browse/FLINK-11172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723586#comment-16723586 ] Hequn Cheng commented on FLINK-11172: - Hi all, thanks for your discussion and suggestions. [~fhueske] I found that bounded OVER windows will clean up state if a retention time has been configured. Should we remove the retention logic for them? I created another JIRA, FLINK-11188, to address the problem of bounded OVER. > Remove the max retention time in StreamQueryConfig > -- > > Key: FLINK-11172 > URL: https://issues.apache.org/jira/browse/FLINK-11172 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.8.0 >Reporter: Yangze Guo >Assignee: Yangze Guo >Priority: Major > > [Stream Query > Config|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html] > is an important and useful feature for making a tradeoff between accuracy and > resource consumption when a query executes on unbounded streaming data. > This feature was first proposed in > [FLINK-6491|https://issues.apache.org/jira/browse/FLINK-6491]. > Initially, *QueryConfig* took two parameters, i.e. > minIdleStateRetentionTime and maxIdleStateRetentionTime, to avoid registering > many timers by allowing more freedom in when to discard state. However, this > approach may cause newer state to expire earlier than older state, and thus > greater accuracy loss in some cases. For example, take an unbounded keyed > stream. We process key *_a_* at _*t0*_ and _*b*_ at _*t1*_, *_t0 < > t1_*. The state of *_a_* will expire at _*t0+maxIdleStateRetentionTime*_ while that of _*b*_ > expires at *_t1+maxIdleStateRetentionTime_*. Now another record with key *_a_* > arrives at _*t2 (t1 < t2)*_, but _*t2+minIdleStateRetentionTime*_ < > _*t0+maxIdleStateRetentionTime*_. The state of key *_a_* will still expire > at _*t0+maxIdleStateRetentionTime*_, which is earlier than the state of key > _*b*_. 
According to the guideline of > [LRU|https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)], > an element that has been most heavily used in the past few instructions is > most likely to be used heavily in the next few instructions too. The state > with key _*a*_ should therefore live longer than the state with key _*b*_. The current > approach goes against this idea. > I think we now have a good chance to remove the maxIdleStateRetentionTime > argument in *StreamQueryConfig.* Below are my reasons. > * [FLINK-9423|https://issues.apache.org/jira/browse/FLINK-9423] implemented > efficient deletes for the heap-based timer service. We can leverage the deletion > op to mitigate the abuse of timer registration. > * The current approach can cause newer state to expire earlier than older state and thus > greater accuracy loss in some cases. Users need to fine-tune these > two parameters to avoid this scenario. Directly following the idea of LRU > looks like a better solution. > So, I plan to remove maxIdleStateRetentionTime and update the expiration time > based only on _*minIdleStateRetentionTime*_. > cc to [~sunjincheng121], [~fhueske] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
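The accuracy issue described in FLINK-11172 is easy to see with concrete numbers. The sketch below models a simplified version of the min/max retention rule (a cleanup timer is set to accessTime + max, and on a later access it is only re-registered when the existing timer would fire before accessTime + min); this is an illustrative model of the behavior described above, not the actual Flink implementation:

```python
MIN_RETENTION = 10  # stands in for minIdleStateRetentionTime
MAX_RETENTION = 20  # stands in for maxIdleStateRetentionTime

def update_cleanup_timer(current_timer, access_time):
    """Simplified min/max rule: register a timer at access_time + max,
    but keep an existing timer unless it would fire before
    access_time + min (this avoids one timer registration per access)."""
    if current_timer is None or current_timer < access_time + MIN_RETENTION:
        return access_time + MAX_RETENTION
    return current_timer

timer_a = update_cleanup_timer(None, 0)  # key a first seen at t0 = 0 -> timer at 20
timer_b = update_cleanup_timer(None, 5)  # key b first seen at t1 = 5 -> timer at 25

# Key a is accessed again at t2 = 8: 8 + MIN_RETENTION = 18, which is not
# after the existing timer at 20, so the timer is NOT re-registered.
timer_a = update_cleanup_timer(timer_a, 8)

# Although a was touched more recently than b (8 > 5), a's state still
# expires first (20 < 25): the anti-LRU effect the issue describes.
```

Dropping the max bound and always extending the timer to accessTime + min, as the issue proposes, makes expiry order follow access recency at the cost of more timer updates (mitigated by FLINK-9423's efficient timer deletes).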
[jira] [Created] (FLINK-11188) Bounded over should not enable state retention time
Hequn Cheng created FLINK-11188: --- Summary: Bounded over should not enable state retention time Key: FLINK-11188 URL: https://issues.apache.org/jira/browse/FLINK-11188 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Hequn Cheng Assignee: Hequn Cheng As discussed in FLINK-11172, time-based operations (GROUP BY windows, OVER windows, time-windowed join, etc.) are inherently bound by time and automatically clean up their state. We should not add state cleanup or TTL for these operators. If I understand correctly, we should not add the retention logic for rows-bounded operations either. I think we should disable state retention logic for: - ProcTimeBoundedRangeOver - ProcTimeBoundedRowsOver - RowTimeBoundedRangeOver - RowTimeBoundedRowsOver Any suggestions are appreciated! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11162: Assignee: lining (was: vinoyang) > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723560#comment-16723560 ] vinoyang commented on FLINK-11162: -- [~lining] Done. > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723559#comment-16723559 ] lining edited comment on FLINK-11162 at 12/18/18 1:30 AM: -- [~yanghua] ok, can you assign this to me. was (Author: lining): [~yanghua] ok, can you assign this to me. > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723559#comment-16723559 ] lining edited comment on FLINK-11162 at 12/18/18 1:29 AM: -- [~yanghua] ok, can you assign this to me. was (Author: lining): [~yanghua] ok > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723559#comment-16723559 ] lining commented on FLINK-11162: [~yanghua] ok > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11129) dashboard for job which contains important information
[ https://issues.apache.org/jira/browse/FLINK-11129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723557#comment-16723557 ] lining commented on FLINK-11129: [~till.rohrmann] [~Zentol] Currently, users need to click through many pages; can we add a dashboard for the job? > dashboard for job which contains important information > > > Key: FLINK-11129 > URL: https://issues.apache.org/jira/browse/FLINK-11129 > Project: Flink > Issue Type: Improvement > Components: REST, Webfrontend >Reporter: lining >Assignee: lining >Priority: Major > > Currently, diagnosing a job problem requires clicking through many pages. Can we add one dashboard for > a job which shows failovers, subtasks whose input queue usage is 100%, and TaskManagers > which have GC problems? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11129) dashboard for job which contains important information
[ https://issues.apache.org/jira/browse/FLINK-11129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lining reassigned FLINK-11129: -- Assignee: lining > dashboard for job which contains important information > > > Key: FLINK-11129 > URL: https://issues.apache.org/jira/browse/FLINK-11129 > Project: Flink > Issue Type: Improvement > Components: REST, Webfrontend >Reporter: lining >Assignee: lining >Priority: Major > > Currently, diagnosing a job problem requires clicking through many pages. Can we add one dashboard for > a job which shows failovers, subtasks whose input queue usage is 100%, and TaskManagers > which have GC problems? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723553#comment-16723553 ] vinoyang commented on FLINK-11162: -- [~lining] Sounds good. I hope to see this PR as soon as possible. > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723551#comment-16723551 ] lining edited comment on FLINK-11162 at 12/18/18 1:20 AM: -- [~yanghua] we have done this. After the web refactor, we will push it. was (Author: lining): [~yanghua] we have done this. After the web refactor, we will push it. > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11103) Set a default uncaught exception handler
[ https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723547#comment-16723547 ] vinoyang commented on FLINK-11103: -- Oh, I see you mentioned FLINK-5232, and it mainly adds exception catching to the thread pool of the actor system's dispatcher. So I thought this was the same issue. However, I think your consideration is reasonable. In addition to the actor system, the TM still uses threads in many places. > Set a default uncaught exception handler > > > Key: FLINK-11103 > URL: https://issues.apache.org/jira/browse/FLINK-11103 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.8.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Major > > We should set a default uncaught exception handler in Flink via > {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the > exceptions. Ideally, we would even fail the job (could make this > configurable), but users may have some ill-behaving threads, e.g. through > libraries, which they would want to tolerate, and we don't want to change > behaviour now. > (FLINK-5232 added this for the JobManager; we need it for the TaskManager)
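What the ticket proposes is easy to sketch. The following is a minimal, hypothetical illustration (plain JDK code, not Flink's actual handler): a process-wide fallback installed via `Thread.setDefaultUncaughtExceptionHandler()` that records exceptions escaping any thread which has no handler of its own.

```java
public class UncaughtHandlerDemo {
    // Captures the last uncaught error; a real handler would log it
    // (and optionally fail the process, as discussed in the ticket).
    static volatile String lastError;

    // Installs the process-wide fallback handler and provokes an uncaught
    // exception on a worker thread to show the handler firing.
    static String captureUncaught() {
        Thread.setDefaultUncaughtExceptionHandler((thread, error) ->
            lastError = thread.getName() + ": " + error.getMessage());

        Thread worker = new Thread(() -> {
            throw new RuntimeException("boom");
        }, "worker-1");
        worker.start();
        try {
            // The handler runs on the dying thread before it terminates,
            // so after join() the error has been captured.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return lastError;
    }

    public static void main(String[] args) {
        System.out.println(captureUncaught()); // worker-1: boom
    }
}
```

Per-thread handlers (and per-`ThreadGroup` overrides) still take precedence; the default handler only catches what nothing else does, which is why it is a safe place to add logging without changing existing behaviour.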
[jira] [Commented] (FLINK-11146) Get rid of legacy codes from ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-11146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723520#comment-16723520 ] TisonKun commented on FLINK-11146: -- Thanks for your explanation [~mxm]. Given that JobClient is just a static helper class, I can safely peel those codes off as legacy logic. IIRC [~till.rohrmann] introduced an interface {{NewClusterClient}} and many code paths changed. Do we still follow the designed structure at https://cwiki.apache.org/confluence/display/FLINK/Refactoring+of+client+classes+for+cluster+management ? If so, then the main job is to complete {{RestClusterClient}} (to use a proper HTTP client) and to clean up legacy codes. > Get rid of legacy codes from ClusterClient > -- > > Key: FLINK-11146 > URL: https://issues.apache.org/jira/browse/FLINK-11146 > Project: Flink > Issue Type: Sub-task > Components: Client, Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.8.0 > > > As [~StephanEwen] mentioned on the ML, the client needs a big refactoring / > cleanup. It should use a proper HTTP client library to help with future > authentication mechanisms. > After an investigation I notice that the only valid cluster clients are > {{MiniClusterClient}} and {{RestClusterClient}}. Legacy clients, > {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 > codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. > With this removal we arrive at a clean stage where we can think about how to > implement a proper HTTP client more comfortably. > 1. {{StandaloneClusterClient}} is now depended on by > {{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) > and {{FlinkClient}} (part of flink-storm, which is planned to be removed in > FLINK-10571). Also, relevant tests need to be ported (or directly removed). > 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 > Remove legacy flink-yarn component. 
> 3. Testing classes inheriting from {{ClusterClient}} need to be ported (or > directly removed). > 4. Get rid of legacy codes inside {{ClusterClient}} itself, such as > {{#run(JobGraph, ClassLoader)}} > Besides, what is {{JobClient}} used for? I cannot find valid usages of it. > (Till mentioned it on the ML > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E) > cc [~mxm] [~till.rohrmann]
[GitHub] TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9
TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9 URL: https://github.com/apache/flink/pull/7302#issuecomment-448048599 @GJL for "It is not obvious why Comparator.comparing does not work.", I did a survey, and it points to calling a static method of an interface (`Comparator#comparing` here) while using an older version of ASM (the mock framework hacks bytecode, of course). The Groovy community has suffered from the same issue. FYI: https://issues.apache.org/jira/browse/GROOVY-8338 https://issues.apache.org/jira/browse/GROOVY-8528 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
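The pattern behind the incompatibility can be shown without any mocking framework. A hedged sketch (plain-Java illustration, not the PR's actual change): `Comparator.comparing` is a static method declared on the `Comparator` interface, a Java 8 feature that pre-Java-8 ASM releases cannot process when rewriting the calling class, while an explicit lambda comparator is behaviorally equivalent and avoids that call site.

```java
import java.util.Arrays;
import java.util.Comparator;

public class ComparatorRewriteDemo {

    static String[] sortByLength(String[] words) {
        // Static method on an interface: this is the kind of call site that
        // bytecode tools built on pre-Java-8 ASM choke on.
        Comparator<String> viaStaticInterfaceMethod =
            Comparator.comparing(String::length);

        // Behaviorally equivalent lambda that sidesteps the static
        // interface-method invocation entirely.
        Comparator<String> viaPlainLambda =
            (a, b) -> Integer.compare(a.length(), b.length());

        String[] sorted = words.clone();
        Arrays.sort(sorted, viaPlainLambda);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
            sortByLength(new String[] {"banana", "fig", "apple"})));
        // [fig, apple, banana]
    }
}
```

Both comparators produce the same ordering, which is why such a rewrite is a safe workaround in test code that must pass through an old bytecode manipulator.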
[GitHub] sunjincheng121 commented on a change in pull request #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor…
sunjincheng121 commented on a change in pull request #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor… URL: https://github.com/apache/flink/pull/7315#discussion_r242356994 ## File path: flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java ## @@ -122,9 +122,11 @@ public void testCancelSortMatchWithHighparallelism() throws Exception { private static final class SimpleMatcher implements JoinFunction, Tuple2, Tuple2> { private static final long serialVersionUID = 1L; + private static final int WAIT_TIME_PER_RECORD = 300; // 0.3 sec. Review comment: Hi, thanks for the review @hequn8128! If we use `DelayingMatcher`, then we do not need this test case anymore. So I think neither adding the wait time nor using `DelayingMatcher` achieves the purpose of the test, because both only exercise the join phase, while we want to test the sort-merge phase. So we should make the sort-merge phase take more time to achieve the purpose of the test. I have a better approach and will update the PR.
[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#issuecomment-448037747 @azagrebin, I've just updated the code; please help review it when you have time.
[jira] [Commented] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues
[ https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723402#comment-16723402 ] Addison Higham commented on FLINK-11187: See [https://lists.apache.org/thread.html/87eb8e16abad09232da5fbc6999c19c4fba0f16d641c565e62096564@%3Cuser.flink.apache.org%3E] for the first discussion of this on the mailing list. > StreamingFileSink with S3 backend transient socket timeout issues > -- > > Key: FLINK-11187 > URL: https://issues.apache.org/jira/browse/FLINK-11187 > Project: Flink > Issue Type: Bug > Components: FileSystem, Streaming Connectors >Affects Versions: 1.7.0, 1.7.1 >Reporter: Addison Higham >Assignee: Addison Higham >Priority: Major > Fix For: 1.7.2 > > > When using the StreamingFileSink with S3A backend, occasionally, errors like > this will occur: > {noformat} > Caused by: > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: > Your socket connection to the server was not read from or written to within > the timeout period. Idle connections will be closed. (Service: Amazon S3; > Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended > Request ID: xxx, S3 Extended Request ID: xxx > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat} > This causes a restart of flink job, which is often able to recover from, but > under heavy load, this can become very frequent. 
> > Turning on debug logs you can find the following relevant stack trace: > {noformat} > 2018-12-17 05:55:46,546 DEBUG > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient - FYI: > failed to reset content inputstream before throwing up > java.io.IOException: Resetting to invalid mark > at java.io.BufferedInputStream.reset(BufferedInputStream.java:448) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) > at > 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306) > at > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) > at > org.apache.flink.fs.shaded.h
[jira] [Created] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues
Addison Higham created FLINK-11187: -- Summary: StreamingFileSink with S3 backend transient socket timeout issues Key: FLINK-11187 URL: https://issues.apache.org/jira/browse/FLINK-11187 Project: Flink Issue Type: Bug Components: FileSystem, Streaming Connectors Affects Versions: 1.7.0, 1.7.1 Reporter: Addison Higham Assignee: Addison Higham Fix For: 1.7.2 When using the StreamingFileSink with S3A backend, occasionally, errors like this will occur: {noformat} Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended Request ID: xxx, S3 Extended Request ID: xxx at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat} This causes a restart of flink job, which is often able to recover from, but under heavy load, this can become very frequent. 
Turning on debug logs you can find the following relevant stack trace: {noformat} 2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient - FYI: failed to reset content inputstream before throwing up java.io.IOException: Resetting to invalid mark at java.io.BufferedInputStream.reset(BufferedInputStream.java:448) at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106) at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112) at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168) at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325) at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471) at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(Ha
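The `java.io.IOException: Resetting to invalid mark` in the debug trace above is plain `BufferedInputStream` behavior: `mark(readlimit)` only guarantees the mark survives for `readlimit` bytes, so once the SDK has streamed more of the request body than the buffered read limit, the retry path's `reset()` cannot rewind and the original timeout surfaces as a hard failure. A minimal JDK-only reproduction of that mechanism (an illustration with made-up sizes, not the SDK's actual buffering parameters):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class InvalidMarkDemo {

    static String readPastMarkLimitThenReset() {
        byte[] payload = new byte[16];
        // Small 4-byte internal buffer so the mark is dropped quickly.
        BufferedInputStream in =
            new BufferedInputStream(new ByteArrayInputStream(payload), 4);
        try {
            in.mark(4);           // mark is only valid for the next 4 bytes
            in.read(new byte[8]); // reading past the limit invalidates it
            in.reset();           // a retry would rewind the body here...
            return "reset succeeded";
        } catch (IOException e) {
            return e.getMessage(); // ...but gets "Resetting to invalid mark"
        }
    }

    public static void main(String[] args) {
        System.out.println(readPastMarkLimitThenReset());
    }
}
```

This is why the error only shows up under load: the retry is harmless as long as the amount already consumed stays within the mark's read limit.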
[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r242281357 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + protected List getJarFiles() throws ProgramInvocationException { + return jarFiles; } /** * Executes the remote job. * -* @param streamGraph -*Stream Graph to execute +* @param streamExecutionEnvironment +*Execution Environment with Stream Graph to execute * @param jarFiles *List of jar file URLs to ship to the cluster * @return The result of the job execution, containing elapsed time and accumulators. */ - protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List jarFiles) throws ProgramInvocationException { + public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment, Review comment: Yes. We want the ability to execute a job remotely regardless of the type of environment. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend
[ https://issues.apache.org/jira/browse/FLINK-11171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723279#comment-16723279 ] Sayat Satybaldiyev edited comment on FLINK-11171 at 12/17/18 7:11 PM: -- [~yunta] agree, the exception in job manager doesn't correlate with the serialization exception. I assume that due to checkpoint error, JM eventually deletes the blocks from HDFS and hence we got an error. Data nodes have enough disk space. was (Author: sayatez): [~yunta] agrees, the exception in job manager doesn't correlate with the serialization exception. I assume that due to checkpoint error, JM eventually deletes the blocks from HDFS and hence we got an error. Data nodes have enough disk space. > Unexpected timestamp deserialization failure in RocksDB state backend > - > > Key: FLINK-11171 > URL: https://issues.apache.org/jira/browse/FLINK-11171 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 >Reporter: Sayat Satybaldiyev >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > We have a job that joins two data streams via a Process function, using > ValueState TTL with the RocksDB backend. The job constantly fails to checkpoint > due to a timestamp serialization error. > TTL state config > {code:java} > StateTtlConfig ttlConfig = StateTtlConfig > .newBuilder(Time.hours(recommendationRetentionHr)) > .neverReturnExpired() > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > .cleanupFullSnapshot() > .build(); > {code} > > Error > > {code:java} > 2018-12-16 06:02:12,609 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,609 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 > of job 7825029dc256542aa312c0b68ecf0631 expired before completing. 
> 2018-12-16 06:22:12,637 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,899 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline > checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job > 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,900 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631. > java.lang.Exception: Could not materialize checkpoint 32 for operator > joined-stream (1/6). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp > deserialization failure > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > ... 
5 more > Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp > deserialization failure > at > org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94) > at > org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79) > at > org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70) > at > org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48) > at > org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.bu
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r242276315 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + protected List<URL> getJarFiles() throws ProgramInvocationException { + return jarFiles; } /** * Executes the remote job. * -* @param streamGraph -*Stream Graph to execute +* @param streamExecutionEnvironment +*Execution Environment with Stream Graph to execute * @param jarFiles *List of jar file URLs to ship to the cluster * @return The result of the job execution, containing elapsed time and accumulators. */ - protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException { + public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment, Review comment: Do we still need this new `static` method now that we have the constructor? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] walterddr commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation
walterddr commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation URL: https://github.com/apache/flink/pull/7258#issuecomment-447930223 Thanks for the PR @Clark . Based on the JIRA discussion, should we also put a `@Deprecated` tag on the `.split` method as well? I fully agree that consecutive splits are buggy without proper safeguarding, and that the same thing can be done much more elegantly using side outputs. Are there any other concerns that would make us want to keep the `splitTransform` feature? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
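As a plain illustration of the side-output suggestion above, the routing idea can be modeled without any Flink dependency: each record is tagged once and lands in exactly one named output, which is what side outputs give you in place of chained `.split()` calls. The sketch below is a toy model, not the actual `DataStream`/`OutputTag` API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SideOutputSketch {
    /** Routes each element to exactly one named output in a single pass,
     *  the way side outputs replace chained .split() calls. */
    static Map<String, List<Integer>> route(List<Integer> input,
                                            Function<Integer, String> tagOf) {
        Map<String, List<Integer>> outputs = new HashMap<>();
        for (Integer value : input) {
            outputs.computeIfAbsent(tagOf.apply(value), k -> new ArrayList<>())
                   .add(value);
        }
        return outputs;
    }
}
```

Because each record is routed exactly once, there is no second split whose behavior could silently diverge, which is the bug class the PR guards against.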
[GitHub] walterddr commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
walterddr commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#issuecomment-447920610 @dianfu thanks for the explanation. I can see that #7253 has been merged. Would you please update and rebase the test cases? :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11158) Do not show empty page for unknown jobs
[ https://issues.apache.org/jira/browse/FLINK-11158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723165#comment-16723165 ] Nico Kruber commented on FLINK-11158: - Thanks for the update. I guess, it would be nice to have it visible and probably also not too much to change then. > Do not show empty page for unknown jobs > --- > > Key: FLINK-11158 > URL: https://issues.apache.org/jira/browse/FLINK-11158 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nico Kruber >Assignee: leesf >Priority: Major > > If you try to access the Web UI using an old/non-existing job ID, e.g. after > a cluster restart, you are currently presented with a white page with no > further info. It should at least contain "job not found" message to indicate > that there was no other error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r241639512 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.runtime.concurrent.Executors.directExecutorService; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +class RocksDbStateDataTransfer { + + static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws InterruptedException, IOException { + + final ExecutorService executorService = createExecutorService(restoringThreadNum); + + try { + List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size()); + for (Runnable runnable : runnables) { + futures.add(CompletableFuture.runAsync(runnable, executorService)); + } + + FutureUtils.waitForAll(futures).get(); + } catch (ExecutionException e) { + final Throwable throwable = ExceptionUtils.stripExecutionException(e); + if (throwable instanceof IOException) { + throw (IOException) throwable; Review comment: I followed [AbstractTaskManagerFileHandler](https://github.com/apache/flink/blob/951ae9a813cbf8a2db95519853f940277e4ea088/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java#L127) and [JobClient](https://github.com/apache/flink/blob/951ae9a813cbf8a2db95519853f940277e4ea088/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java#L445) to throw `IOException` after `ExceptionUtils.stripExecutionException(e)` here; please correct me if the behavior has to change. This is an automated message from the
[GitHub] klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r241949427 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.runtime.concurrent.Executors.directExecutorService; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +class RocksDbStateDataTransfer { + + static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws InterruptedException, IOException { + + final ExecutorService executorService = createExecutorService(restoringThreadNum); + + try { + List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size()); + for (Runnable runnable : runnables) { + futures.add(CompletableFuture.runAsync(runnable, executorService)); + } + + FutureUtils.waitForAll(futures).get(); + } catch (ExecutionException e) { + final Throwable throwable = ExceptionUtils.stripExecutionException(e); + if (throwable instanceof IOException) { + throw (IOException) throwable; Review comment: Thank you for the explanation @azagrebin. I found that `throw ExceptionUtils.stripExecutionException(e)` will throw `Throwable`, so we would have to add `throws Throwable` to the method signature. How about using the code below? ``` try { ... } catch (ExecutionException e) { final Throwable throwable = ExceptionUtils.stripExecutionException(e); ExceptionUtils.rethrow(throwable); } finally { executorService.shutdownNow(); } ``` Because I found the `ExecutionException` will be thrown when executing the runnables, and the runnables were wrapped by `ThrowingRunnable.unchecked`; `ThrowingRunnable.unchecked` will
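The unwrap-and-rethrow pattern under discussion can be sketched with only the JDK; `stripAndRethrow` below is a hypothetical stand-in for Flink's `ExceptionUtils.stripExecutionException` plus `ExceptionUtils.rethrow`, not the real utility:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class RethrowSketch {
    /** Unwraps nested ExecutionExceptions and rethrows the root cause,
     *  keeping IOException checked and wrapping anything else. */
    static void stripAndRethrow(ExecutionException e) throws IOException {
        Throwable cause = e;
        while (cause instanceof ExecutionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        if (cause instanceof IOException) {
            throw (IOException) cause;       // preserve the checked type
        }
        if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;  // rethrow unchecked as-is
        }
        throw new RuntimeException(cause);   // last resort: wrap
    }

    /** Fails a future with the given error and reports what the caller
     *  would actually see after unwrapping. */
    static String classify(Throwable failure) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(failure);
        try {
            future.get();
            return "none";
        } catch (InterruptedException ie) {
            return "interrupted";
        } catch (ExecutionException ee) {
            try {
                stripAndRethrow(ee);
                return "none";
            } catch (IOException io) {
                return "io:" + io.getMessage();
            } catch (RuntimeException re) {
                return "runtime:" + re.getMessage();
            }
        }
    }
}
```

The point of the discussion above is exactly this trade-off: unwrapping surfaces the real cause, but rethrowing a raw `Throwable` would force `throws Throwable` into the signature, which is why a rethrow helper that smuggles the checked type is preferred.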
[jira] [Comment Edited] (FLINK-11103) Set a default uncaught exception handler
[ https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723157#comment-16723157 ] Nico Kruber edited comment on FLINK-11103 at 12/17/18 4:54 PM: --- Maybe I did not look into FLINK-5232 thoroughly enough: I wasn't referring to any akka actor system, but instead any thread in Flink including those which can be spawned by users. Do you think, your commit also covers this? was (Author: nicok): Maybe I did not look into FLINK-5232 thoroughly enough: I wasn't referring to any akka actor system, but instead any thread in Flink including those which can be spawned by users. > Set a default uncaught exception handler > > > Key: FLINK-11103 > URL: https://issues.apache.org/jira/browse/FLINK-11103 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.8.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Major > > We should set a default uncaught exception handler in Flink via > {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the > exceptions. Ideally, we would even fail the job (could make this > configurable) but users may have some ill-behaving threads, e.g. through > libraries, which they would want to tolerate and we don't want to change > behaviour now. > (FLINK-5232 added this for the JobManager, we need it for the TaskManager) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11103) Set a default uncaught exception handler
[ https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723157#comment-16723157 ] Nico Kruber commented on FLINK-11103: - Maybe I did not look into FLINK-5232 thoroughly enough: I wasn't referring to any akka actor system, but instead any thread in Flink including those which can be spawned by users. > Set a default uncaught exception handler > > > Key: FLINK-11103 > URL: https://issues.apache.org/jira/browse/FLINK-11103 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.8.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Major > > We should set a default uncaught exception handler in Flink via > {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the > exceptions. Ideally, we would even fail the job (could make this > configurable) but users may have some ill-behaving threads, e.g. through > libraries, which they would want to tolerate and we don't want to change > behaviour now. > (FLINK-5232 added this for the JobManager, we need it for the TaskManager) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
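The ticket's proposal boils down to installing a JVM-wide fallback via `Thread.setDefaultUncaughtExceptionHandler()`, which also catches failures in user-spawned threads that never set their own handler. A minimal sketch of that mechanism (the class and field names are illustrative, not Flink code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class DefaultHandlerSketch {
    // Captures the last uncaught throwable; a real handler would log it
    // and optionally fail the process.
    static final AtomicReference<Throwable> lastUncaught = new AtomicReference<>();
    static final CountDownLatch seen = new CountDownLatch(1);

    /** Installs a JVM-wide fallback for threads without their own handler. */
    static void install() {
        Thread.setDefaultUncaughtExceptionHandler((thread, error) -> {
            lastUncaught.set(error);
            seen.countDown();
        });
    }

    /** Spawns a user-style thread that dies with an exception and returns
     *  the message observed by the default handler. */
    static String demo() {
        install();
        Thread t = new Thread(() -> {
            throw new IllegalStateException("boom");
        });
        t.start();
        try {
            t.join();
            seen.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Throwable caught = lastUncaught.get();
        return caught == null ? "none" : caught.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "boom"
    }
}
```

The default handler is only a fallback: a thread-local handler or a `ThreadGroup` handler would take precedence, which matches the ticket's concern about tolerating ill-behaving library threads.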
[jira] [Updated] (FLINK-9680) Reduce heartbeat timeout for E2E tests
[ https://issues.apache.org/jira/browse/FLINK-9680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9680: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Reduce heartbeat timeout for E2E tests > -- > > Key: FLINK-9680 > URL: https://issues.apache.org/jira/browse/FLINK-9680 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0 > > > Several end-to-end tests shoot down job- and taskmanagers and wait for them > to come back up before continuing the testing process. > {{heartbeat.timeout}} controls how long a container has to be unreachable to > be considered lost. The default for this option is 50 seconds, causing > significant idle times during the tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend
[ https://issues.apache.org/jira/browse/FLINK-11171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin reassigned FLINK-11171: --- Assignee: Andrey Zagrebin > Unexpected timestamp deserialization failure in RocksDB state backend > - > > Key: FLINK-11171 > URL: https://issues.apache.org/jira/browse/FLINK-11171 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 >Reporter: Sayat Satybaldiyev >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > We have a job that joins two data stream via Process function and using > ValueState TTL with RocksDB backends. The jobs constantly fail to checkpoint > due to timestamp serialization error. > TTL state config > {code:java} > StateTtlConfig ttlConfig = StateTtlConfig > .newBuilder(Time.hours(recommendationRetentionHr)) > .neverReturnExpired() > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > .cleanupFullSnapshot() > .build(); > {code} > > Error > > {code:java} > 2018-12-16 06:02:12,609 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,609 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 > of job 7825029dc256542aa312c0b68ecf0631 expired before completing. > 2018-12-16 06:22:12,637 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,899 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline > checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job > 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,900 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631. 
> java.lang.Exception: Could not materialize checkpoint 32 for operator > joined-stream (1/6). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp > deserialization failure > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > ... 
5 more > Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp > deserialization failure > at > org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94) > at > org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79) > at > org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70) > at > org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48) > at > org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.buildIteratorHeap(RocksStatesPerKeyGroupMergeIterator.java:128) > at > org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.(RocksStatesPerKeyGroupMergeIterator.java:68) > at > org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:312) > at > org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258) > at > org.apache.flink.contrib.st
[GitHub] azagrebin opened a new pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
azagrebin opened a new pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320 ## What is the purpose of the change StateSnapshotTransformer objects are currently created when the state is initialised and are later called in the asynchronous part of a checkpoint. However, they can be non-thread-safe if accessed from multiple concurrent checkpoint operations. This PR changes the backends to keep the transformer factory after state init and to create the transformer right before iterating the state entries, in the same thread. ## Brief change log Change backends to keep the transformer factory and create a thread-confined transformer. ## Verifying this change Unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
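The thread-confinement idea in this PR description (keep a factory and create the non-thread-safe object inside the thread that uses it) can be sketched generically; the names below are hypothetical, not Flink's actual `StateSnapshotTransformer` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ThreadConfinedSketch {
    /** A stateful, non-thread-safe transformer (e.g. one holding a reusable buffer). */
    interface Transformer { String filterOrTransform(String value); }

    /** The factory: each call produces a fresh instance with its own mutable state. */
    static Transformer newUppercaser() {
        StringBuilder scratch = new StringBuilder(); // per-instance mutable state
        return value -> {
            scratch.setLength(0);
            scratch.append(value.toUpperCase());
            return scratch.toString();
        };
    }

    /** Each snapshot thread asks the factory for its own instance, so the
     *  mutable scratch state is never shared between concurrent checkpoints. */
    static List<String> snapshot(Supplier<Transformer> factory, List<String> entries) {
        Transformer local = factory.get(); // created in the calling thread
        List<String> out = new ArrayList<>();
        for (String e : entries) {
            out.add(local.filterOrTransform(e));
        }
        return out;
    }
}
```

Storing the factory rather than the instance is what removes the race: two overlapping checkpoints each get a private transformer instead of sharing one mutable object.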
[jira] [Updated] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend
[ https://issues.apache.org/jira/browse/FLINK-11171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11171: --- Labels: pull-request-available (was: ) > Unexpected timestamp deserialization failure in RocksDB state backend > - > > Key: FLINK-11171 > URL: https://issues.apache.org/jira/browse/FLINK-11171 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 >Reporter: Sayat Satybaldiyev >Priority: Major > Labels: pull-request-available > > We have a job that joins two data stream via Process function and using > ValueState TTL with RocksDB backends. The jobs constantly fail to checkpoint > due to timestamp serialization error. > TTL state config > {code:java} > StateTtlConfig ttlConfig = StateTtlConfig > .newBuilder(Time.hours(recommendationRetentionHr)) > .neverReturnExpired() > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > .cleanupFullSnapshot() > .build(); > {code} > > Error > > {code:java} > 2018-12-16 06:02:12,609 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,609 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 > of job 7825029dc256542aa312c0b68ecf0631 expired before completing. > 2018-12-16 06:22:12,637 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,899 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline > checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job > 7825029dc256542aa312c0b68ecf0631. > 2018-12-16 06:22:12,900 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631. 
> java.lang.Exception: Could not materialize checkpoint 32 for operator > joined-stream (1/6). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp > deserialization failure > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > ... 
5 more > Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp > deserialization failure > at > org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94) > at > org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79) > at > org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70) > at > org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48) > at > org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.buildIteratorHeap(RocksStatesPerKeyGroupMergeIterator.java:128) > at > org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.(RocksStatesPerKeyGroupMergeIterator.java:68) > at > org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:312) > at > org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258) > at > org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callI
[jira] [Updated] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
[ https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10292: Fix Version/s: (was: 1.6.3) 1.6.4 > Generate JobGraph in StandaloneJobClusterEntrypoint only once > - > > Key: FLINK-10292 > URL: https://issues.apache.org/jira/browse/FLINK-10292 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.8.0 > > > Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} > from the given user code every time it starts/is restarted. This can be > problematic if the the {{JobGraph}} generation has side effects. Therefore, > it would be better to generate the {{JobGraph}} only once and store it in HA > storage instead from where to retrieve. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10251) Handle oversized response messages in AkkaRpcActor
[ https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10251: Fix Version/s: (was: 1.6.3) 1.6.4 > Handle oversized response messages in AkkaRpcActor > -- > > Key: FLINK-10251 > URL: https://issues.apache.org/jira/browse/FLINK-10251 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > The {{AkkaRpcActor}} should check whether an RPC response which is sent to a > remote sender does not exceed the maximum framesize of the underlying > {{ActorSystem}}. If this is the case we should fail fast instead. We can > achieve this by serializing the response and sending the serialized byte > array. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method
[ https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10721: Fix Version/s: (was: 1.6.3) 1.6.4 > kafkaFetcher runFetchLoop throw exception will cause follow-up code not > execute in FlinkKafkaConsumerBase run method > - > > Key: FLINK-10721 > URL: https://issues.apache.org/jira/browse/FLINK-10721 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.6.2 >Reporter: zhaoshijie >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > In the FlinkKafkaConsumerBase run method, on line 721 (master branch), if > kafkaFetcher.runFetchLoop() throws an exception (the discoveryLoopThread throws an exception and its finally block calls the cancel method; cancel calls > kafkaFetcher.cancel, which in the Kafka09Fetcher implementation calls > handover.close, which in turn makes handover.pollNext throw ClosedException), then the > subsequent code will not execute; in particular, discoveryLoopError will not be thrown, so the > real culprit exception is swallowed. > The failing log looks like this: {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
> at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Should we modify it as follows?
> {code:java}
> try {
>     kafkaFetcher.runFetchLoop();
> } catch (Exception e) {
>     // if discoveryLoopErrorRef is not null, we should throw the real culprit exception
>     if (discoveryLoopErrorRef.get() != null) {
>         throw new RuntimeException(discoveryLoopErrorRef.get());
>     } else {
>         throw e;
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
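The proposed fix reads well as a general pattern: record the background thread's error in an AtomicReference and prefer it over the secondary exception thrown by the closed fetch loop. A minimal, self-contained sketch of that pattern (class and method names here are illustrative, not Flink code):

```java
import java.util.concurrent.atomic.AtomicReference;

// Demonstrates why the fetch loop's secondary exception can mask the discovery
// thread's failure, and how the proposed catch block surfaces the real cause.
public class RootCauseDemo {

    // Mirrors the proposed fix: prefer the recorded discovery error over the
    // secondary exception thrown by the closed fetch loop.
    static RuntimeException pickCulprit(AtomicReference<Throwable> discoveryLoopErrorRef,
                                        Exception fetchLoopException) {
        if (discoveryLoopErrorRef.get() != null) {
            return new RuntimeException(discoveryLoopErrorRef.get());
        } else {
            return new RuntimeException(fetchLoopException);
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> discoveryLoopErrorRef = new AtomicReference<>();
        // The discovery thread failed first and recorded its error ...
        discoveryLoopErrorRef.set(new IllegalStateException("partition discovery failed"));
        // ... which then closed the handover, so the fetch loop throws a secondary error.
        Exception closedException = new Exception("Handover closed");

        RuntimeException thrown = pickCulprit(discoveryLoopErrorRef, closedException);
        // The surfaced exception carries the real culprit, not the ClosedException.
        System.out.println(thrown.getCause().getMessage()); // prints "partition discovery failed"
    }
}
```
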
[jira] [Updated] (FLINK-9812) SpanningRecordSerializationTest fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9812: --- Fix Version/s: (was: 1.6.3) 1.6.4 > SpanningRecordSerializationTest fails on travis > --- > > Key: FLINK-9812 > URL: https://issues.apache.org/jira/browse/FLINK-9812 > Project: Flink > Issue Type: Bug > Components: Network, Tests, Type Serialization System >Affects Versions: 1.6.0 >Reporter: Chesnay Schepler >Assignee: Nico Kruber >Priority: Critical > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > https://travis-ci.org/zentol/flink/jobs/402744191 > {code} > testHandleMixedLargeRecords(org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest) > Time elapsed: 6.113 sec <<< ERROR! > java.nio.channels.ClosedChannelException: null > at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) > at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.addNextChunkFromMemorySegment(SpillingAdaptiveSpanningRecordDeserializer.java:529) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.access$200(SpillingAdaptiveSpanningRecordDeserializer.java:431) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.setNextBuffer(SpillingAdaptiveSpanningRecordDeserializer.java:76) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:149) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:115) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testHandleMixedLargeRecords(SpanningRecordSerializationTest.java:104) > 
{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11172) Remove the max retention time in StreamQueryConfig
[ https://issues.apache.org/jira/browse/FLINK-11172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723115#comment-16723115 ] Fabian Hueske commented on FLINK-11172: --- Hi, time-based operations (GROUP BY windows, OVER windows, time-windowed join, etc.) are inherently bound by time and automatically clean up their state. We should not add state cleanup or TTL for these operators. Besides that, I agree to put this simplification on hold until we can use TTL for clean up. > Remove the max retention time in StreamQueryConfig > -- > > Key: FLINK-11172 > URL: https://issues.apache.org/jira/browse/FLINK-11172 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.8.0 >Reporter: Yangze Guo >Assignee: Yangze Guo >Priority: Major > > [Stream Query Config|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html] > is an important and useful feature for making a tradeoff between accuracy and > resource consumption when a query is executed over unbounded streaming data. > This feature was first proposed in [FLINK-6491|https://issues.apache.org/jira/browse/FLINK-6491]. > Originally, *QueryConfig* took two parameters, minIdleStateRetentionTime and > maxIdleStateRetentionTime, to avoid registering many timers by allowing more > freedom in when to discard state. However, this approach may cause new data to > expire earlier than old data, and thus greater accuracy loss in some cases. > For example, consider an unbounded keyed stream. We process key *_a_* at _*t0*_ > and _*b*_ at _*t1*_, with *_t0 < t1_*. The state of *_a_* will expire at > _*t0+maxIdleStateRetentionTime*_ while the state of _*b*_ expires at > *_t1+maxIdleStateRetentionTime_*. Now, another record with key *_a_* arrives at > _*t2*_ (_*t1 < t2*_), but _*t2+minIdleStateRetentionTime*_ < > _*t0+maxIdleStateRetentionTime*_.
The state of key *_a_* will therefore still expire at > _*t0+maxIdleStateRetentionTime*_, which is earlier than the state of key > _*b*_. According to the guideline of > [LRU|https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)], > the elements most heavily used in the past few instructions are the most > likely to be used heavily in the next few instructions too. The state with key > _*a*_ should therefore live longer than the state with key _*b*_; the current > approach goes against this idea. > I think we now have a good chance to remove the maxIdleStateRetentionTime > argument in *StreamQueryConfig.* Below are my reasons. > * [FLINK-9423|https://issues.apache.org/jira/browse/FLINK-9423] implements > efficient deletes for the heap-based timer service. We can leverage the > deletion op to mitigate the abuse of timer registration. > * The current approach can cause new data to expire earlier than old data, and > thus greater accuracy loss in some cases. Users need to fine-tune these two > parameters to avoid this scenario. Directly following the idea of LRU looks > like a better solution. > So, I plan to remove maxIdleStateRetentionTime and update the expiration time > based only on _*minIdleStateRetentionTime*_. > cc to [~sunjincheng121], [~fhueske] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
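The timing argument above can be checked with a small sketch. The timer-update rule shown here is a simplification of what the Table API runtime does (a cleanup timer is only moved when the remaining lifetime drops below minIdleStateRetentionTime, to avoid re-registering timers on every access); the numbers are illustrative:

```java
// Sketch of the timer-update rule used with min/max idle state retention
// (simplified): a new cleanup timer is only registered when the current
// access would otherwise expire too soon.
public class RetentionDemo {
    static final long MIN_RETENTION = 4;
    static final long MAX_RETENTION = 10;

    // Returns the (possibly unchanged) cleanup time after an access at time t.
    static long onAccess(long currentCleanupTime, long t) {
        if (t + MIN_RETENTION > currentCleanupTime) {
            return t + MAX_RETENTION; // move the timer
        }
        return currentCleanupTime;    // keep the old timer to avoid re-registration
    }

    public static void main(String[] args) {
        long cleanupA = onAccess(Long.MIN_VALUE, 0); // key a first seen at t0 = 0 -> expires at 10
        long cleanupB = onAccess(Long.MIN_VALUE, 1); // key b first seen at t1 = 1 -> expires at 11
        cleanupA = onAccess(cleanupA, 2);            // key a seen again at t2 = 2: 2 + 4 <= 10, timer not moved
        // Although a was accessed more recently than b, it still expires first.
        System.out.println("a expires at " + cleanupA + ", b expires at " + cleanupB);
    }
}
```
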
[jira] [Updated] (FLINK-9485) Improving Flink’s timer management for large state
[ https://issues.apache.org/jira/browse/FLINK-9485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9485: --- Fix Version/s: (was: 1.6.4) > Improving Flink’s timer management for large state > -- > > Key: FLINK-9485 > URL: https://issues.apache.org/jira/browse/FLINK-9485 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Streaming >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > See > https://docs.google.com/document/d/1XbhJRbig5c5Ftd77d0mKND1bePyTC26Pz04EvxdA7Jc/edit#heading=h.17v0k3363r6q -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10079) Support external sink table in INSERT INTO statement
[ https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10079: Fix Version/s: (was: 1.6.3) 1.6.4 > Support external sink table in INSERT INTO statement > > > Key: FLINK-10079 > URL: https://issues.apache.org/jira/browse/FLINK-10079 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Major > Fix For: 1.6.4, 1.8.0 > > > In the documentation, there is a description: > {quote}Once registered in a TableEnvironment, all tables defined in a > ExternalCatalog can be accessed from Table API or SQL queries by specifying > their full path, such as catalog.database.table. > {quote} > Currently, this is true only for source tables. For a sink table (specified in > the Table API or SQL), users have to explicitly register it even though it is > defined in a registered ExternalCatalog; otherwise a "No table was registered > under the name XXX" TableException is thrown. > It would be better to keep source tables and sink tables consistent, and users > would enjoy a more convenient approach to inserting into sink tables. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9469) Add tests that cover PatternStream#flatSelect
[ https://issues.apache.org/jira/browse/FLINK-9469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9469: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Add tests that cover PatternStream#flatSelect > - > > Key: FLINK-9469 > URL: https://issues.apache.org/jira/browse/FLINK-9469 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7991) Cleanup kafka example jar filters
[ https://issues.apache.org/jira/browse/FLINK-7991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-7991: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Cleanup kafka example jar filters > - > > Key: FLINK-7991 > URL: https://issues.apache.org/jira/browse/FLINK-7991 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.6.4, 1.8.0 > > > The kafka example jar shading configuration includes the following files: > {code} > org/apache/flink/streaming/examples/kafka/** > org/apache/flink/streaming/** > org/apache/kafka/** > org/apache/curator/** > org/apache/zookeeper/** > org/apache/jute/** > org/I0Itec/** > jline/** > com/yammer/** > kafka/** > {code} > Problems: > * the inclusion of org.apache.flink.streaming causes large parts of the API > (and runtime...) to be included in the jar, along with the _Twitter_ source and > *all* other examples > * the yammer, jline, I0Itec, jute, zookeeper and curator inclusions are > ineffective; none of these classes show up in the example jar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11151) FileUploadHandler stops working if the upload directory is removed
[ https://issues.apache.org/jira/browse/FLINK-11151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11151: Fix Version/s: (was: 1.6.3) 1.6.4 > FileUploadHandler stops working if the upload directory is removed > -- > > Key: FLINK-11151 > URL: https://issues.apache.org/jira/browse/FLINK-11151 > Project: Flink > Issue Type: Bug > Components: Job-Submission, REST >Affects Versions: 1.6.2, 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > A user has reported on the ML that the FileUploadHandler does not accept any > files anymore if the upload directory was deleted after the cluster has been > started. > A cursory glance at the code shows that it currently uses > {{Files.createDirectory(...)}} to create a temporary directory for the > current request to store uploaded files in. > Changing this to use {{Files.createDirectories(...)}} instead should prevent > this from happening again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
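The one-character fix described above is easy to demonstrate in isolation: `Files.createDirectory` fails when a parent of the target is missing, while `Files.createDirectories` recreates the whole chain. The paths below are made up for the demo; this is not the handler's actual code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Reproduces the failure mode: the per-request directory cannot be created
// once the base upload directory has been deleted, unless missing parents
// are recreated as well.
public class UploadDirDemo {
    public static void main(String[] args) throws IOException {
        Path uploadDir = Files.createTempDirectory("flink-web-upload");
        Path requestDir = uploadDir.resolve("request-1");

        // Simulate the upload directory being deleted after cluster start.
        Files.delete(uploadDir);

        try {
            Files.createDirectory(requestDir); // parent is gone -> fails
        } catch (NoSuchFileException e) {
            System.out.println("createDirectory failed: parent missing");
        }

        // createDirectories recreates the missing parent as well.
        Files.createDirectories(requestDir);
        System.out.println("exists: " + Files.exists(requestDir));
    }
}
```
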
[jira] [Updated] (FLINK-11023) Update LICENSE and NOTICE files for flink-connectors
[ https://issues.apache.org/jira/browse/FLINK-11023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11023: Fix Version/s: (was: 1.6.3) 1.6.4 > Update LICENSE and NOTICE files for flink-connectors > > > Key: FLINK-11023 > URL: https://issues.apache.org/jira/browse/FLINK-11023 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2 > > > Similar to FLINK-10987 we should also update the {{LICENSE}} and {{NOTICE}} > files for {{flink-connectors}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9010) NoResourceAvailableException with FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9010: --- Fix Version/s: (was: 1.6.3) 1.6.4 > NoResourceAvailableException with FLIP-6 > - > > Key: FLINK-9010 > URL: https://issues.apache.org/jira/browse/FLINK-9010 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: flip-6 > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) > with FLIP-6 mode and a checkpointing interval of 1000 and got the following > exception: > {code} > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000101 - Remaining pending container > requests: 302 > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000101 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5 path obtained null > 2018-03-16 03:41:20,155 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml > to > hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml > 
2018-03-16 03:41:20,165 INFO org.apache.flink.yarn.YarnResourceManager > - Prepared local resource for modified yaml: resource { scheme: > "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: > "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml" > } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION > 2018-03-16 03:41:20,168 INFO org.apache.flink.yarn.YarnResourceManager > - Creating container launch context for TaskManagers > 2018-03-16 03:41:20,168 INFO org.apache.flink.yarn.YarnResourceManager > - Starting TaskManagers with command: $JAVA_HOME/bin/java > -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m > -Dlog.file=/taskmanager.log > -Dlogback.configurationFile=file:./logback.xml > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> > /taskmanager.out 2> /taskmanager.err > 2018-03-16 03:41:20,176 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000102 - Remaining pending container > requests: 301 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000102 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5 path obtained null > 
2018-03-16 03:41:20,181 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > to > hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink
[jira] [Updated] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers
[ https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9253: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Make buffer count per InputGate always #channels*buffersPerChannel + > ExclusiveBuffers > - > > Key: FLINK-9253 > URL: https://issues.apache.org/jira/browse/FLINK-9253 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > The credit-based flow control path assigns exclusive buffers only to remote > channels (which makes sense since local channels don't use any buffers of > their own). > However, this makes it somewhat opaque how much data may be in buffers, since > that depends on the actual schedule of the job and not on the job graph. > By adapting the floating buffers to use a maximum of > {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, > we would be channel-type agnostic and keep the old behaviour. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
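The proposed formula keeps the per-gate buffer total constant regardless of how the channels split into local and remote. A small sketch with illustrative numbers (4 channels, 2 exclusive buffers per remote channel, 8 floating buffers per gate); this mirrors the formula quoted above, not Flink's actual implementation:

```java
// Sketch of the proposed buffer accounting: with the adapted floating buffer
// count, the per-gate total is the same no matter how many channels happen
// to be local vs. remote.
public class GateBufferDemo {
    // Floating buffers per the proposal:
    // channels * buffersPerChannel + floatingBuffersPerGate - exclusiveBuffers
    static int floatingBuffers(int channels, int buffersPerChannel,
                               int floatingBuffersPerGate, int remoteChannels) {
        int exclusiveBuffers = remoteChannels * buffersPerChannel; // only remote channels get exclusive buffers
        return channels * buffersPerChannel + floatingBuffersPerGate - exclusiveBuffers;
    }

    public static void main(String[] args) {
        int channels = 4, buffersPerChannel = 2, floatingPerGate = 8;
        for (int remote = 0; remote <= channels; remote++) {
            int exclusive = remote * buffersPerChannel;
            int floating = floatingBuffers(channels, buffersPerChannel, floatingPerGate, remote);
            // Total stays channels * buffersPerChannel + floatingPerGate = 16 in every case.
            System.out.println("remote=" + remote + " exclusive=" + exclusive
                    + " floating=" + floating + " total=" + (exclusive + floating));
        }
    }
}
```
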
[jira] [Updated] (FLINK-9228) log details about task failure/task manager shutdown
[ https://issues.apache.org/jira/browse/FLINK-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9228: --- Fix Version/s: (was: 1.6.3) 1.6.4 > log details about task failure/task manager shutdown > - > > Key: FLINK-9228 > URL: https://issues.apache.org/jira/browse/FLINK-9228 > Project: Flink > Issue Type: Improvement > Components: Logging >Affects Versions: 1.4.2 >Reporter: makeyang >Assignee: makeyang >Priority: Minor > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > condition: > flink version: 1.4.2 > jdk version: 1.8.0.20 > linux version: 3.10.0 > problem description: > one of my task managers dropped out of the cluster; I checked its log and > found the following: > 2018-04-19 22:34:47,441 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Process (115/120) > (19d0b0ce1ef3b8023b37bdfda643ef44). > 2018-04-19 22:34:47,441 INFO org.apache.flink.runtime.taskmanager.Task > - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING > to FAILED. > java.lang.Exception: TaskManager is shutting down.
> at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220)
> at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
> at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121)
> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> at akka.actor.ActorCell.terminate(ActorCell.scala:374)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> suggestion:
> # short term suggestions:
> ## log the reasons why the task failed: did it receive some event from the job manager, fail to connect to the job manager, or hit an operator exception? The more clarity the better.
> ## log the reasons why the task manager is shutting down: did it receive some event from the job manager, fail to connect to the job manager, or hit an operator exception it cannot recover from?
> # long term suggestions:
> ## define the state machine of a Flink node clearly. If nothing happens, the node should stay in its current state: if it is processing events and nothing happens, it should still be processing events. In other words, if its state changes from processing events to cancelled, then some event must have happened.
> ## define clearly the events that can cause a node's state to change, such as user cancel, operator exception, heartbeat timeout, etc.
> ## clearly log each state change and the event that caused it
> ## show event details (time, node, event, state change, etc.) in the web UI
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11079) Update LICENSE and NOTICE files for flink-storm-examples
[ https://issues.apache.org/jira/browse/FLINK-11079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11079: Fix Version/s: (was: 1.6.3) 1.6.4 > Update LICENSE and NOTICE files for flink-storm-examples > --- > > Key: FLINK-11079 > URL: https://issues.apache.org/jira/browse/FLINK-11079 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0 > > > Similar to FLINK-10987 we should also update the {{LICENSE}} and {{NOTICE}} > files for {{flink-storm-examples}}. > > This project creates several fat example jars that are deployed to Maven > Central. > Alternatively, we could think about dropping these examples. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
[ https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11107: Fix Version/s: (was: 1.6.3) 1.6.4 > [state] Avoid memory stateBackend to create arbitrary folders under HA path > when no checkpoint path configured > -- > > Key: FLINK-11107 > URL: https://issues.apache.org/jira/browse/FLINK-11107 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.2, 1.7.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2 > > > Currently, the memory state backend creates a folder named with a random UUID > under the HA directory if no checkpoint path was ever configured (the logic > lives in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). > However, the default memory state backend is created not only on the JM side > but also on each task manager's side, which means many folders with random > UUIDs are created under the HA directory. This can result in an exception > like: > {noformat} > The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 > items=1048576{noformat} > If this happens, no new jobs can be submitted until we clean up those > directories manually. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10954) Hardlink from files of previous local stored state might cross devices
[ https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10954: Fix Version/s: (was: 1.6.3) 1.6.4 > Hardlink from files of previous local stored state might cross devices > -- > > Key: FLINK-10954 > URL: https://issues.apache.org/jira/browse/FLINK-10954 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.2 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > Currently, local recovery's base directories are initialized from > '{{io.tmp.dirs}}' if the parameter '{{taskmanager.state.local.root-dirs}}' is > not set. In a Yarn environment, the tmp dirs are replaced by Yarn's > '{{LOCAL_DIRS}}', which might consist of directories from different devices, > such as /dump/1/nm-local-dir and /dump/2/nm-local-dir. The local directory for > RocksDB is initialized from the IOManager's spillingDirectories, which might > be located on a different device than local recovery's folder. However, > hard-linking between different devices is not allowed; it throws the exception > below: > {code:java} > java.nio.file.FileSystemException: target -> source: Invalid cross-device link > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
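A common defensive pattern for this failure mode is to attempt the hard link and fall back to copying when the link fails (for example with "Invalid cross-device link"). The sketch below illustrates the idea only; it is not Flink's actual fix:

```java
import java.io.IOException;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hard-link with copy fallback: Files.createLink fails across devices, so
// fall back to a plain copy when the link cannot be created.
public class HardLinkDemo {
    static void linkOrCopy(Path target, Path source) throws IOException {
        try {
            Files.createLink(target, source); // fails with FileSystemException across devices
        } catch (FileSystemException | UnsupportedOperationException e) {
            // Cross-device link (or unsupported FS): copy the file instead.
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("local-recovery");
        Path source = dir.resolve("state.sst");
        Files.write(source, new byte[]{1, 2, 3});

        // Same device here, so the hard link succeeds; across devices the
        // fallback copy would run instead.
        linkOrCopy(dir.resolve("linked.sst"), source);
        System.out.println("bytes: " + Files.size(dir.resolve("linked.sst")));
    }
}
```
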
[jira] [Updated] (FLINK-11186) Support for event-time balancing for multiple Kafka partitions
[ https://issues.apache.org/jira/browse/FLINK-11186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Schamberger updated FLINK-11186: Summary: Support for event-time balancing for multiple Kafka partitions (was: Support for event-time balancing for multiple Kafka comsumer partitions) > Support for event-time balancing for multiple Kafka partitions > -- > > Key: FLINK-11186 > URL: https://issues.apache.org/jira/browse/FLINK-11186 > Project: Flink > Issue Type: New Feature > Components: DataStream API, Kafka Connector >Reporter: Tom Schamberger >Priority: Minor > > Currently, it is not possible with Flink to back-pressure individual Kafka > partitions, which are faster in terms of event-time. This leads to > unnecessary memory consumption and can lead to deadlocks in the case of > back-pressure. > When multiple Kafka topics are consumed, succeeding event-time window > operators have to wait until the last Kafka partition has produced a > sufficient watermark to be triggered. If individual Kafka partitions differ > in read performance or the event-time of messages within partitions is not > monotonically distributed, this can lead to a situation, where 'fast' > partitions (event-time makes fast progress) outperform slower partitions > until back-pressuring prevents all partitions from being further consumed. > This leads to a deadlock of the application. > I suggest, that windows should be able to back-pressure individual > partitions, which progress faster in terms of event-time, so that slow > partitions can keep up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10435) Client sporadically hangs after Ctrl + C
[ https://issues.apache.org/jira/browse/FLINK-10435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10435: Fix Version/s: (was: 1.6.3) 1.6.4 > Client sporadically hangs after Ctrl + C > > > Key: FLINK-10435 > URL: https://issues.apache.org/jira/browse/FLINK-10435 > Project: Flink > Issue Type: Bug > Components: Client, YARN >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Gary Yao >Priority: Major > Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0 > > > When submitting a YARN job cluster in attached mode, the client hangs > indefinitely if Ctrl + C is pressed at the right time. One can recover from > this by sending SIGKILL. > *Command to submit job* > {code} > HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster > examples/streaming/WordCount.jar > {code} > > *Output/Stacktrace* > {code} > [hadoop@ip-172-31-45-22 flink-1.5.4]$ HADOOP_CLASSPATH=`hadoop classpath` > bin/flink run -m yarn-cluster examples/streaming/WordCount.jar > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/hadoop/flink-1.5.4/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-09-26 12:01:04,241 INFO org.apache.hadoop.yarn.client.RMProxy > - Connecting to ResourceManager at > ip-172-31-45-22.eu-central-1.compute.internal/172.31.45.22:8032 > 2018-09-26 12:01:04,386 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. 
Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-26 12:01:04,386 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-26 12:01:04,402 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-09-26 12:01:04,598 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster > specification: ClusterSpecification{masterMemoryMB=1024, > taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1} > 2018-09-26 12:01:04,972 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The > configuration directory ('/home/hadoop/flink-1.5.4/conf') contains both LOG4J > and Logback configuration files. Please delete or rename one of them. 
> 2018-09-26 12:01:07,857 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting > application master application_1537944258063_0017 > 2018-09-26 12:01:07,913 INFO > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted > application application_1537944258063_0017 > 2018-09-26 12:01:07,913 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for > the cluster to be allocated > 2018-09-26 12:01:07,916 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying > cluster, current state ACCEPTED > ^C2018-09-26 12:01:08,851 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cancelling > deployment from Deployment Failure Hook > 2018-09-26 12:01:08,854 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Killing YARN > application > > The program finished with the following exception: > org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't > deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:410) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:258) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation
[jira] [Updated] (FLINK-9879) Find sane defaults for (advanced) SSL session parameters
[ https://issues.apache.org/jira/browse/FLINK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9879: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Find sane defaults for (advanced) SSL session parameters > > > Key: FLINK-9879 > URL: https://issues.apache.org/jira/browse/FLINK-9879 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.2, 1.6.0 >Reporter: Nico Kruber >Priority: Major > Fix For: 1.6.4 > > > After adding these configuration parameters with > https://issues.apache.org/jira/browse/FLINK-9878: > - SSL session cache size > - SSL session timeout > - SSL handshake timeout > - SSL close notify flush timeout > We should try to find sane defaults that "just work" :) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9749) Rework Bucketing Sink
[ https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9749: --- Fix Version/s: (was: 1.6.3) > Rework Bucketing Sink > - > > Key: FLINK-9749 > URL: https://issues.apache.org/jira/browse/FLINK-9749 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > > The BucketingSink has a series of deficits at the moment. > Due to the long list of issues, I would suggest adding a new > StreamingFileSink with a new and cleaner design > h3. Encoders, Parquet, ORC > - It only efficiently supports row-wise data formats (avro, json, sequence > files). > - Efforts to add (columnar) compression for blocks of data are inefficient, > because blocks cannot span checkpoints due to persistence-on-checkpoint. > - The encoders are part of the {{flink-connector-filesystem}} project, > rather than in orthogonal format projects. This blows up the dependencies of > the {{flink-connector-filesystem}} project. As an example, the > rolling file sink has dependencies on Hadoop and Avro, which messes up > dependency management. > h3. Use of FileSystems > - The BucketingSink works only on Hadoop's FileSystem abstraction, does not > support Flink's own FileSystem abstraction, and cannot work with the packaged > S3, maprfs, and swift file systems > - The sink hence needs Hadoop as a dependency > - The sink relies on "trying out" whether truncation works, which requires > write access to the user's working directory > - The sink relies on enumerating and counting files, rather than maintaining > its own state, making it less efficient > h3. Correctness and Efficiency on S3 > - The BucketingSink relies on strong consistency in the file enumeration, > hence may work incorrectly on S3. > - The BucketingSink relies on persisting streams at intermediate points. 
> This is not working properly on S3, hence there may be data loss on S3. > h3. .valid-length companion file > - The valid length file makes it hard for consumers of the data and should > be dropped > We track this design in a series of sub issues. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
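The ".valid-length" trade-off mentioned above can be shown with a small stdlib-only sketch (file names and the recovery helpers are illustrative, not Flink's actual code): after a crash, a sink on a file system without truncate support must leave the extra bytes in place and publish the committed length in a companion file, while truncate support lets it simply cut the file back.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ValidLengthDemo {
    public static void main(String[] args) throws IOException {
        // A sink wrote two checkpointed records, then one uncommitted record before a crash.
        Path part = Files.createTempFile("part-0-0", ".txt");
        Files.write(part, "committed-1\ncommitted-2\n".getBytes(StandardCharsets.UTF_8));
        long validLength = Files.size(part); // length at the last successful checkpoint
        Files.write(part, "uncommitted\n".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);

        // Recovery option A (no truncate support): write a ".valid-length" companion file.
        // Every downstream consumer must know to read only that many bytes.
        Path companion = part.resolveSibling(part.getFileName() + ".valid-length");
        Files.write(companion, Long.toString(validLength).getBytes(StandardCharsets.UTF_8));

        // Recovery option B (truncate supported): cut the file back; no companion needed,
        // and plain readers see only committed data.
        try (FileChannel ch = FileChannel.open(part, StandardOpenOption.WRITE)) {
            ch.truncate(validLength);
        }
        System.out.print(new String(Files.readAllBytes(part), StandardCharsets.UTF_8));

        Files.deleteIfExists(companion);
        Files.deleteIfExists(part);
    }
}
```

Option A is what makes consumption hard for readers, which is why the ticket proposes dropping the companion file.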
[jira] [Updated] (FLINK-10317) Configure Metaspace size by default
[ https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10317: Fix Version/s: (was: 1.6.3) 1.6.4 > Configure Metaspace size by default > --- > > Key: FLINK-10317 > URL: https://issues.apache.org/jira/browse/FLINK-10317 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0 > > > We should set the size of the JVM Metaspace to a sane default, like > {{-XX:MaxMetaspaceSize=256m}}. > If not set, the JVM offheap memory will grow indefinitely with repeated > classloading and JIT compilation, eventually exceeding the allowed memory on > docker/yarn or similar setups. > It is hard to come up with a good default; however, I believe the error > messages one gets when the metaspace is too small are easy to understand (and > easy to act on), while it is very hard to figure out why the memory > footprint keeps growing steadily and indefinitely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
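For reference, a metaspace cap can already be set manually today by passing the JVM flag through Flink's {{env.java.opts}} configuration key; the 256m value below mirrors the suggestion in this ticket and is an illustration, not a shipped default:

```yaml
# flink-conf.yaml — cap metaspace for the JobManager and TaskManager JVMs.
# A too-small value fails fast with "java.lang.OutOfMemoryError: Metaspace",
# which is far easier to diagnose than an unbounded, slowly growing
# off-heap footprint that gets the container killed by Docker/YARN.
env.java.opts: "-XX:MaxMetaspaceSize=256m"
```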
[jira] [Updated] (FLINK-9776) Interrupt TaskThread only while in User/Operator code
[ https://issues.apache.org/jira/browse/FLINK-9776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9776: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Interrupt TaskThread only while in User/Operator code > - > > Key: FLINK-9776 > URL: https://issues.apache.org/jira/browse/FLINK-9776 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.8.0 > > > Upon cancellation, the task thread is periodically interrupted. > This helps to pull the thread out of blocking operations in the user code. > Once the thread leaves the user code, the repeated interrupts may interfere > with the shutdown cleanup logic, causing confusing exceptions. > We should stop sending the periodic interrupts once the thread leaves the > user code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
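The "interrupt only while in user code" idea from this ticket can be sketched with plain threads (the flag and locking here are illustrative, not Flink's actual cancellation code): the canceller checks a shared flag under a lock and interrupts only while the task is still inside user code, so the cleanup path runs without stray interrupts.

```java
public class InterruptWindowDemo {
    static final Object lock = new Object();
    static boolean inUserCode = true; // illustrative stand-in for Flink's task state

    public static void main(String[] args) throws Exception {
        Thread task = new Thread(() -> {
            try {
                Thread.sleep(60_000); // "user code": may block; interrupts are welcome here
            } catch (InterruptedException e) {
                System.out.println("user code interrupted");
            }
            synchronized (lock) {
                inUserCode = false; // left user code: canceller must stop interrupting
            }
            Thread.interrupted(); // clear any pending interrupt flag before cleanup
            // Shutdown/cleanup logic would produce confusing exceptions if interrupts
            // kept arriving; under this scheme, none do.
            System.out.println("cleanup ran without interrupts: "
                    + !Thread.currentThread().isInterrupted());
        });
        task.start();
        Thread.sleep(50); // let the task enter its blocking call

        // Canceller: interrupt only while the task is still inside user code.
        synchronized (lock) {
            if (inUserCode) {
                task.interrupt();
            }
        }
        task.join();
    }
}
```

The lock makes the flag check and the interrupt atomic with respect to the task leaving user code, which is the essence of what the issue asks for.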
[jira] [Updated] (FLINK-10241) Reduce performance/stability impact of latency metrics
[ https://issues.apache.org/jira/browse/FLINK-10241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10241: Fix Version/s: (was: 1.6.3) > Reduce performance/stability impact of latency metrics > -- > > Key: FLINK-10241 > URL: https://issues.apache.org/jira/browse/FLINK-10241 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.5.6, 1.6.4, 1.8.0 > > > Umbrella issue for performance/stability improvements around the latency > metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9525) Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module
[ https://issues.apache.org/jira/browse/FLINK-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9525: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module > -- > > Key: FLINK-9525 > URL: https://issues.apache.org/jira/browse/FLINK-9525 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.4.0, 1.5.0, 1.6.0 >Reporter: Hai Zhou >Assignee: Hai Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0 > > Attachments: wx20180605-142...@2x.png > > > If a Flink job's dependencies include `hadoop-common` and `hadoop-hdfs`, a > runtime error is thrown, as in this case: > [https://stackoverflow.com/questions/47890596/java-util-serviceconfigurationerror-org-apache-hadoop-fs-filesystem-provider-o]. > The root cause: see {{org.apache.flink.core.fs.FileSystem}}. > This class loads all available file system factories via > {{ServiceLoader.load(FileSystemFactory.class)}}. > Since the {{META-INF/services/org.apache.flink.core.fs.FileSystemFactory}} > file in the classpath does not contain an entry for > `org.apache.flink.runtime.fs.hdfs.HadoopFsFactory`, > in the end only the single available {{LocalFileSystemFactory}} is loaded. > More error messages can be seen in the attached screenshot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
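The failure mode described above — a factory class present on the classpath but missing from the service file — can be reproduced with a self-contained `ServiceLoader` sketch (the `DemoFactory` types are stand-ins for Flink's `FileSystemFactory` and its implementations, not the real classes):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;

public class ServiceFileDemo {
    public interface DemoFactory { String scheme(); }
    public static class LocalFactory implements DemoFactory {
        public String scheme() { return "file"; }
    }
    public static class HadoopFactory implements DemoFactory {
        public String scheme() { return "hdfs"; }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a classpath whose (merged) service file lists only the local
        // factory — the situation in this ticket: the Hadoop entry is missing.
        Path dir = Files.createTempDirectory("cp");
        Path services = dir.resolve("META-INF/services");
        Files.createDirectories(services);
        Files.write(services.resolve(DemoFactory.class.getName()),
                LocalFactory.class.getName().getBytes(StandardCharsets.UTF_8));

        try (URLClassLoader cl = new URLClassLoader(new URL[]{dir.toUri().toURL()})) {
            for (DemoFactory f : ServiceLoader.load(DemoFactory.class, cl)) {
                System.out.println("discovered factory for scheme: " + f.scheme());
            }
            // "hdfs" is never printed: HadoopFactory is on the classpath, but
            // without a line in META-INF/services it is invisible to ServiceLoader.
        }
    }
}
```

This is exactly why a missing or badly merged {{META-INF/services}} entry silently drops a file system factory instead of failing loudly.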
[jira] [Updated] (FLINK-9457) Cancel container requests when cancelling pending slot allocations
[ https://issues.apache.org/jira/browse/FLINK-9457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9457: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Cancel container requests when cancelling pending slot allocations > -- > > Key: FLINK-9457 > URL: https://issues.apache.org/jira/browse/FLINK-9457 > Project: Flink > Issue Type: Improvement > Components: ResourceManager, YARN >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0 > > > When cancelling a pending slot allocation request on the {{ResourceManager}}, > then we should also check whether we still need all the requested containers. > If it turns out that we no longer need them, then we should try to cancel the > unnecessary container requests. That way Flink will be, for example, a better > Yarn citizen which quickly releases resources and resource requests if no > longer needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8899: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Submitting YARN job with FLIP-6 may lead to > ApplicationAttemptNotFoundException > --- > > Key: FLINK-8899 > URL: https://issues.apache.org/jira/browse/FLINK-8899 > Project: Flink > Issue Type: Bug > Components: ResourceManager, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Priority: Major > Labels: flip-6 > Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0 > > > Occasionally, running a simple word count as this > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > leads to an {{ApplicationAttemptNotFoundException}} in the logs: > {code} > 2018-03-08 16:18:08,507 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to > FINISHED. > 2018-03-08 16:18:08,508 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd > 2018-03-08 16:18:08,508 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:18:08,536 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED. > 2018-03-08 16:18:08,611 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(df707a3c9817ddf5936efe56d427e2bd). > 2018-03-08 16:18:08,634 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down.. 
> 2018-03-08 16:18:08,634 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0 for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
> 2018-03-08 16:18:08,664 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
> 2018-03-08 16:18:08,664 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO org.apache.flink.yarn.YarnResourceManager - Replacing old instance of worker for ResourceID container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO org.apache.flink.yarn.YarnResourceManager - The target with resource ID container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Exception on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: Application attempt appattempt_1519984124671_0090_01 doesn't exist in ApplicationMasterService cache.
> at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
> at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
> at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
> at org.apache.hadoop.i
[jira] [Updated] (FLINK-9142) Lower the minimum number of buffers for incoming channels to 1
[ https://issues.apache.org/jira/browse/FLINK-9142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9142: --- Fix Version/s: (was: 1.6.3) 1.6.4 > Lower the minimum number of buffers for incoming channels to 1 > -- > > Key: FLINK-9142 > URL: https://issues.apache.org/jira/browse/FLINK-9142 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: boshu Zheng >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > Even if we make the floating buffers optional, we still require > {{taskmanager.network.memory.buffers-per-channel}} number of (exclusive) > buffers per incoming channel with credit-based flow control while without, > the minimum was 1 and only the maximum number of buffers was influenced by > this parameter. > {{taskmanager.network.memory.buffers-per-channel}} is set to {{2}} by default > with the argumentation that this way we will have one buffer available for > netty to process while a worker thread is processing/deserializing the other > buffer. While this seems reasonable, it does increase our minimum > requirements. Instead, we could probably live with {{1}} exclusive buffer and > up to {{gate.getNumberOfInputChannels() * (networkBuffersPerChannel - 1) + > extraNetworkBuffersPerGate}} floating buffers. That way we will have the same > memory footprint as before with only slightly changed behaviour. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
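The buffer arithmetic in this issue is easy to mis-read in flattened form, so here is a small sketch of the current minimum versus the proposal (the constants mirror the documented defaults of {{taskmanager.network.memory.buffers-per-channel}} = 2 and {{floating-buffers-per-gate}} = 8; the method names are illustrative, not Flink's API):

```java
public class BufferBudget {
    static final int BUFFERS_PER_CHANNEL = 2;   // exclusive buffers per channel today
    static final int EXTRA_BUFFERS_PER_GATE = 8; // floating buffers per gate

    // Credit-based flow control today: every channel must get its exclusive buffers.
    static int requiredExclusive(int channels, int perChannel) {
        return channels * perChannel;
    }

    // Proposal: 1 exclusive buffer per channel, move the remainder into the
    // floating pool so the overall memory footprint stays the same.
    static int proposedFloatingMax(int channels, int perChannel, int extraPerGate) {
        return channels * (perChannel - 1) + extraPerGate;
    }

    public static void main(String[] args) {
        int channels = 100; // e.g. an input gate with 100 channels
        System.out.println("required exclusive today: "
                + requiredExclusive(channels, BUFFERS_PER_CHANNEL));
        System.out.println("proposed minimum exclusive: " + channels); // 1 per channel
        System.out.println("proposed max floating: "
                + proposedFloatingMax(channels, BUFFERS_PER_CHANNEL, EXTRA_BUFFERS_PER_GATE));
    }
}
```

For a 100-channel gate this halves the hard minimum (200 down to 100 exclusive buffers) while the combined maximum stays comparable, which is the "same footprint, slightly changed behaviour" claim in the description.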