[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2018-12-17 Thread Alexander Lehmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723766#comment-16723766
 ] 

Alexander Lehmann commented on FLINK-10429:
---

As [~zhuzh] mentioned, a pluggable scheduling mechanism would be great, e.g. 
in cases where one expects large operator states: those operators should not 
be scheduled randomly (in the worst case they could all end up on a single 
TaskManager), but should instead follow a certain heuristic or user-provided 
scheduling hints.
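
To make this concrete, here is a minimal sketch of what such a pluggable strategy could look like; every type and method name below is a hypothetical illustration, not part of Flink's actual scheduler interfaces:

{code:java}
import java.util.Collection;
import java.util.Comparator;

// All names below are illustrative assumptions, not Flink APIs.
interface SchedulingStrategy {
    /** Chooses a target worker for a task, e.g. to spread state-heavy tasks. */
    Worker selectTarget(Task task, Collection<Worker> candidates);
}

final class Task {
    final String name;
    Task(String name) { this.name = name; }
}

final class Worker {
    final String host;
    final long hostedStateBytes; // state already placed on this worker
    Worker(String host, long hostedStateBytes) {
        this.host = host;
        this.hostedStateBytes = hostedStateBytes;
    }
}

/** Example heuristic: place each task on the worker hosting the least state. */
final class StateAwareStrategy implements SchedulingStrategy {
    @Override
    public Worker selectTarget(Task task, Collection<Worker> candidates) {
        return candidates.stream()
                .min(Comparator.comparingLong(w -> w.hostedStateBytes))
                .orElseThrow(() -> new IllegalStateException("no workers available"));
    }
}
{code}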

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on 
> the granularity of individual tasks, which makes holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use cases like auto-scaling, 
> local-recovery, and resource-optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing





[GitHub] KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2018-12-17 Thread GitBox
KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] 
TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r242114883
 
 

 ##
 File path: docs/dev/stream/state/state.md
 ##
 @@ -394,6 +396,47 @@ val ttlConfig = StateTtlConfig
 
 This option is not applicable for the incremental checkpointing in the RocksDB 
state backend.
 
+# Incremental cleanup
+
+Another option is to trigger cleanup of some state entries incrementally.
+The trigger can be a callback from each state access and/or each record processing.
+If this cleanup strategy is active for a certain state,
+the storage backend keeps a lazy global iterator over all entries of that state.
+Every time incremental cleanup is triggered, the iterator is advanced.
+The traversed state entries are checked, and expired ones are cleaned up.
+
+This feature can be activated in `StateTtlConfig`:
+
+
+
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+ StateTtlConfig ttlConfig = StateTtlConfig
+.newBuilder(Time.seconds(1))
+.cleanupInRocksdbCompactFilter()
+.build();
+{% endhighlight %}
+
+ 
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+ val ttlConfig = StateTtlConfig
+.newBuilder(Time.seconds(1))
+.cleanupInRocksdbCompactFilter
 
 Review comment:
   +1, cleanupIncrementally
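
   For reference, a minimal sketch of what the snippet would look like after the rename suggested above; the exact parameters of `cleanupIncrementally` (a cleanup batch size and a per-record trigger flag) are an assumption based on this review discussion, not the final API:

{% highlight java %}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Assumed shape after the rename: check up to 10 entries per trigger,
// and also advance the cleanup iterator on every processed record.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();
{% endhighlight %}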




[GitHub] KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2018-12-17 Thread GitBox
KarmaGYZ commented on a change in pull request #7188: [FLINK-10473][State TTL] 
TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r242114799
 
 

 ##
 File path: docs/dev/stream/state/state.md
 ##
 @@ -394,6 +396,47 @@ val ttlConfig = StateTtlConfig
 
 This option is not applicable for the incremental checkpointing in the RocksDB 
state backend.
 
+# Incremental cleanup
+
+Another option is to trigger cleanup of some state entries incrementally.
+The trigger can be a callback from each state access and/or each record processing.
+If this cleanup strategy is active for a certain state,
+the storage backend keeps a lazy global iterator over all entries of that state.
+Every time incremental cleanup is triggered, the iterator is advanced.
+The traversed state entries are checked, and expired ones are cleaned up.
+
+This feature can be activated in `StateTtlConfig`:
+
+
+
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+ StateTtlConfig ttlConfig = StateTtlConfig
+.newBuilder(Time.seconds(1))
+.cleanupInRocksdbCompactFilter()
 
 Review comment:
   It should be cleanupIncrementally.




[jira] [Comment Edited] (FLINK-11180) ProcessFailureCancelingITCase#testCancelingOnProcessFailure

2018-12-17 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723762#comment-16723762
 ] 

sunjincheng edited comment on FLINK-11180 at 12/18/18 7:02 AM:
---

Hi [~hequn8128], thanks for taking this JIRA. I checked the issue again and 
found that the root cause is that port 8081 was already in use on my local 
machine.

Thanks again for helping with the JIRA.

So, this is not a blocker for release-1.7.1.


was (Author: sunjincheng121):
Hi [~hequn8128], thanks for taking this JIRA. I checked the issue again and 
found that the root cause is that port 8081 was already in use on my local 
machine. So this issue does not need a fix!

Thanks again for helping with the JIRA.

So, this is not a blocker for release-1.7.1.
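
Since the root cause is a clash on the default REST port 8081, one way to make such tests robust (shown as a sketch, assuming Flink's standard {{RestOptions.PORT}} / {{rest.port}} setting) is to bind the test cluster's REST endpoint to an ephemeral port:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

// Sketch: with rest.port set to 0, the OS picks a free port, so a locally
// running cluster or a parallel test run cannot collide on the default 8081.
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT, 0);
{code}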

> ProcessFailureCancelingITCase#testCancelingOnProcessFailure
> ---
>
> Key: FLINK-11180
> URL: https://issues.apache.org/jira/browse/FLINK-11180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>
> tag: release-1.7.1-rc2
> org.apache.flink.util.FlinkException: Could not create the 
> DispatcherResourceManagerComponent.
> at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:242)
>  at 
> org.apache.flink.test.recovery.ProcessFailureCancelingITCase.testCancelingOnProcessFailure(ProcessFailureCancelingITCase.java:148)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>  at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>  at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>  at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>  at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.net.BindException: Address already in use
>  at sun.nio.ch.Net.bind0(Native Method)
>  at sun.nio.ch.Net.bind(Net.java:433)
>  at sun.nio.ch.Net.bind(Net.java:425)
>  at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1358)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:1019)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
>  at 
> 

[jira] [Commented] (FLINK-11180) ProcessFailureCancelingITCase#testCancelingOnProcessFailure

2018-12-17 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723762#comment-16723762
 ] 

sunjincheng commented on FLINK-11180:
-

Hi [~hequn8128], thanks for taking this JIRA. I checked the issue again and 
found that the root cause is that port 8081 was already in use on my local 
machine. So this issue does not need a fix!

Thanks again for helping with the JIRA.

So, this is not a blocker for release-1.7.1.

> ProcessFailureCancelingITCase#testCancelingOnProcessFailure
> ---
>
> Key: FLINK-11180
> URL: https://issues.apache.org/jira/browse/FLINK-11180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>
> tag: release-1.7.1-rc2
> org.apache.flink.util.FlinkException: Could not create the 
> DispatcherResourceManagerComponent.
> at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:242)
>  at 
> org.apache.flink.test.recovery.ProcessFailureCancelingITCase.testCancelingOnProcessFailure(ProcessFailureCancelingITCase.java:148)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>  at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>  at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>  at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>  at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.net.BindException: Address already in use
>  at sun.nio.ch.Net.bind0(Native Method)
>  at sun.nio.ch.Net.bind(Net.java:433)
>  at sun.nio.ch.Net.bind(Net.java:425)
>  at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1358)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:1019)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
>  at 
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:366)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>  at 
> 

[GitHub] QiLuo-BD commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions

2018-12-17 Thread GitBox
QiLuo-BD commented on issue #7186: [FLINK-10941] Keep slots which contain 
unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#issuecomment-448115581
 
 
   Hi Zhijiang/Till,
   
   Could you kindly share any further review on this? If no more comments, can 
this be merged?
   
   Thanks,
   Qi




[GitHub] xueyumusic commented on issue #7318: [FLINK-11099][Table API] Migrate flink-table runtime CRow Types classes

2018-12-17 Thread GitBox
xueyumusic commented on issue #7318: [FLINK-11099][Table API] Migrate 
flink-table runtime CRow Types classes
URL: https://github.com/apache/flink/pull/7318#issuecomment-448114794
 
 
   Hi, @XuQianJin-Stars, according to FLIP-28 it looks like the CRow classes may 
need to migrate into the flink-table-runtime module. That module is still to be 
set up, is that right? @twalthr 




[jira] [Commented] (FLINK-11181) SimpleRecoveryITCaseBase test error

2018-12-17 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723753#comment-16723753
 ] 

Hequn Cheng commented on FLINK-11181:
-

This is not a release blocker, as it is a bug in the test itself. 

> SimpleRecoveryITCaseBase test error
> ---
>
> Key: FLINK-11181
> URL: https://issues.apache.org/jira/browse/FLINK-11181
> Project: Flink
>  Issue Type: Sub-task
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Running it many times, it always fails.
> at 
> org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.executeAndRunAssertions(SimpleRecoveryITCaseBase.java:124)
>  at 
> org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.testRestart(SimpleRecoveryITCaseBase.java:150)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runners.Suite.runChild(Suite.java:128)
>  at org.junit.runners.Suite.runChild(Suite.java:27)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>  at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>  at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>  at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>  at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)





[jira] [Commented] (FLINK-11181) SimpleRecoveryITCaseBase test error

2018-12-17 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723758#comment-16723758
 ] 

sunjincheng commented on FLINK-11181:
-

Thanks for helping me check this issue, [~hequn8128].
Glad to hear that the issue is not a blocker for release-1.7.1.


> SimpleRecoveryITCaseBase test error
> ---
>
> Key: FLINK-11181
> URL: https://issues.apache.org/jira/browse/FLINK-11181
> Project: Flink
>  Issue Type: Sub-task
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Running it many times, it always fails.
> at 
> org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.executeAndRunAssertions(SimpleRecoveryITCaseBase.java:124)
>  at 
> org.apache.flink.test.recovery.SimpleRecoveryITCaseBase.testRestart(SimpleRecoveryITCaseBase.java:150)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runners.Suite.runChild(Suite.java:128)
>  at org.junit.runners.Suite.runChild(Suite.java:27)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>  at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>  at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>  at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>  at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)





[GitHub] sunjincheng121 commented on issue #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor…

2018-12-17 Thread GitBox
sunjincheng121 commented on issue #7315: [FLINK-11179][tests] add wait time for 
every record for testCancelSor…
URL: https://github.com/apache/flink/pull/7315#issuecomment-448112450
 
 
   Hi @hequn8128, I have updated the PR. I would appreciate it if you could take another look!
   Thanks,
   Jincheng




[jira] [Assigned] (FLINK-11102) Enable check for previous data in SpanningRecordSerializer

2018-12-17 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-11102:
---

Assignee: boshu Zheng  (was: vinoyang)

> Enable check for previous data in SpanningRecordSerializer
> --
>
> Key: FLINK-11102
> URL: https://issues.apache.org/jira/browse/FLINK-11102
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.8.0
>Reporter: Nico Kruber
>Assignee: boshu Zheng
>Priority: Major
>
> {{SpanningRecordSerializer}} only verifies that there is no left-over data 
> from a previous serialization call if {{SpanningRecordSerializer#CHECKED}} is 
> {{true}}, but this is hard-coded as {{false}}. If there was previous data, it 
> would be silently dropped and we would continue with our data. The 
> deserializer would probably notice and fail, but identifying the root cause 
> would no longer be as easy.
> -> We should enable that check by default, since the only thing it does is 
> verify {{!java.nio.Buffer#hasRemaining()}}, which cannot be too expensive.
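
For illustration, a minimal sketch of the guard described above; the class and method names here are assumptions, not the actual {{SpanningRecordSerializer}} internals:

{code:java}
import java.nio.ByteBuffer;

// Hypothetical sketch of the described check: before serializing a new
// record, verify the previous record's bytes were fully consumed instead
// of silently dropping them.
final class LeftoverDataCheck {
    static void checkNoLeftoverData(ByteBuffer serializationBuffer) {
        if (serializationBuffer.hasRemaining()) {
            throw new IllegalStateException(
                "Pending serialization of previous record; it would be dropped.");
        }
    }
}
{code}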





[jira] [Comment Edited] (FLINK-11182) JobManagerHACheckpointRecoveryITCase need be improved

2018-12-17 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723748#comment-16723748
 ] 

sunjincheng edited comment on FLINK-11182 at 12/18/18 6:28 AM:
---

Sounds good. It's great that FLINK-10392 will solve this issue, and I have 
added the link to FLINK-10392.
I agree that it's not a blocker for release-1.7.1.


was (Author: sunjincheng121):
Yes, it's great that FLINK-10392 will solve this issue, and I have added the 
link to FLINK-10392.
I agree that it's not a blocker for release-1.7.1.

> JobManagerHACheckpointRecoveryITCase need be improved
> -
>
> Key: FLINK-11182
> URL: https://issues.apache.org/jira/browse/FLINK-11182
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Priority: Major
>
> WARNING: An exception was thrown by an exception handler.
> java.util.concurrent.RejectedExecutionException: Worker has already been 
> shutdown
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
>  at 
> org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
>  at 
> org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.execute(DefaultChannelPipeline.java:636)
>  at 
> org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
>  at 
> org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:781)
>  at 
> org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
>  at 
> org.jboss.netty.channel.SimpleChannelHandler.disconnectRequested(SimpleChannelHandler.java:320)
>  at 
> org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:274)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
>  at org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
>  at 
> org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:219)
>  at 
> akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:241)
>  at 
> akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:240)
>  at scala.util.Success.foreach(Try.scala:236)
>  at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206)
>  at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206)
>  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>  at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>  at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





[jira] [Commented] (FLINK-11182) JobManagerHACheckpointRecoveryITCase need be improved

2018-12-17 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723748#comment-16723748
 ] 

sunjincheng commented on FLINK-11182:
-

Yes, it's great that FLINK-10392 will solve this issue, and I have added the 
link to FLINK-10392.
I agree that it's not a blocker for release-1.7.1.

> JobManagerHACheckpointRecoveryITCase need be improved
> -
>
> Key: FLINK-11182
> URL: https://issues.apache.org/jira/browse/FLINK-11182
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Priority: Major
>
> WARNING: An exception was thrown by an exception handler.
> java.util.concurrent.RejectedExecutionException: Worker has already been 
> shutdown
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
>  at 
> org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
>  at 
> org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
>  at 
> org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.execute(DefaultChannelPipeline.java:636)
>  at 
> org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
>  at 
> org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:781)
>  at 
> org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
>  at 
> org.jboss.netty.channel.SimpleChannelHandler.disconnectRequested(SimpleChannelHandler.java:320)
>  at 
> org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:274)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
>  at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
>  at org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
>  at 
> org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:219)
>  at 
> akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:241)
>  at 
> akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:240)
>  at scala.util.Success.foreach(Try.scala:236)
>  at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206)
>  at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206)
>  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>  at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>  at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>  at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





[GitHub] leesf removed a comment on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID

2018-12-17 Thread GitBox
leesf removed a comment on issue #7262: [FLINK-10478] Kafka Producer wrongly 
formats % for transaction ID
URL: https://github.com/apache/flink/pull/7262#issuecomment-447745066
 
 
   @tillrohrmann Hi Till, could you please merge this PR when you are free? Thanks.




[GitHub] leesf commented on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID

2018-12-17 Thread GitBox
leesf commented on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % 
for transaction ID
URL: https://github.com/apache/flink/pull/7262#issuecomment-448110778
 
 
   @tzulitai Hi Tzu-Li, could you please merge this PR when you are free? Thanks.




[jira] [Commented] (FLINK-11179) JoinCancelingITCase#testCancelSortMatchWhileDoingHeavySorting test error

2018-12-17 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723742#comment-16723742
 ] 

sunjincheng commented on FLINK-11179:
-

The issue is not a blocker for release-1.7.1. 

>  JoinCancelingITCase#testCancelSortMatchWhileDoingHeavySorting test error
> -
>
> Key: FLINK-11179
> URL: https://issues.apache.org/jira/browse/FLINK-11179
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> tag: release-1.7.1-rc2
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (f8abcfa2bf2f9bf13024075e51891d2e)
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.client.program.MiniClusterClient.cancel(MiniClusterClient.java:118)
>  at 
> org.apache.flink.test.cancelling.CancelingTestBase.runAndCancelJob(CancelingTestBase.java:109)
>  at 
> org.apache.flink.test.cancelling.JoinCancelingITCase.executeTaskWithGenerator(JoinCancelingITCase.java:94)
>  at 
> org.apache.flink.test.cancelling.JoinCancelingITCase.testCancelSortMatchWhileDoingHeavySorting(JoinCancelingITCase.java:99)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>  at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>  at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>  at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>  at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
> not find Flink job (f8abcfa2bf2f9bf13024075e51891d2e)
>  at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766)





[GitHub] kisimple opened a new pull request #7321: [hotfix] Fix typo in AbstractStreamOperator

2018-12-17 Thread GitBox
kisimple opened a new pull request #7321: [hotfix] Fix typo in 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/7321
 
 
   retrieven -> retrieved




[GitHub] kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure in Windows 7 environment.

2018-12-17 Thread GitBox
kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure 
in Windows 7 environment.
URL: https://github.com/apache/flink/pull/7192#issuecomment-448099846
 
 
   This would be an annoying problem when working on Windows. cc @kl0u 




[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723649#comment-16723649
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11083:
-

Merged.

master (1.8.0) - 45f9cccbe2f2aefe392b2c2df2a464eaed2e0323
release-1.7 - 5ce9f228959469b9e7c8e585aaffadc7eff5d028
release-1.6 - 26b182c3ce21968118b483bfa8aca38ecd54a8a3

> CRowSerializerConfigSnapshot is not instantiable
> 
>
> Key: FLINK-11083
> URL: https://issues.apache.org/jira/browse/FLINK-11083
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL, Type Serialization System
>Reporter: boshu Zheng
>Assignee: boshu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> An exception was encountered when restarting a job with savepoint in our 
> production env,
> {code:java}
> 2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task 
>   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
> to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
> from any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>   ... 5 more
> Caused by: java.lang.RuntimeException: The class 
> 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
>  is not instantiable: The class has no (implicit) public nullary constructor, 
> i.e. a constructor without arguments.
>   at 
> org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
>   at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> 

[jira] [Resolved] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-11083.
-
Resolution: Fixed

> CRowSerializerConfigSnapshot is not instantiable
> 
>
> Key: FLINK-11083
> URL: https://issues.apache.org/jira/browse/FLINK-11083
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL, Type Serialization System
>Reporter: boshu Zheng
>Assignee: boshu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> An exception was encountered when restarting a job with savepoint in our 
> production env,
> {code:java}
> 2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task 
>   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
> to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
> from any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>   ... 5 more
> Caused by: java.lang.RuntimeException: The class 
> 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
>  is not instantiable: The class has no (implicit) public nullary constructor, 
> i.e. a constructor without arguments.
>   at 
> org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
>   at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   ... 7 more
> {code}
> I added tests to CRowSerializerTest to make sure 

[GitHub] asfgit closed pull request #7267: [FLINK-11083][Table] CRowSerializerConfigSnapshot is not instantiable

2018-12-17 Thread GitBox
asfgit closed pull request #7267: [FLINK-11083][Table] 
CRowSerializerConfigSnapshot is not instantiable
URL: https://github.com/apache/flink/pull/7267
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 0ce3aee3739..b3fe5085151 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -81,7 +81,7 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   // 

 
   override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = {
-    new CRowSerializer.CRowSerializerConfigSnapshot(rowSerializer)
+    new CRowSerializer.CRowSerializerConfigSnapshot(Array(rowSerializer))
   }
 
   override def ensureCompatibility(
@@ -115,9 +115,13 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
 
 object CRowSerializer {
 
-  class CRowSerializerConfigSnapshot(rowSerializers: TypeSerializer[Row]*)
+  class CRowSerializerConfigSnapshot(rowSerializers: Array[TypeSerializer[Row]])
     extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) {
 
+    def this() {
+      this(Array.empty)
+    }
+
     override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
   }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
index 7483b04d9ca..055501a6a01 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
@@ -18,8 +18,20 @@
 
 package org.apache.flink.table.runtime.types
 
-import org.apache.flink.util.TestLogger
-import org.junit.Test
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, KeyedProcessOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, InstantiationUtil, TestLogger}
+
+import org.junit.{Assert, Test}
 
 class CRowSerializerTest extends TestLogger {
 
@@ -29,6 +41,70 @@ class CRowSerializerTest extends TestLogger {
   @Test
   def testDefaultConstructor(): Unit = {
     new CRowSerializer.CRowSerializerConfigSnapshot()
+
+    InstantiationUtil.instantiate(classOf[CRowSerializer.CRowSerializerConfigSnapshot])
+  }
+
+  @Test
+  def testStateRestore(): Unit = {
+
+    class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] {
+      var state: ListState[CRow] = _
+      override def open(parameters: Configuration): Unit = {
+        val stateDesc = new ListStateDescriptor[CRow]("CRow",
+          new CRowTypeInfo(new RowTypeInfo(Types.INT)))
+        state = getRuntimeContext.getListState(stateDesc)
+      }
+      override def processElement(value: Integer,
+          ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
+          out: Collector[Integer]): Unit = {
+        state.add(new CRow(Row.of(value), true))
+      }
+    }
+
+    val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction)
+
+    var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
+      operator,
+      new KeySelector[Integer, Integer] {
+        override def getKey(value: Integer): Integer = -1
+      },
+      Types.INT, 1, 1, 0)
+    testHarness.setup()
+    testHarness.open()
+
+    testHarness.processElement(new StreamRecord[Integer](1, 1L))
+    testHarness.processElement(new StreamRecord[Integer](2, 1L))
+    testHarness.processElement(new StreamRecord[Integer](3, 1L))
+
+    Assert.assertEquals(1, numKeyedStateEntries(operator))
+
+    val snapshot = 

[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723617#comment-16723617
 ] 

lining edited comment on FLINK-11162 at 12/18/18 3:28 AM:
--

[~yanghua] To which API should we add this information: should we create a new 
one, or use an existing one like jobs/:jobid/plan?

I prefer the existing API; the result would look like this:

{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "name": "Socket Window WordCount",
  "nodes": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Source: Socket Stream -> Flat Map",
      "optimizer_properties": {}
    },
    {
      "id": "90bea66de1c231edf33913ecd54406c1",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out",
      "inputs": [
        {
          "num": 0,
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "ship_strategy": "HASH",
          "exchange": "pipelined"
        }
      ],
      "optimizer_properties": {}
    }
  ],
  "operators": [
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "7df19f87deec5680128845fd9a6ca18d",
      "name": "Flat Map",
      "inputs": [
        {
          "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "partitioner": "FORWARD",
          "type_number": 0
        },
        ...
      ]
    }
  ]
}

 


was (Author: lining):
[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

I prefer extending the existing API; the result would look like this:

{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "name": "Socket Window WordCount",
  "nodes": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Source: Socket Stream -> Flat Map",
      "optimizer_properties": {}
    },
    {
      "id": "90bea66de1c231edf33913ecd54406c1",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out",
      "inputs": [
        {
          "num": 0,
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "ship_strategy": "HASH",
          "exchange": "pipelined"
        }
      ],
      "optimizer_properties": {}
    }
  ],
  "operators": [
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "7df19f87deec5680128845fd9a6ca18d",
      "name": "Flat Map",
      "inputs": [
        {
          "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "partitioner": "FORWARD",
          "type_number": 0
        },
        ...
      ]
    }
  ]
}

 

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Clarkkkkk commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation

2018-12-17 Thread GitBox
Clarkkkkk commented on issue #7258: [FLINK-11084] Throw a hard exception to 
remind developers while there's no stream node between two split transformation
URL: https://github.com/apache/flink/pull/7258#issuecomment-448084237
 
 
   @walterddr Good point, I will tag it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723629#comment-16723629
 ] 

lining commented on FLINK-11162:


[~till.rohrmann] [~Zentol], what is your opinion?

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723629#comment-16723629
 ] 

lining edited comment on FLINK-11162 at 12/18/18 3:29 AM:
--

[~till.rohrmann] [~Zentol], what are your opinions?


was (Author: lining):
[~till.rohrmann] [~Zentol], what is your opinion?

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723617#comment-16723617
 ] 

lining edited comment on FLINK-11162 at 12/18/18 3:25 AM:
--

[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

I prefer extending the existing API; the result would look like this:

{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "name": "Socket Window WordCount",
  "nodes": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Source: Socket Stream -> Flat Map",
      "optimizer_properties": {}
    },
    {
      "id": "90bea66de1c231edf33913ecd54406c1",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out",
      "inputs": [
        {
          "num": 0,
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "ship_strategy": "HASH",
          "exchange": "pipelined"
        }
      ],
      "optimizer_properties": {}
    }
  ],
  "operators": [
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "7df19f87deec5680128845fd9a6ca18d",
      "name": "Flat Map",
      "inputs": [
        {
          "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "partitioner": "FORWARD",
          "type_number": 0
        },
        ...
      ]
    }
  ]
}

 


was (Author: lining):
[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

I prefer extending the existing API; the result would look like this:

{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "name": "Socket Window WordCount",
  "nodes": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Source: Socket Stream -> Flat Map",
      "optimizer_properties": {}
    },
    {
      "id": "90bea66de1c231edf33913ecd54406c1",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out",
      "inputs": [
        {
          "num": 0,
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "ship_strategy": "HASH",
          "exchange": "pipelined"
        }
      ],
      "optimizer_properties": {}
    }
  ],
  "operators": [
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "7df19f87deec5680128845fd9a6ca18d",
      "name": "Flat Map",
      "inputs": [
        {
          "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "partitioner": "FORWARD",
          "type_number": 0
        },
        ...
      ]
    }
  ]
}

 

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723617#comment-16723617
 ] 

lining edited comment on FLINK-11162 at 12/18/18 3:24 AM:
--

[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

I prefer extending the existing API; the result would look like this:

{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "name": "Socket Window WordCount",
  "nodes": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Source: Socket Stream -> Flat Map",
      "optimizer_properties": {}
    },
    {
      "id": "90bea66de1c231edf33913ecd54406c1",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out",
      "inputs": [
        {
          "num": 0,
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "ship_strategy": "HASH",
          "exchange": "pipelined"
        }
      ],
      "optimizer_properties": {}
    }
  ],
  "operators": [
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "7df19f87deec5680128845fd9a6ca18d",
      "name": "Flat Map",
      "inputs": [
        {
          "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "partitioner": "FORWARD",
          "type_number": 0
        },
        ...
      ]
    }
  ]
}

 


was (Author: lining):
[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

I prefer extending the existing API; the result would look like this:

{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "name": "Socket Window WordCount",
  "nodes": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Source: Socket Stream -> Flat Map",
      "optimizer_properties": {}
    },
    {
      "id": "90bea66de1c231edf33913ecd54406c1",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out",
      "inputs": [
        {
          "num": 0,
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "ship_strategy": "HASH",
          "exchange": "pipelined"
        }
      ],
      "optimizer_properties": {}
    }
  ],
  "operators": [
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "7df19f87deec5680128845fd9a6ca18d",
      "name": "Flat Map",
      "inputs": [
        {
          "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "partitioner": "FORWARD",
          "type_number": 0
        }
      ]
    },
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Source: Socket Stream",
      "inputs": [],
      "metric_name": "Source__Socket_Stream"
    },
    {
      "vertex_id": "90bea66de1c231edf33913ecd54406c1",
      "operator_id": "17fbfcaabad45985bbdf4da0490487e3",
      "name": "Sink: Print to Std. Out",
      "inputs": [
        {
          "operator_id": "90bea66de1c231edf33913ecd54406c1",
          "partitioner": "FORWARD",
          "type_number": 0
        }
      ]
    },
    {
      "vertex_id": "90bea66de1c231edf33913ecd54406c1",
      "operator_id": "90bea66de1c231edf33913ecd54406c1",
      "name": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)",
      "inputs": [
        {
          "operator_id": "7df19f87deec5680128845fd9a6ca18d",
          "partitioner": "HASH",
          "type_number": 0
        }
      ]
    }
  ]
}

 

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723623#comment-16723623
 ] 

vinoyang commented on FLINK-11162:
--

My initial thought was to add it to jobs/:jobid/plan, but this may cause 
incompatibilities with the existing interface. However, since the subsequent 
rest API will carry a version number, maybe compatibility will no longer be a 
hindrance. If we provide a separate interface, we don't need to worry about any 
problems, but I don't know whether it is necessary enough. Maybe you can listen 
to the ideas of [~till.rohrmann] and [~Zentol] before you start.

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723617#comment-16723617
 ] 

lining edited comment on FLINK-11162 at 12/18/18 3:19 AM:
--

[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

I prefer extending the existing API; the result would look like this:

{
  "jid": "ccee3dc6ac63c532feea3e8cdbf05eab",
  "name": "Socket Window WordCount",
  "nodes": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Source: Socket Stream -> Flat Map",
      "optimizer_properties": {}
    },
    {
      "id": "90bea66de1c231edf33913ecd54406c1",
      "parallelism": 1,
      "operator": "",
      "operator_strategy": "",
      "description": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out",
      "inputs": [
        {
          "num": 0,
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "ship_strategy": "HASH",
          "exchange": "pipelined"
        }
      ],
      "optimizer_properties": {}
    }
  ],
  "operators": [
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "7df19f87deec5680128845fd9a6ca18d",
      "name": "Flat Map",
      "inputs": [
        {
          "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "partitioner": "FORWARD",
          "type_number": 0
        }
      ]
    },
    {
      "vertex_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "operator_id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Source: Socket Stream",
      "inputs": [],
      "metric_name": "Source__Socket_Stream"
    },
    {
      "vertex_id": "90bea66de1c231edf33913ecd54406c1",
      "operator_id": "17fbfcaabad45985bbdf4da0490487e3",
      "name": "Sink: Print to Std. Out",
      "inputs": [
        {
          "operator_id": "90bea66de1c231edf33913ecd54406c1",
          "partitioner": "FORWARD",
          "type_number": 0
        }
      ]
    },
    {
      "vertex_id": "90bea66de1c231edf33913ecd54406c1",
      "operator_id": "90bea66de1c231edf33913ecd54406c1",
      "name": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)",
      "inputs": [
        {
          "operator_id": "7df19f87deec5680128845fd9a6ca18d",
          "partitioner": "HASH",
          "type_number": 0
        }
      ]
    }
  ]
}
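
For illustration, a minimal sketch of how a client could read such a payload 
from the existing endpoint (the host, port, and job id below are assumptions, 
not part of the proposal):

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class FetchJobPlan {
    public static void main(String[] args) throws Exception {
        // GET the plan of a running job from the JobManager REST endpoint
        URL url = new URL("http://localhost:8081/jobs/ccee3dc6ac63c532feea3e8cdbf05eab/plan");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // prints the JSON plan payload
            }
        }
    }
}
{code}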

 


was (Author: lining):
[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-17 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242392587
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
   )
 }
 
+def decorateDataViewTypeInfo(
+    fieldTypeInfo: TypeInformation[_],
+    fieldInstance: AnyRef,
+    field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+  case ct: CompositeType[_] if includesDataView(ct) =>
+    throw new TableException(
+      "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+        "and Case Class type.")
+  case map: MapViewTypeInfo[_, _] =>
+    val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+    val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+        mapView.valueTypeInfo != null) {
+      new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+    } else {
+      map
+    }
+
+    if (isStateBackedDataViews) {
+      newTypeInfo.nullSerializer = true
+
+      // create map view specs with unique id (used as state name)
+      val fieldName = field.getName
+      var spec = MapViewSpec(
+        "agg" + index + "$" + fieldName,
+        field,
+        newTypeInfo)
+
+      (newTypeInfo, Some(spec))
+    } else {
+      (newTypeInfo, None)
+    }
+
+  case list: ListViewTypeInfo[_] =>
 
 Review comment:
   Maybe add a ListView test case?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-17 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242392503
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
   )
 }
 
+def decorateDataViewTypeInfo(
+    fieldTypeInfo: TypeInformation[_],
+    fieldInstance: AnyRef,
+    field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+  case ct: CompositeType[_] if includesDataView(ct) =>
+    throw new TableException(
+      "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+        "and Case Class type.")
+  case map: MapViewTypeInfo[_, _] =>
+    val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+    val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+        mapView.valueTypeInfo != null) {
+      new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+    } else {
+      map
+    }
+
+    if (isStateBackedDataViews) {
 
 Review comment:
   I was wondering if this is ever needed, since the test case added in this PR 
doesn't cover the else branch of this code path.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-17 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242393559
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   I am assuming `NullSerializer` is needed because the actual map is handled by 
the state backend and because we removed the fields through 
`removeStateViewFieldsFromAccTypeInfo`. I was wondering why it is needed, since 
it was not introduced in the previous approach, nor is it reflected in the test 
case (which still passes without the NullSerializer setting).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723617#comment-16723617
 ] 

lining commented on FLINK-11162:


[~yanghua] Which API should we add this information to: a new one, or an 
existing one like jobs/:jobid/plan?

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11184) Rework TableSource and TableSink interfaces

2018-12-17 Thread Wenlong Lyu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723603#comment-16723603
 ] 

Wenlong Lyu commented on FLINK-11184:
-

Hi Timo, I think there is one more issue that may need to be considered: adding 
an explicit way to define the conversion from a user-defined type to a 
structured type in a table, which would make the conversion from a DataStream 
to a dynamic Table clearer.

> Rework TableSource and TableSink interfaces
> ---
>
> Key: FLINK-11184
> URL: https://issues.apache.org/jira/browse/FLINK-11184
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> There are a couple of shortcomings with the current {{TableSource}} and 
> {{TableSink}} interface design. Some of the issues are covered in a [basic 
> design 
> document|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf]
>  that was published a while ago.
> The design document has not been updated for some time and partially overlaps 
> with the [current SQL DDL 
> discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-SQL-DDL-Design-td25006.html]
>  for the {{CREATE TABLE}} statement on the ML.
> What needs to be solved:
> - How to unify sources and sinks with regard to schema and time attributes?
> - How to define watermarks, timestamp extractors, or timestamp ingestion?
> - How to define primary keys and partitioning keys?
> - How to differentiate between update modes for tables (i.e. how to read from 
> an append, retraction, or update table)?
> - How to express all of the above without pulling in too many dependencies on 
> other Flink modules if source and sink interfaces are located in the 
> {{flink-table-spi}} package?
> As of the current state of the discussion, it seems that we might extend 
> {{TableSchema}} to allow for returning the information above and remove 
> current interfaces such as {{DefinedRowtimeAttribute}} or 
> {{DefinedFieldMapping}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] XiaoZYang closed pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-12-17 Thread GitBox
XiaoZYang closed pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink 
connector
URL: https://github.com/apache/flink/pull/5845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
new file mode 100644
index 000..a0ac2c1facf
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+	<name>flink-connector-pulsar</name>
+	<properties>
+		<pulsar.version>1.20.0-incubating</pulsar.version>
+	</properties>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.pulsar</groupId>
+			<artifactId>pulsar-client</artifactId>
+			<classifier>shaded</classifier>
+			<version>${pulsar.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.javassist</groupId>
+			<artifactId>javassist</artifactId>
+			<version>3.20.0-GA</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
new file mode 100644
index 000..64b1397c217
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import 

[GitHub] XiaoZYang commented on issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-12-17 Thread GitBox
XiaoZYang commented on issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink 
connector
URL: https://github.com/apache/flink/pull/5845#issuecomment-448070113
 
 
   @sijie @tzulitai Thank you for your work and suggestions. I'm going to close 
this PR; any other advice?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11172) Remove the max retention time in StreamQueryConfig

2018-12-17 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723586#comment-16723586
 ] 

Hequn Cheng commented on FLINK-11172:
-

Hi all, thanks for your discussion and suggestions.
 [~fhueske] I found that bounded OVER windows clean up state if a retention time 
has been configured. Should we remove the retention logic for them? I created 
another JIRA, FLINK-11188, to address the problem of bounded OVER windows.

> Remove the max retention time in StreamQueryConfig
> --
>
> Key: FLINK-11172
> URL: https://issues.apache.org/jira/browse/FLINK-11172
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.8.0
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Major
>
> [Stream Query 
> Config|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html]
>  is an important and useful feature to make a tradeoff between accuracy and 
> resource consumption when some query executed in unbounded streaming data. 
> This feature first proposed in 
> [FLINK-6491|https://issues.apache.org/jira/browse/FLINK-6491].
> At first, *QueryConfig* took two parameters, i.e. minIdleStateRetentionTime and 
> maxIdleStateRetentionTime, to avoid registering many timers by allowing more 
> freedom in when to discard state. However, this approach may cause new data to 
> expire earlier than old data, and thus greater accuracy loss in some cases. For 
> example, take an unbounded keyed stream in which we process key *_a_* at _*t0*_ 
> and key *_b_* at _*t1*_, with *_t0 < t1_*. The state of *_a_* will expire at 
> _*t0+maxIdleStateRetentionTime*_ while that of _*b*_ expires at 
> *_t1+maxIdleStateRetentionTime_*. Now another record with key *_a_* arrives at 
> _*t2 (t1 < t2)*_, but _*t2+minIdleStateRetentionTime*_ < 
> _*t0+maxIdleStateRetentionTime*_, so the state of key *_a_* will still expire 
> at _*t0+maxIdleStateRetentionTime*_, which is earlier than the state of key 
> _*b*_. According to the guideline of 
> [LRU|https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)], 
> an element that has been most heavily used in the past few instructions is 
> most likely to be used heavily in the next few instructions too, so the state 
> with key _*a*_ should live longer than the state with key _*b*_. The current 
> approach goes against this idea.
> I think we now have a good chance to remove the maxIdleStateRetentionTime 
> argument in *StreamQueryConfig*. Below are my reasons.
>  * [FLINK-9423|https://issues.apache.org/jira/browse/FLINK-9423] implemented 
> efficient deletes for the heap-based timer service. We can leverage the delete 
> operation to mitigate the abuse of timer registration.
>  * The current approach can cause new data to expire earlier than old data, 
> and thus greater accuracy loss in some cases. Users need to fine-tune these 
> two parameters to avoid this scenario. Directly following the idea of LRU 
> looks like a better solution.
> So, I plan to remove maxIdleStateRetentionTime and make the expiry time depend 
> only on _*minIdleStateRetentionTime*_.
> cc to [~sunjincheng121], [~fhueske] 
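
For illustration, a minimal sketch of the configuration under discussion 
(assuming the Flink 1.7 Table API; the second argument is the 
maxIdleStateRetentionTime this issue proposes to remove):

{code:java}
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RetentionConfigSketch {
    public static StreamQueryConfig configure(StreamTableEnvironment tableEnv) {
        StreamQueryConfig qConfig = tableEnv.queryConfig();
        // idle state is kept for at least 12 hours and discarded some time
        // before 24 hours have passed; the upper bound is the parameter in question
        return qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}
{code}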



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11188) Bounded over should not enable state retention time

2018-12-17 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11188:
---

 Summary: Bounded over should not enable state retention time 
 Key: FLINK-11188
 URL: https://issues.apache.org/jira/browse/FLINK-11188
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


As discussed in FLINK-11172, time-based operations (GROUP BY windows, OVER 
windows, time-windowed joins, etc.) are inherently bound by time and 
automatically clean up their state, so we should not add state cleanup or TTL 
for these operators.

If I understand correctly, we should not add the retention logic for 
rows-bounded operations either. I think we should disable the state retention 
logic for:
 - ProcTimeBoundedRangeOver
 - ProcTimeBoundedRowsOver
 - RowTimeBoundedRangeOver
 - RowTimeBoundedRowsOver

Any suggestions are appreciated!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11162:


Assignee: lining  (was: vinoyang)

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723560#comment-16723560
 ] 

vinoyang commented on FLINK-11162:
--

[~lining] Done.

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723559#comment-16723559
 ] 

lining edited comment on FLINK-11162 at 12/18/18 1:30 AM:
--

[~yanghua] ok, can you assign this to me?


was (Author: lining):
[~yanghua] ok, can you assign this to me?

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723559#comment-16723559
 ] 

lining edited comment on FLINK-11162 at 12/18/18 1:29 AM:
--

[~yanghua] ok, can you assign this to me?


was (Author: lining):
[~yanghua] ok

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723559#comment-16723559
 ] 

lining commented on FLINK-11162:


[~yanghua] ok

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11129) dashboard for job which contains important information

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723557#comment-16723557
 ] 

lining commented on FLINK-11129:


[~till.rohrmann] [~Zentol] Currently a user needs to click through many pages; 
can we add a dashboard for the job?

> dashboard for job which contains important information
> 
>
> Key: FLINK-11129
> URL: https://issues.apache.org/jira/browse/FLINK-11129
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Webfrontend
>Reporter: lining
>Assignee: lining
>Priority: Major
>
> Now, checking a job problem requires clicking through many pages. Can we add 
> one dashboard for the job which contains failovers, subtasks whose input queue 
> usage is 100%, and TaskManagers that have GC problems?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11129) dashboard for job which contains important information

2018-12-17 Thread lining (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining reassigned FLINK-11129:
--

Assignee: lining

> dashboard for job which contains important information
> 
>
> Key: FLINK-11129
> URL: https://issues.apache.org/jira/browse/FLINK-11129
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Webfrontend
>Reporter: lining
>Assignee: lining
>Priority: Major
>
> Now, checking a job problem requires clicking through many pages. Can we add 
> one dashboard for the job which contains failovers, subtasks whose input queue 
> usage is 100%, and TaskManagers that have GC problems?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723553#comment-16723553
 ] 

vinoyang commented on FLINK-11162:
--

[~lining] Sounds good. I hope to see this PR as soon as possible.

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723551#comment-16723551
 ] 

lining edited comment on FLINK-11162 at 12/18/18 1:20 AM:
--

[~yanghua] we have done this. After the web refactor, we will push it. 


was (Author: lining):
[~yanghua] we have done this. After the web refactor, we will push it.

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-17 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723551#comment-16723551
 ] 

lining commented on FLINK-11162:


[~yanghua] we have done this. After the web refactor, we will push it.

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The scene of this issue:
> We are using the indicator variable of the operator: , 
> .
> We have customized the display of the indicator. Based on the query purpose, 
> we currently lack an interface to get all the logical operators of a job. The 
> current rest API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11103) Set a default uncaught exception handler

2018-12-17 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723547#comment-16723547
 ] 

vinoyang commented on FLINK-11103:
--

Oh, I see you mentioned FLINK-5232, which mainly adds exception catching to the 
thread pool of the actor system's dispatcher, so I thought this was the same 
issue. However, your consideration is reasonable: in addition to the actor 
system, the TaskManager still uses threads in many places.
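
For illustration, a minimal sketch of such a default handler (the class name 
and the log-only policy are my assumptions, not the actual patch):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultExceptionHandlerSketch {
    private static final Logger LOG =
        LoggerFactory.getLogger(DefaultExceptionHandlerSketch.class);

    public static void install() {
        // fallback for threads that have no explicit handler; this only logs,
        // a configurable fail-fast policy could be layered on top
        Thread.setDefaultUncaughtExceptionHandler((thread, error) ->
            LOG.error("Uncaught exception in thread '{}'.", thread.getName(), error));
    }
}
{code}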

> Set a default uncaught exception handler
> 
>
> Key: FLINK-11103
> URL: https://issues.apache.org/jira/browse/FLINK-11103
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.8.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Major
>
> We should set a default uncaught exception handler in Flink via 
> {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the 
> exceptions. Ideally, we would even fail the job (could make this 
> configurable) but users may have some ill-behaving threads, e.g. through 
> libraries, which they would want to tolerate and we don't want to change 
> behaviour now.
> (FLINK-5232 added this for the JobManager, we need it for the TaskManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11146) Get rid of legacy codes from ClusterClient

2018-12-17 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723520#comment-16723520
 ] 

TisonKun commented on FLINK-11146:
--

Thanks for your explanation [~mxm]. Given that JobClient is just a static 
helper class, I can confidently strip that code out as legacy logic. IIRC 
[~till.rohrmann] introduced an interface {{NewClusterClient}}, and many code 
paths changed.

Do we still follow the structure designed at 
https://cwiki.apache.org/confluence/display/FLINK/Refactoring+of+client+classes+for+cluster+management
 ? If so, then the main job is to finish {{RestClusterClient}} (to use a proper 
HTTP client) and to clean up legacy code.

> Get rid of legacy codes from ClusterClient
> --
>
> Key: FLINK-11146
> URL: https://issues.apache.org/jira/browse/FLINK-11146
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.8.0
>
>
> As [~StephanEwen] mentioned in ML, the client needs big refactoring / 
> cleanup. It should use a proper HTTP client library to help with future 
> authentication mechanisms.
> After an investigation I notice that the valid cluster clients are only 
> {{MiniClusterClient}} and {{RestClusterClient}}. Legacy clients, 
> {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 
> codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. 
> With this removal we arrive a clean stage where we can think how to implement 
> a proper HTTP client more comfortably.
> 1. {{StandaloneClusterClient}} is now depended on by 
> {{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) 
> and {{FlinkClient}}(part of flink-storm which is decided to be removed 
> FLINK-10571). Also relevant tests need to be ported(or directly removed).
> 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 
> Remove legacy flink-yarn component.
> 3. Testing classes inheriting from {{ClusterClient}} need to be ported(or 
> directly removed).
> 4. Get rid of legacy codes inside {{ClusterClient}} it self, such as 
> {{#run(JobGraph, ClassLoader)}}
> Besides, what is {{JobClient}} used for? I cannot find valid usages of it. 
> (Till mentioned it at ML 
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E)
> cc [~mxm] [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9

2018-12-17 Thread GitBox
TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile 
ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9
URL: https://github.com/apache/flink/pull/7302#issuecomment-448048599
 
 
   @GJL for "It is not obvious why Comparator.comparing does not work.", I did 
a survey and
   
   it points to a call to static method of interface(`Comparator#comparing` 
here) while using previous version of ASM(of course mock framework hacking 
bytecodes).
   
   Groovy community has suffered this issue. FYI, 
https://issues.apache.org/jira/browse/GROOVY-8338 
https://issues.apache.org/jira/browse/GROOVY-8528
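   
   For illustration, a hedged sketch of the construct in question and a 
possible workaround (my own example, not code from this PR):
   
   ```java
   import java.util.Comparator;
   
   public class ComparatorWorkaround {
       public static void main(String[] args) {
           // (1) calls the static interface method Comparator.comparing,
           //     which older ASM versions fail to instrument under JDK 9
           Comparator<String> byLength = Comparator.comparing(String::length);
           // (2) an equivalent comparator that avoids the static interface call
           Comparator<String> byLengthExplicit =
               (a, b) -> Integer.compare(a.length(), b.length());
           System.out.println(byLength.compare("ab", "abc"));         // < 0
           System.out.println(byLengthExplicit.compare("ab", "abc")); // < 0
       }
   }
   ```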


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor…

2018-12-17 Thread GitBox
sunjincheng121 commented on a change in pull request #7315: 
[FLINK-11179][tests] add wait time for every record for testCancelSor…
URL: https://github.com/apache/flink/pull/7315#discussion_r242356994
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
 ##
 @@ -122,9 +122,11 @@ public void testCancelSortMatchWithHighparallelism() 
throws Exception {
 
private static final class SimpleMatcher implements 
JoinFunction, Tuple2, Tuple2> {
private static final long serialVersionUID = 1L;
+   private static final int WAIT_TIME_PER_RECORD = 300; // 0.3 sec.
 
 Review comment:
   Hi, thanks for the review @hequn8128!
   If we used `DelayingMatcher`, we would not need this test case anymore. So I 
think neither adding the wait time nor using `DelayingMatcher` can achieve the 
purpose of the test, because both only exercise the join phase, while we want 
to test the sort-merge phase. So we should make the sort-merge phase take more 
time to achieve the purpose of the test. I have a better approach and will 
update the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor…

2018-12-17 Thread GitBox
sunjincheng121 commented on a change in pull request #7315: 
[FLINK-11179][tests] add wait time for every record for testCancelSor…
URL: https://github.com/apache/flink/pull/7315#discussion_r242356994
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
 ##
 @@ -122,9 +122,11 @@ public void testCancelSortMatchWithHighparallelism() 
throws Exception {
 
private static final class SimpleMatcher implements 
JoinFunction, Tuple2, Tuple2> {
private static final long serialVersionUID = 1L;
+   private static final int WAIT_TIME_PER_RECORD = 300; // 0.3 sec.
 
 Review comment:
   Hi, thanks for the review @hequn8128!
   If we used `DelayingMatcher`, we would not need this test case anymore. So I 
think neither adding the wait time nor using `DelayingMatcher` can achieve the 
purpose of the test, because both only exercise the join phase, while we want 
to test the sort-merge phase. So we should make the sort-merge phase take more 
time to achieve the purpose of the test. I have a better approach in mind and 
will update the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #7315: [FLINK-11179][tests] add wait time for every record for testCancelSor…

2018-12-17 Thread GitBox
sunjincheng121 commented on a change in pull request #7315: 
[FLINK-11179][tests] add wait time for every record for testCancelSor…
URL: https://github.com/apache/flink/pull/7315#discussion_r242356994
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
 ##
 @@ -122,9 +122,11 @@ public void testCancelSortMatchWithHighparallelism() 
throws Exception {
 
private static final class SimpleMatcher implements 
JoinFunction, Tuple2, Tuple2> {
private static final long serialVersionUID = 1L;
+   private static final int WAIT_TIME_PER_RECORD = 300; // 0.3 sec.
 
 Review comment:
   Hi, thanks for the review @hequn8128!
   I think neither adding the wait time nor using `DelayingMatcher` can achieve 
the purpose of the test, because both only exercise the join phase, while we 
want to test the sort-merge phase. So we should make the sort-merge phase take 
more time to achieve the purpose of the test. I have a better approach in mind 
and will update the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-17 Thread GitBox
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-448037747
 
 
   @azagrebin, I've just updated the code; please help review it when you 
have time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues

2018-12-17 Thread Addison Higham (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723402#comment-16723402
 ] 

Addison Higham commented on FLINK-11187:


See 
[https://lists.apache.org/thread.html/87eb8e16abad09232da5fbc6999c19c4fba0f16d641c565e62096564@%3Cuser.flink.apache.org%3E]
 for the first discussion of this on the mailing list.

 

> StreamingFileSink with S3 backend transient socket timeout issues 
> --
>
> Key: FLINK-11187
> URL: https://issues.apache.org/jira/browse/FLINK-11187
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Streaming Connectors
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Addison Higham
>Assignee: Addison Higham
>Priority: Major
> Fix For: 1.7.2
>
>
> When using the StreamingFileSink with S3A backend, occasionally, errors like 
> this will occur:
> {noformat}
> Caused by: 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  Your socket connection to the server was not read from or written to within 
> the timeout period. Idle connections will be closed. (Service: Amazon S3; 
> Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended 
> Request ID: xxx, S3 Extended Request ID: xxx
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of the Flink job, which it is often able to recover from, but 
> under heavy load this can become very frequent.
>  
> Turning on debug logs you can find the following relevant stack trace:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: 
> failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at 
> 

[jira] [Created] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues

2018-12-17 Thread Addison Higham (JIRA)
Addison Higham created FLINK-11187:
--

 Summary: StreamingFileSink with S3 backend transient socket 
timeout issues 
 Key: FLINK-11187
 URL: https://issues.apache.org/jira/browse/FLINK-11187
 Project: Flink
  Issue Type: Bug
  Components: FileSystem, Streaming Connectors
Affects Versions: 1.7.0, 1.7.1
Reporter: Addison Higham
Assignee: Addison Higham
 Fix For: 1.7.2


When using the StreamingFileSink with S3A backend, occasionally, errors like 
this will occur:
{noformat}
Caused by: 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Your socket connection to the server was not read from or written to within 
the timeout period. Idle connections will be closed. (Service: Amazon S3; 
Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended 
Request ID: xxx, S3 Extended Request ID: xxx
   at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
   at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
   at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
This causes a restart of the Flink job, which it is often able to recover from, but 
under heavy load this can become very frequent.

 

Turning on debug logs you can find the following relevant stack trace:
{noformat}
2018-12-17 05:55:46,546 DEBUG 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: 
failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
  at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
  at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
  at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
  at 

[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-17 Thread GitBox
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r242281357
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Yes. We want the ability to execute a job remotely regardless of the type of 
environment.
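   A hypothetical caller sketch of that idea; the full parameter list of the 
static method is cut off in the diff above, so the two-argument form below is 
an assumption for illustration only:
   
   ```java
   import java.net.URL;
   import java.util.Collections;
   import java.util.List;
   
   import org.apache.flink.api.common.JobExecutionResult;
   import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   public class ExecuteRemotelySketch {
       public static void main(String[] args) throws Exception {
           // Build the pipeline on whatever environment type is available.
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.fromElements(1, 2, 3).print();
   
           // Ship it to the cluster via the proposed static method (signature assumed).
           List<URL> jarFiles = Collections.emptyList();
           JobExecutionResult result = RemoteStreamEnvironment.executeRemotely(env, jarFiles);
           System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
       }
   }
   ```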


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend

2018-12-17 Thread Sayat Satybaldiyev (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723279#comment-16723279
 ] 

Sayat Satybaldiyev edited comment on FLINK-11171 at 12/17/18 7:11 PM:
--

[~yunta] Agreed, the exception in the job manager doesn't correlate with the 
serialization exception. I assume that, due to the checkpoint error, the JM eventually 
deletes the blocks from HDFS and hence we get an error. The data nodes have enough 
disk space.


was (Author: sayatez):
[~yunta] agrees, the exception in job manager doesn't correlate with the 
serialization exception. I assume that due to checkpoint error, JM eventually 
deletes the blocks from HDFS and hence we got an error. Data nodes have enough 
disk space.

> Unexpected timestamp deserialization failure in RocksDB state backend
> -
>
> Key: FLINK-11171
> URL: https://issues.apache.org/jira/browse/FLINK-11171
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
>Reporter: Sayat Satybaldiyev
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We have a job that joins two data stream via Process function and using 
> ValueState TTL with RocksDB backends. The jobs constantly fail to checkpoint 
> due to timestamp serialization error.
> TTL state config
> {code:java}
> StateTtlConfig ttlConfig = StateTtlConfig
>  .newBuilder(Time.hours(recommendationRetentionHr))
>  .neverReturnExpired()
>  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>  .cleanupFullSnapshot()
>  .build();
> {code}
>  
> Error
>  
> {code:java}
> 2018-12-16 06:02:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 
> of job 7825029dc256542aa312c0b68ecf0631 expired before completing.
>  2018-12-16 06:22:12,637 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,899 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job 
> 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,900 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631.
>  java.lang.Exception: Could not materialize checkpoint 32 for operator 
> joined-stream (1/6).
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>  at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>  ... 5 more
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94)
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48)
>  at 
> 

[jira] [Commented] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend

2018-12-17 Thread Sayat Satybaldiyev (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723279#comment-16723279
 ] 

Sayat Satybaldiyev commented on FLINK-11171:


[~yunta] agrees, the exception in job manager doesn't correlate with the 
serialization exception. I assume that due to checkpoint error, JM eventually 
deletes the blocks from HDFS and hence we got an error. Data nodes have enough 
disk space.

> Unexpected timestamp deserialization failure in RocksDB state backend
> -
>
> Key: FLINK-11171
> URL: https://issues.apache.org/jira/browse/FLINK-11171
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
>Reporter: Sayat Satybaldiyev
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We have a job that joins two data stream via Process function and using 
> ValueState TTL with RocksDB backends. The jobs constantly fail to checkpoint 
> due to timestamp serialization error.
> TTL state config
> {code:java}
> StateTtlConfig ttlConfig = StateTtlConfig
>  .newBuilder(Time.hours(recommendationRetentionHr))
>  .neverReturnExpired()
>  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>  .cleanupFullSnapshot()
>  .build();
> {code}
>  
> Error
>  
> {code:java}
> 2018-12-16 06:02:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 
> of job 7825029dc256542aa312c0b68ecf0631 expired before completing.
>  2018-12-16 06:22:12,637 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,899 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job 
> 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,900 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631.
>  java.lang.Exception: Could not materialize checkpoint 32 for operator 
> joined-stream (1/6).
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>  at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>  ... 5 more
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94)
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.buildIteratorHeap(RocksStatesPerKeyGroupMergeIterator.java:128)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.(RocksStatesPerKeyGroupMergeIterator.java:68)
>  at 
> 

[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-17 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r242276315
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Do we still need this new `static` method now that we have the constructor?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation

2018-12-17 Thread GitBox
walterddr commented on issue #7258: [FLINK-11084]Throw a hard exception to 
remind developers while there's no stream node between two split transformation
URL: https://github.com/apache/flink/pull/7258#issuecomment-447930223
 
 
   Thanks for the PR @Clark . Based on the JIRA discussion, should we also 
put a `@Deprecated` annotation on the `.split` method as well? 
   
   I fully agree that consecutive splits are buggy features without proper 
safeguarding and can also be done much more elegantly using side outputs. Are there any 
other concerns that make us want to keep the `splitTransform` feature?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible

2018-12-17 Thread GitBox
walterddr commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT 
aggregates to use the same distinct accumulator if possible
URL: https://github.com/apache/flink/pull/7286#issuecomment-447920610
 
 
   @dianfu thanks for the explanation. I can see that #7253 has been merged. 
Would you please update and rebase the test cases? :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11158) Do not show empty page for unknown jobs

2018-12-17 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723165#comment-16723165
 ] 

Nico Kruber commented on FLINK-11158:
-

Thanks for the update. I guess it would be nice to have it visible, and 
probably also not too much to change then.

> Do not show empty page for unknown jobs
> ---
>
> Key: FLINK-11158
> URL: https://issues.apache.org/jira/browse/FLINK-11158
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nico Kruber
>Assignee: leesf
>Priority: Major
>
> If you try to access the Web UI using an old/non-existing job ID, e.g. after 
> a cluster restart, you are currently presented with a white page with no 
> further info. It should at least contain "job not found" message to indicate 
> that there was no other error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-17 Thread GitBox
klion26 commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r241639512
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static 
org.apache.flink.runtime.concurrent.Executors.directExecutorService;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+class RocksDbStateDataTransfer {
+
+   static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the 
given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws 
InterruptedException, IOException {
+
+   final ExecutorService executorService = 
createExecutorService(restoringThreadNum);
+
+   try {
+   List<Runnable> runnables = 
createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+   List<CompletableFuture<Void>> futures = new 
ArrayList<>(runnables.size());
+   for (Runnable runnable : runnables) {
+   
futures.add(CompletableFuture.runAsync(runnable, executorService));
+   }
+
+   FutureUtils.waitForAll(futures).get();
+   } catch (ExecutionException e) {
+   final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
+   if (throwable instanceof IOException) {
+   throw (IOException) throwable;
 
 Review comment:
   I followed 
[AbstractTaskManagerFileHandle](https://github.com/apache/flink/blob/951ae9a813cbf8a2db95519853f940277e4ea088/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java#L127)
 and 
[JobClient](https://github.com/apache/flink/blob/951ae9a813cbf8a2db95519853f940277e4ea088/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java#L445)
 to throw `IOException` after `ExceptionUtils.stripExecutionException(e)` here; 
please correct me if the behavior needs to change.


This is an automated message from 

[GitHub] klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-17 Thread GitBox
klion26 commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r241949427
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static 
org.apache.flink.runtime.concurrent.Executors.directExecutorService;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+class RocksDbStateDataTransfer {
+
+   static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the 
given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws 
InterruptedException, IOException {
+
+   final ExecutorService executorService = 
createExecutorService(restoringThreadNum);
+
+   try {
+   List<Runnable> runnables = 
createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+   List<CompletableFuture<Void>> futures = new 
ArrayList<>(runnables.size());
+   for (Runnable runnable : runnables) {
+   
futures.add(CompletableFuture.runAsync(runnable, executorService));
+   }
+
+   FutureUtils.waitForAll(futures).get();
+   } catch (ExecutionException e) {
+   final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
+   if (throwable instanceof IOException) {
+   throw (IOException) throwable;
 
 Review comment:
   Thank you for the explanation @azagrebin.
   I found that `throw ExceptionUtils.stripExecutionException(e)` would throw 
`Throwable`, so we would have to add `throws Throwable` to the method signature.
   How about using the code below?
   ```
   try {
   ...
   } catch (ExecutionException e) {
   final Throwable throwable = ExceptionUtils.stripExecutionException(e);
   ExceptionUtils.rethrow(throwable);
   } finally {
   executorService.shutdownNow();
   }
   ```
   Because I found that the `ExecutionException` will be thrown when executing the 
runnables, and the runnables were wrapped by `ThrowingRunnable.unchecked`; 
`ThrowingRunnable.unchecked` will 

[jira] [Comment Edited] (FLINK-11103) Set a default uncaught exception handler

2018-12-17 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723157#comment-16723157
 ] 

Nico Kruber edited comment on FLINK-11103 at 12/17/18 4:54 PM:
---

Maybe I did not look into FLINK-5232 thoroughly enough: I wasn't referring to 
any akka actor system, but instead any thread in Flink including those which 
can be spawned by users. Do you think your commit also covers this?


was (Author: nicok):
Maybe I did not look into FLINK-5232 thoroughly enough: I wasn't referring to 
any akka actor system, but instead any thread in Flink including those which 
can be spawned by users. 

> Set a default uncaught exception handler
> 
>
> Key: FLINK-11103
> URL: https://issues.apache.org/jira/browse/FLINK-11103
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.8.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Major
>
> We should set a default uncaught exception handler in Flink via 
> {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the 
> exceptions. Ideally, we would even fail the job (could make this 
> configurable) but users may have some ill-behaving threads, e.g. through 
> libraries, which they would want to tolerate and we don't want to change 
> behaviour now.
> (FLINK-5232 added this for the JobManager, we need it for the TaskManager)
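
A minimal sketch of such a default handler (illustrative only, not Flink's actual implementation):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LoggingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class);

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // At the very least, make the failure visible instead of losing it silently.
        LOG.error("Uncaught exception in thread '{}'.", t.getName(), e);
    }

    public static void install() {
        Thread.setDefaultUncaughtExceptionHandler(new LoggingUncaughtExceptionHandler());
    }
}
{code}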



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11103) Set a default uncaught exception handler

2018-12-17 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723157#comment-16723157
 ] 

Nico Kruber commented on FLINK-11103:
-

Maybe I did not look into FLINK-5232 thoroughly enough: I wasn't referring to 
any akka actor system, but instead any thread in Flink including those which 
can be spawned by users. 

> Set a default uncaught exception handler
> 
>
> Key: FLINK-11103
> URL: https://issues.apache.org/jira/browse/FLINK-11103
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.8.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Major
>
> We should set a default uncaught exception handler in Flink via 
> {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the 
> exceptions. Ideally, we would even fail the job (could make this 
> configurable) but users may have some ill-behaving threads, e.g. through 
> libraries, which they would want to tolerate and we don't want to change 
> behaviour now.
> (FLINK-5232 added this for the JobManager, we need it for the TaskManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9680) Reduce heartbeat timeout for E2E tests

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9680:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Reduce heartbeat timeout for E2E tests
> --
>
> Key: FLINK-9680
> URL: https://issues.apache.org/jira/browse/FLINK-9680
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0
>
>
> Several end-to-end tests shoot down job- and taskmanagers and wait for them 
> to come back up before continuing the testing process.
> {{heartbeat.timeout}} controls how long a container has to be unreachable to 
> be considered lost. The default for this option is 50 seconds, causing 
> significant idle times during the tests.
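
For the E2E test scripts, this could be lowered in flink-conf.yaml, e.g. (the value below is illustrative only):

{code}
heartbeat.timeout: 5000
{code}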



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend

2018-12-17 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin reassigned FLINK-11171:
---

Assignee: Andrey Zagrebin

> Unexpected timestamp deserialization failure in RocksDB state backend
> -
>
> Key: FLINK-11171
> URL: https://issues.apache.org/jira/browse/FLINK-11171
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
>Reporter: Sayat Satybaldiyev
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We have a job that joins two data stream via Process function and using 
> ValueState TTL with RocksDB backends. The jobs constantly fail to checkpoint 
> due to timestamp serialization error.
> TTL state config
> {code:java}
> StateTtlConfig ttlConfig = StateTtlConfig
>  .newBuilder(Time.hours(recommendationRetentionHr))
>  .neverReturnExpired()
>  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>  .cleanupFullSnapshot()
>  .build();
> {code}
>  
> Error
>  
> {code:java}
> 2018-12-16 06:02:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 
> of job 7825029dc256542aa312c0b68ecf0631 expired before completing.
>  2018-12-16 06:22:12,637 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,899 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job 
> 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,900 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631.
>  java.lang.Exception: Could not materialize checkpoint 32 for operator 
> joined-stream (1/6).
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>  at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>  ... 5 more
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94)
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.buildIteratorHeap(RocksStatesPerKeyGroupMergeIterator.java:128)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.(RocksStatesPerKeyGroupMergeIterator.java:68)
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:312)
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>  at 
> 

[GitHub] azagrebin opened a new pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2018-12-17 Thread GitBox
azagrebin opened a new pull request #7320: [FLINK-11171] Avoid concurrent usage 
of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320
 
 
   ## What is the purpose of the change
   
   StateSnapshotTransformer objects are currently created when state is initialised. 
Afterwards, they are called in the asynchronous part of a checkpoint. However, they 
can be non-thread-safe if accessed from multiple concurrent checkpoint operations.
   
   This PR changes the backends to keep the transformer factory after state init and 
to create the transformer right before iterating the state entries, in the same thread.
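   
   A minimal sketch of that thread-confinement pattern, with illustrative names 
(not Flink's actual interfaces):
   
   ```java
   import java.util.List;
   import java.util.function.Supplier;
   
   /** Illustrative stand-in for the snapshot transformer; may hold mutable state. */
   interface Transformer<T> {
       T filterOrTransform(T value);
   }
   
   final class SnapshotWriter<T> {
       // Only the factory outlives state initialisation; it is safe to share.
       private final Supplier<Transformer<T>> transformerFactory;
   
       SnapshotWriter(Supplier<Transformer<T>> transformerFactory) {
           this.transformerFactory = transformerFactory;
       }
   
       /** Runs in the asynchronous checkpoint thread. */
       void writeSnapshot(List<T> entries) {
           // Create the transformer here, so each concurrent checkpoint owns its
           // own instance and a non-thread-safe transformer is never shared.
           Transformer<T> transformer = transformerFactory.get();
           for (T entry : entries) {
               T transformed = transformer.filterOrTransform(entry);
               if (transformed != null) {
                   // ... write 'transformed' to the snapshot stream
               }
           }
       }
   }
   ```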
   
   ## Brief change log
   
   Change backends to keep transformer factory and create thread-confined 
transformer.
   
   ## Verifying this change
   
   Unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend

2018-12-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11171:
---
Labels: pull-request-available  (was: )

> Unexpected timestamp deserialization failure in RocksDB state backend
> -
>
> Key: FLINK-11171
> URL: https://issues.apache.org/jira/browse/FLINK-11171
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
>  Labels: pull-request-available
>
> We have a job that joins two data stream via Process function and using 
> ValueState TTL with RocksDB backends. The jobs constantly fail to checkpoint 
> due to timestamp serialization error.
> TTL state config
> {code:java}
> StateTtlConfig ttlConfig = StateTtlConfig
>  .newBuilder(Time.hours(recommendationRetentionHr))
>  .neverReturnExpired()
>  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>  .cleanupFullSnapshot()
>  .build();
> {code}
>  
> Error
>  
> {code:java}
> 2018-12-16 06:02:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,609 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 
> of job 7825029dc256542aa312c0b68ecf0631 expired before completing.
>  2018-12-16 06:22:12,637 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,899 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job 
> 7825029dc256542aa312c0b68ecf0631.
>  2018-12-16 06:22:12,900 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631.
>  java.lang.Exception: Could not materialize checkpoint 32 for operator 
> joined-stream (1/6).
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>  at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>  ... 5 more
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
> deserialization failure
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94)
>  at 
> org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.buildIteratorHeap(RocksStatesPerKeyGroupMergeIterator.java:128)
>  at 
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.(RocksStatesPerKeyGroupMergeIterator.java:68)
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:312)
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>  at 
> 

[jira] [Updated] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10292:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.8.0
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once and store it in HA 
> storage instead from where to retrieve.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10251) Handle oversized response messages in AkkaRpcActor

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10251:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Handle oversized response messages in AkkaRpcActor
> --
>
> Key: FLINK-10251
> URL: https://issues.apache.org/jira/browse/FLINK-10251
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> The {{AkkaRpcActor}} should check whether an RPC response which is sent to a 
> remote sender does not exceed the maximum framesize of the underlying 
> {{ActorSystem}}. If this is the case we should fail fast instead. We can 
> achieve this by serializing the response and sending the serialized byte 
> array.
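
A sketch of that fail-fast check (illustrative; not the actual AkkaRpcActor code; only InstantiationUtil is assumed from flink-core):

{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.util.InstantiationUtil;

public final class FramesizeCheck {
    private FramesizeCheck() {}

    static byte[] serializeWithFramesizeCheck(Serializable response, long maximumFramesize) throws IOException {
        byte[] serialized = InstantiationUtil.serializeObject(response);
        if (serialized.length > maximumFramesize) {
            // Fail fast with a descriptive error instead of letting the message be dropped silently.
            throw new IOException("RPC response of " + serialized.length
                + " bytes exceeds the maximum framesize of " + maximumFramesize + " bytes.");
        }
        return serialized; // safe to send as a byte array
    }
}
{code}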



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10721:

Fix Version/s: (was: 1.6.3)
   1.6.4

> kafkaFetcher runFetchLoop throw exception will cause follow-up code not 
> execute in FlinkKafkaConsumerBase run method 
> -
>
> Key: FLINK-10721
> URL: https://issues.apache.org/jira/browse/FLINK-10721
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.6.2
>Reporter: zhaoshijie
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> In the FlinkKafkaConsumerBase run method on line 721 (master branch), if 
> kafkaFetcher.runFetchLoop() throws an exception (the discoveryLoopThread throws an 
> exception and then finally executes the cancel method; the cancel method executes 
> kafkaFetcher.cancel, whose Kafka09Fetcher implementation executes 
> handover.close, which in turn makes handover.pollNext throw a ClosedException), then the 
> following code will not execute. In particular, discoveryLoopError will not be thrown, so the 
> real culprit exception will be swallowed.
> The failure log looks like this:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Should we modify it as follows?
> {code:java}
> try {
>     kafkaFetcher.runFetchLoop();
> } catch (Exception e) {
>     // if discoveryLoopErrorRef is not null, we should throw the real culprit exception
>     if (discoveryLoopErrorRef.get() != null) {
>         throw new RuntimeException(discoveryLoopErrorRef.get());
>     } else {
>         throw e;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9812) SpanningRecordSerializationTest fails on travis

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9812:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> SpanningRecordSerializationTest fails on travis
> ---
>
> Key: FLINK-9812
> URL: https://issues.apache.org/jira/browse/FLINK-9812
> Project: Flink
>  Issue Type: Bug
>  Components: Network, Tests, Type Serialization System
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Nico Kruber
>Priority: Critical
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> https://travis-ci.org/zentol/flink/jobs/402744191
> {code}
> testHandleMixedLargeRecords(org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest)
>   Time elapsed: 6.113 sec  <<< ERROR!
> java.nio.channels.ClosedChannelException: null
>   at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>   at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.addNextChunkFromMemorySegment(SpillingAdaptiveSpanningRecordDeserializer.java:529)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.access$200(SpillingAdaptiveSpanningRecordDeserializer.java:431)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.setNextBuffer(SpillingAdaptiveSpanningRecordDeserializer.java:76)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:149)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:115)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testHandleMixedLargeRecords(SpanningRecordSerializationTest.java:104)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11172) Remove the max retention time in StreamQueryConfig

2018-12-17 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723115#comment-16723115
 ] 

Fabian Hueske commented on FLINK-11172:
---

Hi, time-based operations (GROUP BY windows, OVER windows, time-windowed join, 
etc.) are inherently bound by time and automatically clean up their state. We 
should not add state cleanup or TTL for these operators.

Besides that, I agree to put this simplification on hold until we can use TTL 
for clean up.

> Remove the max retention time in StreamQueryConfig
> --
>
> Key: FLINK-11172
> URL: https://issues.apache.org/jira/browse/FLINK-11172
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.8.0
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Major
>
> [Stream Query 
> Config|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html]
>  is an important and useful feature to make a tradeoff between accuracy and 
> resource consumption when a query is executed on unbounded streaming data. 
> This feature was first proposed in 
> [FLINK-6491|https://issues.apache.org/jira/browse/FLINK-6491].
> At first, *QueryConfig* took two parameters, i.e. 
> minIdleStateRetentionTime and maxIdleStateRetentionTime, to avoid registering 
> too many timers by allowing more freedom in when to discard state. However, 
> this approach may cause new data to expire earlier than old data and thus 
> greater accuracy loss in some cases. For example, take an unbounded keyed 
> stream. We process key *_a_* at _*t0*_ and _*b*_ at _*t1*_, with *_t0 < t1_*. 
> The state of *_a_* expires at _*t0 + maxIdleStateRetentionTime*_ while that of 
> _*b*_ expires at *_t1 + maxIdleStateRetentionTime_*. Now another record with 
> key *_a_* arrives at _*t2*_ _*(t1 < t2)*_, but _*t2 + 
> minIdleStateRetentionTime*_ < _*t0 + maxIdleStateRetentionTime*_. The state of 
> key *_a_* will still expire at _*t0 + maxIdleStateRetentionTime*_, which is 
> earlier than the state of key _*b*_. According to the guideline of 
> [LRU|https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)],
>  an element that has been heavily used in the recent past is likely to be used 
> heavily in the near future as well, so the state with key _*a*_ should live 
> longer than the state with key _*b*_. The current approach goes against this 
> idea.
> I think we now have a good chance to remove the maxIdleStateRetentionTime 
> argument in *StreamQueryConfig.* Below are my reasons; a snippet of the 
> current API follows after the list.
>  * [FLINK-9423|https://issues.apache.org/jira/browse/FLINK-9423] implemented 
> efficient deletes for the heap-based timer service. We can leverage the 
> deletion op to mitigate the abuse of timer registration.
>  * The current approach can cause new data to expire earlier than old data and 
> thus greater accuracy loss in some cases. Users need to fine-tune these two 
> parameters to avoid this scenario. Directly following the idea of LRU looks 
> like a better solution.
> So I plan to remove maxIdleStateRetentionTime and make the expiration time 
> depend only on _*minIdleStateRetentionTime.*_
> cc to [~sunjincheng121], [~fhueske] 
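> For reference, a minimal sketch of the current two-parameter configuration 
> from the linked docs (the retention values below are illustrative):
> {code:java}
> StreamQueryConfig qConfig = tableEnv.queryConfig();
> // state idle for less than 12h is always kept; state idle for more than 24h is discarded
> qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
> {code}
> With this proposal, only the first (minimum) retention time would remain.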



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9485) Improving Flink’s timer management for large state

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9485:
---
Fix Version/s: (was: 1.6.4)

> Improving Flink’s timer management for large state
> --
>
> Key: FLINK-9485
> URL: https://issues.apache.org/jira/browse/FLINK-9485
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> See 
> https://docs.google.com/document/d/1XbhJRbig5c5Ftd77d0mKND1bePyTC26Pz04EvxdA7Jc/edit#heading=h.17v0k3363r6q



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10079) Support external sink table in INSERT INTO statement

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10079:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Support external sink table in INSERT INTO statement
> 
>
> Key: FLINK-10079
> URL: https://issues.apache.org/jira/browse/FLINK-10079
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
> Fix For: 1.6.4, 1.8.0
>
>
> In the documentation, there is a description:
> {quote}Once registered in a TableEnvironment, all tables defined in a 
> ExternalCatalog can be accessed from Table API or SQL queries by specifying 
> their full path, such as catalog.database.table.
> {quote}
> Currently, this is true only for source tables. For sink tables (specified in 
> the Table API or SQL), users have to register them explicitly even though they 
> are defined in a registered ExternalCatalog; otherwise a "No table was 
> registered under the name XXX" TableException is thrown.
> It would be better to keep source tables and sink tables consistent, and 
> users would enjoy a more convenient way of inserting into sink tables. 
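> For illustration, with this change the following should work without an 
> explicit registerTableSink call (the catalog and table names here are made up 
> for the example):
> {code:java}
> // the sink table is resolved through the registered ExternalCatalog
> tableEnv.sqlUpdate(
>     "INSERT INTO myCatalog.myDb.mySinkTable SELECT a, b FROM sourceTable");
> {code}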



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9469) Add tests that cover PatternStream#flatSelect

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9469:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Add tests that cover PatternStream#flatSelect
> -
>
> Key: FLINK-9469
> URL: https://issues.apache.org/jira/browse/FLINK-9469
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7991) Cleanup kafka example jar filters

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-7991:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Cleanup kafka example jar filters
> -
>
> Key: FLINK-7991
> URL: https://issues.apache.org/jira/browse/FLINK-7991
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.6.4, 1.8.0
>
>
> The kafka example jar shading configuration includes the following files:
> {code}
> org/apache/flink/streaming/examples/kafka/**
> org/apache/flink/streaming/**
> org/apache/kafka/**
> org/apache/curator/**
> org/apache/zookeeper/**
> org/apache/jute/**
> org/I0Itec/**
> jline/**
> com/yammer/**
> kafka/**
> {code}
> Problems:
> * the inclusion of org.apache.flink.streaming causes large parts of the API 
> (and runtime...) to be included in the jar, along with the _Twitter_ source 
> and *all* other examples
> * the yammer, jline, I0Itec, jute, zookeeper and curator inclusions are 
> ineffective; none of these classes show up in the example jar



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11151) FileUploadHandler stops working if the upload directory is removed

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11151:

Fix Version/s: (was: 1.6.3)
   1.6.4

> FileUploadHandler stops working if the upload directory is removed
> --
>
> Key: FLINK-11151
> URL: https://issues.apache.org/jira/browse/FLINK-11151
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, REST
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> A user has reported on the ML that the FileUploadHandler does not accept any 
> files anymore if the upload directory was deleted after the cluster has been 
> started.
> A cursory glance at the code shows that it currently uses 
> {{Files.createDirectory(...)}} to create a temporary directory for the 
> current request to store uploaded files in.
> Changing this to use {{Files.createDirectories(...)}} instead should prevent 
> this from happening again.
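> A minimal sketch of the difference (the upload path below is illustrative; it 
> assumes java.nio.file.Files/Paths and java.util.UUID):
> {code:java}
> Path uploadDir = Paths.get("/tmp/flink-web-upload");
> Path requestDir = uploadDir.resolve(UUID.randomUUID().toString());
> // createDirectory fails with NoSuchFileException if uploadDir was deleted;
> // createDirectories recreates the whole missing chain as well.
> Files.createDirectories(requestDir);
> {code}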



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11023) Update LICENSE and NOTICE files for flink-connectors

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11023:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Update LICENSE and NOTICE files for flink-connectors
> 
>
> Key: FLINK-11023
> URL: https://issues.apache.org/jira/browse/FLINK-11023
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.2
>
>
> Similar to FLINK-10987 we should also update the {{LICENSE}} and {{NOTICE}} 
> files for {{flink-connectors}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9010) NoResourceAvailableException with FLIP-6

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9010:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> NoResourceAvailableException with FLIP-6 
> -
>
> Key: FLINK-9010
> URL: https://issues.apache.org/jira/browse/FLINK-9010
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: flip-6
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) 
> with FLIP-6 mode and a checkpointing interval of 1000 and got the following 
> exception:
> {code}
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000101 - Remaining pending container 
> requests: 302
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000101 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,155 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
> 2018-03-16 03:41:20,165 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> 2018-03-16 03:41:20,176 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> 

[jira] [Updated] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9253:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Make buffer count per InputGate always #channels*buffersPerChannel + 
> ExclusiveBuffers
> -
>
> Key: FLINK-9253
> URL: https://issues.apache.org/jira/browse/FLINK-9253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> The credit-based flow control path assigns exclusive buffers only to remote 
> channels (which makes sense since local channels don't use any own buffers). 
> However, this is somewhat opaque with respect to how much data may be in 
> buffers, since that depends on the actual schedule of the job and not on the 
> job graph.
> By adapting the floating buffers to use a maximum of 
> {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, 
> we would be channel-type agnostic and keep the old behaviour.
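> For illustration (numbers assumed): with 4 input channels of which 2 are 
> remote, {{buffersPerChannel = 2}} and {{floatingBuffersPerGate = 8}}, the gate 
> gets 2 * 2 = 4 exclusive buffers and up to 4 * 2 + 8 - 4 = 12 floating 
> buffers, i.e. the same 16-buffer ceiling regardless of how many channels 
> happen to be local.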



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9228) log details about task fail/task manager is shutting down

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9228:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> log details about task fail/task manager is shutting down
> -
>
> Key: FLINK-9228
> URL: https://issues.apache.org/jira/browse/FLINK-9228
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Affects Versions: 1.4.2
>Reporter: makeyang
>Assignee: makeyang
>Priority: Minor
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> condition:
> flink version:1.4.2
> jdk version:1.8.0.20
> linux version:3.10.0
> problem description:
> one of my task managers dropped out of the cluster; I checked its log and 
> found the following: 
> 2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task       
>               
> - Attempting to fail task externally Process (115/120) 
> (19d0b0ce1ef3b8023b37bdfda643ef44). 
> 2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task       
>               
> - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING 
> to FAILED. 
> java.lang.Exception: TaskManager is shutting down. 
>         at 
> org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220)
>  
>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) 
>         at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121)
>  
>         at 
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>  
>         at 
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) 
>         at akka.actor.ActorCell.terminate(ActorCell.scala:374) 
>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) 
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) 
>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) 
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) 
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  
> suggestion:
>  # short term suggestion:
>  ## log the reasons why the task failed: did it receive some event from the 
> job manager, fail to connect to the job manager, or hit an operator exception? 
> The more clarity the better.
>  ## log the reasons why the task manager is shutting down: did it receive some 
> event from the job manager, fail to connect to the job manager, or hit an 
> operator exception it could not recover from?
>  # long term suggestion:
>  ## define the state machine of a Flink node clearly. If nothing happens, the 
> node should stay in its current state, which means that if it is processing 
> events and nothing happens, it should still be processing events. In other 
> words, if its state changes from processing events to cancelled, then some 
> event happened.
>  ## define the events which can cause node state changes clearly, like user 
> cancel, operator exception, heartbeat timeout, etc.
>  ## log the state change and the event which caused it clearly in the logs
>  ## show event details (time, node, event, state change, etc.) in the web UI



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11079) Update LICENSE and NOTICE files for flink-storm-examples

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11079:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Update LICENSE and NOTICE files for flink-storm-examples
> ---
>
> Key: FLINK-11079
> URL: https://issues.apache.org/jira/browse/FLINK-11079
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0
>
>
> Similar to FLINK-10987 we should also update the {{LICENSE}} and {{NOTICE}} 
> for {{flink-storm-examples}}.
>  
> This project creates several fat example jars that are deployed to Maven 
> Central.
> Alternatively we could think about dropping these examples.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11107:

Fix Version/s: (was: 1.6.3)
   1.6.4

> [state] Avoid memory stateBackend to create arbitrary folders under HA path 
> when no checkpoint path configured
> --
>
> Key: FLINK-11107
> URL: https://issues.apache.org/jira/browse/FLINK-11107
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.2
>
>
> Currently, the memory state backend creates a folder named with a random UUID 
> under the HA directory if no checkpoint path was ever configured (the code 
> logic lives in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). 
> However, the default memory state backend is created not only on the JM side 
> but also on each task manager's side, which means many folders with random 
> UUIDs are created under the HA directory. This results in an exception like:
> {noformat}
> The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 
> items=1048576{noformat}
>  If this happens, no new jobs can be submitted until we clean up those 
> directories manually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10954) Hardlink from files of previous local stored state might cross devices

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10954:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Hardlink from files of previous local stored state might cross devices
> --
>
> Key: FLINK-10954
> URL: https://issues.apache.org/jira/browse/FLINK-10954
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> Currently, local recovery's base directories are initialized from 
> '{{io.tmp.dirs}}' if the parameter '{{taskmanager.state.local.root-dirs}}' is 
> not set. In a Yarn environment, the tmp dirs are replaced by Yarn's 
> '{{LOCAL_DIRS}}', which might consist of directories from different devices, 
> such as /dump/1/nm-local-dir and /dump/2/nm-local-dir. The local directory for 
> RocksDB is initialized from the IOManager's spillingDirectories, which might 
> be located on a different device than the local recovery folder. However, 
> hard-linking between different devices is not allowed and throws the exception 
> below:
> {code:java}
> java.nio.file.FileSystemException: target -> souce: Invalid cross-device link
> {code}
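> A common mitigation for this class of problem, shown only as a sketch (not 
> necessarily the fix Flink will take), is to try the hard link and fall back to 
> a plain copy across devices:
> {code:java}
> try {
>     Files.createLink(target, source);
> } catch (FileSystemException e) {
>     // cross-device link not possible; copy the file instead
>     Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
> }
> {code}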



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11186) Support for event-time balancing for multiple Kafka partitions

2018-12-17 Thread Tom Schamberger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Schamberger updated FLINK-11186:

Summary: Support for event-time balancing for multiple Kafka partitions  
(was: Support for event-time balancing for multiple Kafka comsumer partitions)

> Support for event-time balancing for multiple Kafka partitions
> --
>
> Key: FLINK-11186
> URL: https://issues.apache.org/jira/browse/FLINK-11186
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Kafka Connector
>Reporter: Tom Schamberger
>Priority: Minor
>
> Currently, it is not possible with Flink to back-pressure individual Kafka 
> partitions, which are faster in terms of event-time. This leads to 
> unnecessary memory consumption and can lead to deadlocks in the case of 
> back-pressure.
> When multiple Kafka topics are consumed, downstream event-time window 
> operators have to wait until the last Kafka partition has produced a 
> sufficient watermark before they are triggered. If individual Kafka partitions 
> differ in read performance, or the event-time of messages within partitions is 
> not monotonically distributed, this can lead to a situation where 'fast' 
> partitions (whose event-time makes fast progress) outrun slower partitions 
> until back-pressure prevents all partitions from being consumed further. 
> This leads to a deadlock of the application.
> I suggest, that windows should be able to back-pressure individual 
> partitions, which progress faster in terms of event-time, so that slow 
> partitions can keep up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10435) Client sporadically hangs after Ctrl + C

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10435:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Client sporadically hangs after Ctrl + C
> 
>
> Key: FLINK-10435
> URL: https://issues.apache.org/jira/browse/FLINK-10435
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0
>
>
> When submitting a YARN job cluster in attached mode, the client hangs 
> indefinitely if Ctrl + C is pressed at the right time. One can recover from 
> this by sending SIGKILL.
> *Command to submit job*
> {code}
> HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> examples/streaming/WordCount.jar
> {code}
>  
> *Output/Stacktrace*
> {code}
> [hadoop@ip-172-31-45-22 flink-1.5.4]$ HADOOP_CLASSPATH=`hadoop classpath` 
> bin/flink run -m yarn-cluster examples/streaming/WordCount.jar
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop/flink-1.5.4/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-09-26 12:01:04,241 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> ip-172-31-45-22.eu-central-1.compute.internal/172.31.45.22:8032
> 2018-09-26 12:01:04,386 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-26 12:01:04,386 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-26 12:01:04,402 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-09-26 12:01:04,598 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
> specification: ClusterSpecification{masterMemoryMB=1024, 
> taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-26 12:01:04,972 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
> configuration directory ('/home/hadoop/flink-1.5.4/conf') contains both LOG4J 
> and Logback configuration files. Please delete or rename one of them.
> 2018-09-26 12:01:07,857 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting 
> application master application_1537944258063_0017
> 2018-09-26 12:01:07,913 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1537944258063_0017
> 2018-09-26 12:01:07,913 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for 
> the cluster to be allocated
> 2018-09-26 12:01:07,916 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
> cluster, current state ACCEPTED
> ^C2018-09-26 12:01:08,851 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cancelling 
> deployment from Deployment Failure Hook
> 2018-09-26 12:01:08,854 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Killing YARN 
> application
> 
>  The program finished with the following exception:
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:410)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:258)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> 

[jira] [Updated] (FLINK-9879) Find sane defaults for (advanced) SSL session parameters

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9879:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Find sane defaults for (advanced) SSL session parameters
> 
>
> Key: FLINK-9879
> URL: https://issues.apache.org/jira/browse/FLINK-9879
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Nico Kruber
>Priority: Major
> Fix For: 1.6.4
>
>
> After adding these configuration parameters with 
> https://issues.apache.org/jira/browse/FLINK-9878:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout
> We should try to find sane defaults that "just work" :)
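> For illustration, the corresponding flink-conf.yaml entries could look like 
> this (the key names are assumed from FLINK-9878, and the values below are 
> placeholders, not the proposed defaults):
> {code}
> security.ssl.internal.session-cache-size: 10240
> security.ssl.internal.session-timeout: 86400000
> security.ssl.internal.handshake-timeout: 10000
> security.ssl.internal.close-notify-flush-timeout: -1
> {code}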



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9749) Rework Bucketing Sink

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9749:
---
Fix Version/s: (was: 1.6.3)

> Rework Bucketing Sink
> -
>
> Key: FLINK-9749
> URL: https://issues.apache.org/jira/browse/FLINK-9749
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>Priority: Major
>
> The BucketingSink has a series of deficits at the moment.
> Due to the long list of issues, I would suggest adding a new 
> StreamingFileSink with a new and cleaner design.
> h3. Encoders, Parquet, ORC
>  - It only efficiently supports row-wise data formats (avro, json, sequence 
> files).
>  - Efforts to add (columnar) compression for blocks of data are inefficient, 
> because blocks cannot span checkpoints due to persistence-on-checkpoint.
>  - The encoders are part of the {{flink-connector-filesystem}} project, 
> rather than living in orthogonal format projects. This blows up the 
> dependencies of the {{flink-connector-filesystem}} project. As an example, the 
> rolling file sink has dependencies on Hadoop and Avro, which messes up 
> dependency management.
> h3. Use of FileSystems
>  - The BucketingSink works only on Hadoop's FileSystem abstraction, does not 
> support Flink's own FileSystem abstraction, and cannot work with the packaged 
> S3, maprfs, and swift file systems
>  - The sink hence needs Hadoop as a dependency
>  - The sink relies on "trying out" whether truncation works, which requires 
> write access to the user's working directory
>  - The sink relies on enumerating and counting files, rather than maintaining 
> its own state, making it less efficient
> h3. Correctness and Efficiency on S3
>  - The BucketingSink relies on strong consistency in the file enumeration, 
> hence may work incorrectly on S3.
>  - The BucketingSink relies on persisting streams at intermediate points. 
> This is not working properly on S3, hence there may be data loss on S3.
> h3. .valid-length companion file
>  - The valid length file makes it hard for consumers of the data and should 
> be dropped
> We track this design in a series of sub issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10317) Configure Metaspace size by default

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10317:

Fix Version/s: (was: 1.6.3)
   1.6.4

> Configure Metaspace size by default
> ---
>
> Key: FLINK-10317
> URL: https://issues.apache.org/jira/browse/FLINK-10317
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0
>
>
> We should set the size of the JVM Metaspace to a sane default, like  
> {{-XX:MaxMetaspaceSize=256m}}.
> If not set, the JVM off-heap memory will grow indefinitely with repeated 
> classloading and JIT compilation, eventually exceeding the allowed memory on 
> docker/yarn or similar setups.
> It is hard to come up with a good default; however, I believe the error 
> messages one gets when the metaspace is too small are easy to understand (and 
> easy to act on), while it is very hard to figure out why the memory 
> footprint keeps growing steadily and infinitely.
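> For illustration, the default could be injected via the JVM options in 
> flink-conf.yaml (a sketch; the startup scripts may wire this in differently):
> {code}
> env.java.opts: "-XX:MaxMetaspaceSize=256m"
> {code}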



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9776) Interrupt TaskThread only while in User/Operator code

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9776:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Interrupt TaskThread only while in User/Operator code
> -
>
> Key: FLINK-9776
> URL: https://issues.apache.org/jira/browse/FLINK-9776
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.8.0
>
>
> Upon cancellation, the task thread is periodically interrupted.
> This helps to pull the thread out of blocking operations in the user code.
> Once the thread leaves the user code, the repeated interrupts may interfere 
> with the shutdown cleanup logic, causing confusing exceptions.
> We should stop sending the periodic interrupts once the thread leaves the 
> user code.
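> A minimal sketch of the idea (the names below are illustrative, not Flink's 
> actual code):
> {code:java}
> private volatile boolean inUserCode = true; // cleared when the thread leaves user code
>
> void interruptLoop(Thread taskThread, long intervalMs) throws InterruptedException {
>     // keep pulling the thread out of blocking user-code calls ...
>     while (inUserCode && taskThread.isAlive()) {
>         taskThread.interrupt();
>         Thread.sleep(intervalMs);
>     }
>     // ... but stop interrupting once user code is left, so the shutdown
>     // cleanup logic can run undisturbed
> }
> {code}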



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10241) Reduce performance/stability impact of latency metrics

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10241:

Fix Version/s: (was: 1.6.3)

> Reduce performance/stability impact of latency metrics
> --
>
> Key: FLINK-10241
> URL: https://issues.apache.org/jira/browse/FLINK-10241
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.6, 1.6.4, 1.8.0
>
>
> Umbrella issue for performance/stability improvements around the latency 
> metrics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9525) Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9525:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module
> --
>
> Key: FLINK-9525
> URL: https://issues.apache.org/jira/browse/FLINK-9525
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0
>
> Attachments: wx20180605-142...@2x.png
>
>
> If a Flink job's dependencies include `hadoop-common` and `hadoop-hdfs`, a 
> runtime error will be thrown,
> like in this case: 
> [https://stackoverflow.com/questions/47890596/java-util-serviceconfigurationerror-org-apache-hadoop-fs-filesystem-provider-o].
> The root cause: 
> see {{org.apache.flink.core.fs.FileSystem}}.
> This class loads all available file system factories via 
> {{ServiceLoader.load(FileSystemFactory.class)}}. 
> Since the {{META-INF/services/org.apache.flink.core.fs.FileSystemFactory}} 
> file in the classpath does not contain an entry for 
> {{org.apache.flink.runtime.fs.hdfs.HadoopFsFactory}}, 
> only the single available {{LocalFileSystemFactory}} ends up being loaded.
> For more error messages, see the attached screenshot.
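> For illustration, a minimal sketch of the discovery step (the factory class 
> names are taken from the messages above):
> {code:java}
> // Every factory must be listed in
> // META-INF/services/org.apache.flink.core.fs.FileSystemFactory;
> // without the HadoopFsFactory entry, only the local factory is found.
> ServiceLoader<FileSystemFactory> loader = ServiceLoader.load(FileSystemFactory.class);
> for (FileSystemFactory factory : loader) {
>     System.out.println("Found file system factory for scheme: " + factory.getScheme());
> }
> {code}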



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9457) Cancel container requests when cancelling pending slot allocations

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9457:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Cancel container requests when cancelling pending slot allocations
> --
>
> Key: FLINK-9457
> URL: https://issues.apache.org/jira/browse/FLINK-9457
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0
>
>
> When cancelling a pending slot allocation request on the {{ResourceManager}}, 
> we should also check whether we still need all the requested containers. 
> If it turns out that we no longer need them, then we should try to cancel the 
> unnecessary container requests. That way Flink will be, for example, a better 
> Yarn citizen which quickly releases resources and resource requests when no 
> longer needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8899:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Submitting YARN job with FLIP-6 may lead to 
> ApplicationAttemptNotFoundException
> ---
>
> Key: FLINK-8899
> URL: https://issues.apache.org/jira/browse/FLINK-8899
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.6, 1.6.4, 1.7.2, 1.8.0
>
>
> Occasionally, running a simple word count like this
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> leads to an {{ApplicationAttemptNotFoundException}} in the logs:
> {code}
> 2018-03-08 16:18:08,507 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:18:08,536 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED.
> 2018-03-08 16:18:08,611 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(df707a3c9817ddf5936efe56d427e2bd).
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down..
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0
>  for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Replacing old instance of worker for ResourceID 
> container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - The target with resource ID 
> container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1519984124671_0090_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
>   at 
> org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
>   at 
> 

[jira] [Updated] (FLINK-9142) Lower the minimum number of buffers for incoming channels to 1

2018-12-17 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9142:
---
Fix Version/s: (was: 1.6.3)
   1.6.4

> Lower the minimum number of buffers for incoming channels to 1
> --
>
> Key: FLINK-9142
> URL: https://issues.apache.org/jira/browse/FLINK-9142
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: boshu Zheng
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> Even if we make the floating buffers optional, we still require 
> {{taskmanager.network.memory.buffers-per-channel}} (exclusive) buffers per 
> incoming channel with credit-based flow control, while without it the minimum 
> was 1 and this parameter only influenced the maximum number of buffers.
> {{taskmanager.network.memory.buffers-per-channel}} is set to {{2}} by default 
> with the argument that this way we will have one buffer available for 
> netty to process while a worker thread is processing/deserializing the other 
> buffer. While this seems reasonable, it does increase our minimum 
> requirements. Instead, we could probably live with {{1}} exclusive buffer and 
> up to {{gate.getNumberOfInputChannels() * (networkBuffersPerChannel - 1) + 
> extraNetworkBuffersPerGate}} floating buffers. That way we would have the same 
> memory footprint as before with only slightly changed behaviour.
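> For illustration (numbers assumed): with 10 input channels, 
> {{networkBuffersPerChannel = 2}} and {{extraNetworkBuffersPerGate = 8}}, today 
> the gate needs 10 * 2 = 20 exclusive buffers plus 8 floating ones; with the 
> proposal it would need only 10 * 1 = 10 exclusive buffers plus up to 
> 10 * (2 - 1) + 8 = 18 floating buffers, i.e. the same 28-buffer total.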



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

