[GitHub] zentol closed pull request #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException
zentol closed pull request #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException
URL: https://github.com/apache/flink/pull/6912

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index 66213e33262..cf9f078367e 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -238,7 +238,10 @@ public Long getShippriority() {
 		}
 	}
 
-	private static class ShippingPriorityItem extends Tuple4 {
+	/**
+	 * ShippingPriorityItem.
+	 */
+	public static class ShippingPriorityItem extends Tuple4 {
 
 		public ShippingPriorityItem() {}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
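Why widening the class to `public` fixes the `IllegalAccessException`: Flink instantiates `Tuple`/POJO subclasses reflectively from its serialization stack, and a private nested class cannot be instantiated reflectively from outside its own top-level class. A minimal, self-contained illustration of that access rule (class names here are hypothetical, not Flink code):

```java
import java.lang.reflect.Constructor;

// Stand-in for the example job's nested classes (hypothetical names).
class Holder {
    private static class PrivateItem { public PrivateItem() {} }
    public static class PublicItem  { public PublicItem()  {} }
}

// Stand-in for the framework doing reflective instantiation.
public class ReflectiveAccessDemo {

    /** Tries to reflectively construct the named class with its no-arg constructor. */
    static boolean canInstantiate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            ctor.newInstance(); // throws IllegalAccessException for the private class
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The private nested class fails exactly like FLINK-10657's ShippingPriorityItem
        // did before the PR; the public one succeeds.
        System.out.println(canInstantiate("Holder$PrivateItem"));
        System.out.println(canInstantiate("Holder$PublicItem"));
    }
}
```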
[jira] [Updated] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10621: - Affects Version/s: 1.7.0 > BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when > running locally > --- > > Key: FLINK-10621 > URL: https://issues.apache.org/jira/browse/FLINK-10621 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 > Environment: OS X, 4 cores, Java 1.8.0_181 >Reporter: Alexey Trenikhin >Assignee: Biao Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: threaddump-1540049768201.tdump > > > When running locally > BootstrapToolsTest.testConcurrentActorSystemCreation(BootstrapToolsTest.java:358) > hangs forever (waited for 20 minutes, multiple attempts) > {noformat} > "main" #1 prio=5 os_prio=31 tid=0x7facaa801000 nid=0x1703 waiting on > condition [0x70218000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00074fee2370> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.runtime.clusterframework.BootstrapToolsTest.testConcurrentActorSystemCreation(BootstrapToolsTest.java:358) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) >Locked ownable synchronizers: > - None > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10621: Component/s: Tests
[jira] [Commented] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663339#comment-16663339 ] ASF GitHub Bot commented on FLINK-10621: zentol closed pull request #6925: [FLINK-10621][runtime] Fix unstable unit test case of BootstrapToolsTest
URL: https://github.com/apache/flink/pull/6925

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index c01f1aa19fe..805e97036bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -348,7 +348,7 @@ public void testConcurrentActorSystemCreation() throws Exception {
 						"localhost",
 						"0",
 						LOG);
-				})))
+				}), executorService))
 			.map(
 				// terminate ActorSystems
 				actorSystemFuture ->

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Resolved] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler resolved FLINK-10621.

Resolution: Fixed
Fix Version/s: 1.7.0

master: 2fe70a5c1256eed9781c74e48574da54d976b8cc
[GitHub] zentol closed pull request #6925: [FLINK-10621][runtime] Fix unstable unit test case of BootstrapToolsTest
zentol closed pull request #6925: [FLINK-10621][runtime] Fix unstable unit test case of BootstrapToolsTest
URL: https://github.com/apache/flink/pull/6925
[jira] [Created] (FLINK-10674) DistinctAccumulator.remove lead to NPE
ambition created FLINK-10674:

Summary: DistinctAccumulator.remove lead to NPE
Key: FLINK-10674
URL: https://issues.apache.org/jira/browse/FLINK-10674
Project: Flink
Issue Type: Bug
Components: flink-contrib
Affects Versions: 1.6.1
Environment: Flink 1.6.0
Reporter: ambition
Attachments: image-2018-10-25-14-46-03-373.png

Our online Flink job has been running for about a week; it contains this SQL:

{code:java}
select `time`,
       lower(trim(os_type)) as os_type,
       count(distinct feed_id) as feed_total_view
from my_table
group by `time`, lower(trim(os_type))
{code}

It then hits an NPE:

{code:java}
java.lang.NullPointerException
    at scala.Predef$.Long2long(Predef.scala:363)
    at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
    at NonWindowedAggregationHelper$894.retract(Unknown Source)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
{code}

View DistinctAccumulator.remove !image-2018-10-25-14-46-03-373.png!
This NPE appears to be caused by currentCnt being null, so we can handle it simply like this:

{code:java}
def remove(params: Row): Boolean = {
  if (!distinctValueMap.contains(params)) {
    true
  } else {
    val currentCnt = distinctValueMap.get(params)
    // guard against currentCnt being null
    if (currentCnt == null || currentCnt == 1) {
      distinctValueMap.remove(params)
      true
    } else {
      var value = currentCnt - 1L
      if (value < 0) {
        value = 1
      }
      distinctValueMap.put(params, value)
      false
    }
  }
}
{code}
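The same defensive pattern, shown in plain Java for illustration (a hypothetical reference-counting class, not Flink's `DistinctAccumulator`): the map lookup must be null-guarded before any arithmetic, because auto-unboxing a missing entry is exactly what raises the NPE (`scala.Predef$.Long2long` in the trace above is Scala's unboxing).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reference-counting map illustrating the null-safe remove. */
public class DistinctCountMap {

    private final Map<String, Long> counts = new HashMap<>();

    public void add(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    /** Returns true when the key's last occurrence is retracted. */
    public boolean remove(String key) {
        Long currentCnt = counts.get(key);
        // Without the null check, unboxing currentCnt for the subtraction
        // below would throw the same kind of NPE as in FLINK-10674.
        if (currentCnt == null || currentCnt <= 1L) {
            counts.remove(key);
            return true;
        }
        counts.put(key, currentCnt - 1L);
        return false;
    }
}
```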
[GitHub] zhangxinyu1 commented on issue #6770: [FLINK-10002] [Webfrontend] WebUI shows jm/tm logs more friendly.
zhangxinyu1 commented on issue #6770: [FLINK-10002] [Webfrontend] WebUI shows jm/tm logs more friendly.
URL: https://github.com/apache/flink/pull/6770#issuecomment-432905856

@ifndef-SleePy Thanks for your suggestions. I have modified the code: I removed the legacy part and refactored the implementation. As for testing, I couldn't find any existing tests for the REST API of logs. Should I add tests for `JobManagerLogFileHandler`, `JobManagerLogListHandler`, `TaskManagerLogListHandler` and `TaskManagerLogFileHandler`?
[jira] [Commented] (FLINK-10002) WebUI shows logs unfriendly, especially when the amount of logs is large
[ https://issues.apache.org/jira/browse/FLINK-10002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663171#comment-16663171 ] ASF GitHub Bot commented on FLINK-10002: zhangxinyu1 commented on issue #6770: [FLINK-10002] [Webfrontend] WebUI shows jm/tm logs more friendly.
URL: https://github.com/apache/flink/pull/6770#issuecomment-432905856

> WebUI shows logs unfriendly, especially when the amount of logs is large
>
> Key: FLINK-10002
> URL: https://issues.apache.org/jira/browse/FLINK-10002
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Reporter: zhangxinyu
> Assignee: zhangxinyu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-09-10-11-38-07-973.png
>
> When a streaming job runs for a long time, the amount of logs may become very large. The current WebUI shows the entire content of the logs, so it takes a long time to download them from the task managers and the browser cannot display them. I therefore suggest that Flink use a DailyRollingAppender to split logs by default, and that the task manager provide an API that returns logs for a given time interval. The WebUI could then display logs by time interval.
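The DailyRollingAppender suggestion above corresponds, in log4j 1.x (the logging backend Flink shipped at the time), to a configuration along these lines. This is a hedged sketch, not Flink's actual default `log4j.properties`; `${log.file}` assumes the `-Dlog.file` system property that Flink passes to its processes.

```properties
# Roll the log once per day instead of growing a single file forever.
log4j.rootLogger=INFO, file

log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
# Path supplied by the launching scripts via -Dlog.file (assumption).
log4j.appender.file.File=${log.file}
# One file per day: flink-taskmanager.log.2018-10-25, ...
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

With logs split by day, a time-interval REST endpoint only needs to serve the files whose dates overlap the requested range instead of streaming one huge file.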
[GitHub] ifndef-SleePy opened a new pull request #6925: [FLINK-10621][runtime] Fix unstable unit test case of BootstrapToolsTest
ifndef-SleePy opened a new pull request #6925: [FLINK-10621][runtime] Fix unstable unit test case of BootstrapToolsTest
URL: https://github.com/apache/flink/pull/6925

## What is the purpose of the change

`BootstrapToolsTest.testConcurrentActorSystemCreation` is unstable. We forgot to pass the `ExecutorService` to `CompletableFuture.supplyAsync`, so the thread count depends on the machine the test runs on. If the thread count is less than 10, the test hangs forever.

## Brief change log

* Pass the `ExecutorService` to `CompletableFuture.supplyAsync`

## Verifying this change

* Verified by unit test

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
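The failure mode the PR describes can be reproduced in miniature. Without the second argument, `CompletableFuture.supplyAsync` schedules on `ForkJoinPool.commonPool()`, whose size is roughly the core count minus one; if ten tasks each block waiting for the other nine to start, a pool with fewer than ten threads deadlocks. Passing an explicit executor sized for the workload (the PR's fix) makes the test independent of the machine. A self-contained sketch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SupplyAsyncDemo {

    /**
     * Runs n tasks that all block until every task has started. This only
     * completes if the pool really has n threads, which is why an explicit
     * ExecutorService must be passed instead of relying on the common pool.
     */
    static boolean runConcurrently(int n) {
        ExecutorService executorService = Executors.newFixedThreadPool(n);
        try {
            CountDownLatch latch = new CountDownLatch(n);
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int id = i;
                // The second argument is the PR's fix: without it, supplyAsync
                // uses ForkJoinPool.commonPool() and may starve.
                futures.add(CompletableFuture.supplyAsync(() -> {
                    latch.countDown();
                    try {
                        latch.await(); // wait until all n tasks are running
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return id;
                }, executorService));
            }
            for (CompletableFuture<Integer> f : futures) {
                f.get(30, TimeUnit.SECONDS); // would time out on deadlock
            }
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runConcurrently(10));
    }
}
```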
[jira] [Updated] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10621: Labels: pull-request-available (was: )
[jira] [Commented] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663160#comment-16663160 ] ASF GitHub Bot commented on FLINK-10621: ifndef-SleePy opened a new pull request #6925: [FLINK-10621][runtime] Fix unstable unit test case of BootstrapToolsTest URL: https://github.com/apache/flink/pull/6925 ## What is the purpose of the change `BootstrapToolsTest.testConcurrentActorSystemCreation` is unstable. We forgot to pass the `ExecutorService` to `CompletableFuture.supplyAsync`, so the thread count depends on the machine the case is running on. If the thread count is less than 10, the case would hang forever. ## Brief change log * Pass the `ExecutorService` to `CompletableFuture.supplyAsync` ## Verifying this change * Verifying by unit test ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (on) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
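The bug and its fix can be sketched in a minimal, self-contained form (class and method names here are illustrative, not the actual `BootstrapToolsTest` code): each task blocks until all of them have started, so they must run concurrently. Without an explicit executor, `CompletableFuture.supplyAsync` falls back to the common `ForkJoinPool`, whose size depends on the machine's core count; with fewer pool threads than tasks, the futures deadlock. Passing a dedicated `ExecutorService` sized to the task count avoids that:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SupplyAsyncExecutorDemo {

    /** Runs {@code tasks} futures that each wait for all others to start. */
    static int runConcurrently(int tasks) {
        CountDownLatch allStarted = new CountDownLatch(tasks);
        ExecutorService executor = Executors.newFixedThreadPool(tasks);
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
            for (int i = 0; i < tasks; i++) {
                futures[i] = CompletableFuture.supplyAsync(() -> {
                    allStarted.countDown();
                    try {
                        allStarted.await(); // blocks until every task is running
                    } catch (InterruptedException e) {
                        throw new IllegalStateException(e);
                    }
                    return 1;
                }, executor); // <-- the fix: pass the executor explicitly
            }
            CompletableFuture.allOf(futures).get(30, TimeUnit.SECONDS);
            return tasks;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(10));
    }
}
```

Dropping the `executor` argument reproduces the hang on machines whose common pool has fewer than 10 threads.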
[jira] [Assigned] (FLINK-10504) Decide actual parallelism based on available resources
[ https://issues.apache.org/jira/browse/FLINK-10504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] blues zheng reassigned FLINK-10504: --- Assignee: blues zheng > Decide actual parallelism based on available resources > -- > > Key: FLINK-10504 > URL: https://issues.apache.org/jira/browse/FLINK-10504 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: blues zheng >Priority: Major > Fix For: 1.7.0 > > > Check if a {{JobGraph}} can be scheduled with the available set of resources > (slots). If the minimum parallelism is fulfilled, then distribute the > available set of slots across all available slot sharing groups in order to > decide on the actual runtime parallelism. In the absence of minimum, target > and maximum parallelism, assume minimum = target = maximum = parallelism > defined in the {{JobGraph}}. > Ideally, we make the slot assignment strategy pluggable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
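The distribution logic described in the ticket might look like the following sketch (illustrative names, not Flink's actual scheduler code): split the available slots evenly across the slot sharing groups, reject the job if the minimum parallelism cannot be met, and otherwise cap the result at the maximum. With no explicit minimum/target/maximum, all three would default to the parallelism configured in the `JobGraph`.

```java
public class ParallelismDecider {

    /**
     * Decides the actual runtime parallelism from the available slots.
     * Throws if even the minimum parallelism cannot be satisfied.
     */
    static int decideParallelism(
            int availableSlots, int slotSharingGroups,
            int minParallelism, int maxParallelism) {
        // Each slot sharing group gets an equal share of the available slots.
        int perGroup = availableSlots / slotSharingGroups;
        if (perGroup < minParallelism) {
            throw new IllegalStateException(
                "Available slots cannot satisfy the minimum parallelism");
        }
        // Use as many slots as possible, but never exceed the maximum.
        return Math.min(perGroup, maxParallelism);
    }
}
```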
[jira] [Assigned] (FLINK-9957) Rescale job with respect to available slots
[ https://issues.apache.org/jira/browse/FLINK-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] blues zheng reassigned FLINK-9957: -- Assignee: blues zheng > Rescale job with respect to available slots > --- > > Key: FLINK-9957 > URL: https://issues.apache.org/jira/browse/FLINK-9957 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Till Rohrmann >Assignee: blues zheng >Priority: Major > Fix For: 1.7.0 > > > The {{JobMaster}} which runs in the reactive container mode, needs to react > to additionally offered slots in order to make use of newly started > {{TaskExecutors}}. This could mean that the {{JobMaster}} tries to scale the > job wrt the available number of slots after some grace period. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663146#comment-16663146 ] Biao Liu commented on FLINK-10621: -- Thank you for reporting. [~alexeyt820] It's an unstable case. Would fix it asap. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10507) Set target parallelism to maximum when using the standalone job cluster mode
[ https://issues.apache.org/jira/browse/FLINK-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10507: Assignee: vinoyang > Set target parallelism to maximum when using the standalone job cluster mode > > > Key: FLINK-10507 > URL: https://issues.apache.org/jira/browse/FLINK-10507 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > In order to enable the reactive container mode, we should set the target > value to the maximum parallelism if we run in standalone job cluster mode. > That way, we will always use all available resources and scale up if new > resources are being added. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10505) Treat fail signal as scheduling event
[ https://issues.apache.org/jira/browse/FLINK-10505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10505: Assignee: vinoyang > Treat fail signal as scheduling event > - > > Key: FLINK-10505 > URL: https://issues.apache.org/jira/browse/FLINK-10505 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > Instead of simply calling into the {{RestartStrategy}} which restarts the > existing {{ExecutionGraph}} with the same parallelism, the > {{ExecutionGraphDriver}} should treat a recovery similar to the initial > scheduling operation. First, one needs to decide on the new parallelism of > the {{ExecutionGraph}} (scale up/scale down) wrt to the available set of > resources. Only if the minimum configuration is fulfilled, the potentially > rescaled {{ExecutionGraph}} will be restarted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
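The recovery flow described above can be sketched as follows (hypothetical names, not Flink's actual `ExecutionGraphDriver` code): a fail signal is handled like an initial scheduling decision, i.e. a new parallelism is picked from the currently available slots, and the graph is restarted only if the minimum configuration is fulfilled.

```java
public class RecoveryPlanner {

    static final int CANNOT_RESTART = -1;

    /**
     * Returns the parallelism to restart with after a failure, or
     * {@code CANNOT_RESTART} if the minimum configuration is not
     * fulfilled by the currently available slots.
     */
    static int onFailure(int availableSlots, int minParallelism, int maxParallelism) {
        if (availableSlots < minParallelism) {
            return CANNOT_RESTART; // wait for more resources before restarting
        }
        // Scale up or down to whatever the resources allow, capped at the maximum.
        return Math.min(availableSlots, maxParallelism);
    }
}
```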
[jira] [Assigned] (FLINK-10621) BootstrapToolsTest.testConcurrentActorSystemCreation hangs forever when running locally
[ https://issues.apache.org/jira/browse/FLINK-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu reassigned FLINK-10621: Assignee: Biao Liu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10503) Periodically check for new resources
[ https://issues.apache.org/jira/browse/FLINK-10503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10503: --- Assignee: Shimin Yang > Periodically check for new resources > > > Key: FLINK-10503 > URL: https://issues.apache.org/jira/browse/FLINK-10503 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > In order to decide when to start scheduling or to rescale, we need to > periodically check for new resources (slots). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
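A minimal sketch of such a periodic check (hypothetical names, not the actual Flink implementation): a scheduled task polls the current slot count and fires a callback whenever new slots have appeared since the previous poll.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

public class ResourceChecker {

    /**
     * Polls {@code slotCount} every {@code periodMillis} ms and runs
     * {@code onNewResources} when the count has grown since the last poll.
     */
    static ScheduledExecutorService startPeriodicCheck(
            IntSupplier slotCount, Runnable onNewResources, long periodMillis) {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        int[] lastSeen = {slotCount.getAsInt()};
        scheduler.scheduleAtFixedRate(() -> {
            int current = slotCount.getAsInt();
            if (current > lastSeen[0]) {
                onNewResources.run(); // new slots: trigger scheduling or rescaling
            }
            lastSeen[0] = current;
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler; // caller shuts this down when the job terminates
    }
}
```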
[jira] [Assigned] (FLINK-10501) Obtain resource overview of cluster
[ https://issues.apache.org/jira/browse/FLINK-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10501: --- Assignee: Shimin Yang > Obtain resource overview of cluster > --- > > Key: FLINK-10501 > URL: https://issues.apache.org/jira/browse/FLINK-10501 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > In order to decide with which parallelism to run, the > {{ExecutionGraphDriver}} needs to obtain an overview over all available > resources. This includes the resources managed by the {{SlotPool}} as well as > not yet allocated resources on the {{ResourceManager}}. This is a temporary > workaround until we adapted the slot allocation protocol to support resource > declaration. Once this is done, we will only take the {{SlotPool’s}} slots > into account. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
[ https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663133#comment-16663133 ] ASF GitHub Bot commented on FLINK-10656: zhijiangW commented on a change in pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader… URL: https://github.com/apache/flink/pull/6911#discussion_r228020917 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/IterationReader.java ## @@ -21,34 +21,21 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; -import java.io.IOException; - /** - * The basic API for every reader. + * Reader for iteration. */ -public interface ReaderBase { - - /** -* Returns whether the reader has consumed the input. -*/ - boolean isFinished(); - - // - // Task events - // - - void sendTaskEvent(TaskEvent event) throws IOException; - - void registerTaskEventListener(EventListener listener, Class eventType); - - // - // Iterations - // +public interface IterationReader { void setIterativeReader(); void startNextSuperstep(); boolean hasReachedEndOfSuperstep(); + /** +* Returns whether the reader has consumed the input. +*/ + boolean isFinished(); Review comment: Yes, I just confirmed it is actually only used in iteration, so all the current methods in `ReaderBase` are for iteration. If we change it to `IterationReader`, the corresponding `AbstractReader` should also be changed to `AbstractIterationReader`, because the current `AbstractReader` is also only for iteration. But I think it still seems a little strange to put `IterationReader` in the upper layer, because the current `Reader` interface, which is used for common reads, extends `IterationReader`. This relationship does not seem to make sense. Suppose instead we had two independent reader base interfaces, one for iteration and the other for the common case. 
Then a specific record reader could implement both interfaces in the iteration scenario, or only the common reader interface in the non-iteration scenario. So we may need to reorganize the current relationships if we want to change this part; simply renaming to `IterationReader` could be confusing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase > -- > > Key: FLINK-10656 > URL: https://issues.apache.org/jira/browse/FLINK-10656 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0 > > > The interface org.apache.flink.runtime.io.network.api.reader.ReaderBase is > not very clean: its APIs are called only for iteration and event handling, > which does not match the name ReaderBase. Since the functionality is > independent, the proposal is to rename and split the interface into two > isolated interfaces. > For more details, please look at the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
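The split discussed in this thread could look like the following sketch (all names are hypothetical, not Flink's actual API): task-event handling and iteration support become independent interfaces, so a non-iterative reader implements only the former, while an iterative record reader opts into both.

```java
import java.io.IOException;

/** Base interface for every reader: input status and task-event handling. */
interface TaskEventReader {
    boolean isFinished();
    void sendTaskEvent(Object event) throws IOException;
}

/** Independent interface carrying only the iteration-related methods. */
interface IterationSupport {
    void setIterativeReader();
    void startNextSuperstep();
    boolean hasReachedEndOfSuperstep();
}

/** An iterative record reader implements both capabilities. */
class IterativeRecordReader implements TaskEventReader, IterationSupport {
    private boolean iterative;
    private boolean endOfSuperstep;

    @Override public boolean isFinished() { return false; }
    @Override public void sendTaskEvent(Object event) { /* dispatch to listeners */ }
    @Override public void setIterativeReader() { iterative = true; }
    @Override public void startNextSuperstep() { endOfSuperstep = false; }
    @Override public boolean hasReachedEndOfSuperstep() { return iterative && endOfSuperstep; }

    /** Called when an end-of-superstep event arrives (hypothetical hook). */
    void onEndOfSuperstep() { endOfSuperstep = true; }
}
```

A non-iterative reader would implement `TaskEventReader` alone, avoiding the naming concern raised in the review.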
[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
[ https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663127#comment-16663127 ] ASF GitHub Bot commented on FLINK-10656: zhijiangW commented on a change in pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader… URL: https://github.com/apache/flink/pull/6911#discussion_r228020917 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/IterationReader.java ## @@ -21,34 +21,21 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; -import java.io.IOException; - /** - * The basic API for every reader. + * Reader for iteration. */ -public interface ReaderBase { - - /** -* Returns whether the reader has consumed the input. -*/ - boolean isFinished(); - - // - // Task events - // - - void sendTaskEvent(TaskEvent event) throws IOException; - - void registerTaskEventListener(EventListener listener, Class eventType); - - // - // Iterations - // +public interface IterationReader { void setIterativeReader(); void startNextSuperstep(); boolean hasReachedEndOfSuperstep(); + /** +* Returns whether the reader has consumed the input. +*/ + boolean isFinished(); Review comment: Yes, I just confirmed it is actually only used in iteration, so all the current methods in `ReaderBase` are for iteration. If we change it to `IterationReaderBase`, the corresponding `AbstractReader` should also be changed to `AbstractIterationReader`, because the current `AbstractReader` is also only for iteration. But I think it still seems a little strange to put `IterationReaderBase` in the upper layer, because the current `Reader` interface, which is used for common reads, extends `IterationReaderBase`. This relationship does not seem to make sense. Suppose instead we had two independent reader base interfaces, one for iteration and the other for the common case. 
Then a specific record reader could implement both interfaces in the iteration scenario, or only the common reader interface in the non-iteration scenario. So we may need to reorganize the current relationships if we want to change this part; simply renaming to `IterationReaderBase` could be confusing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10673) Table API / SQL UIDs not the only one
[ https://issues.apache.org/jira/browse/FLINK-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] winifredtang reassigned FLINK-10673: Assignee: winifredtang > Table API / SQL UIDs not the only one > - > > Key: FLINK-10673 > URL: https://issues.apache.org/jira/browse/FLINK-10673 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.5.4, 1.6.1 > Environment: flink 1.5.0 >Reporter: Fan weiwen >Assignee: winifredtang >Priority: Major > > A job has two SQL queries. > The source is Kafka; the sink is Redis (or another sink). > Asql > {code:java} > // code placeholder > select > reqIp as factorContenta, > count(*) as eCount, > 60 * 60 as expire > from > kafka_source > where > uri is not null > group by > hop( > rowtime, > interval '2' second, > interval '60' minute > ), > reqIp > {code} > Bsql > {code:java} > // code placeholder > select > uid as factorContentb, > count(*) as eCount, > 60 * 60 as expire > from > kafka_source > where > uri is not null > group by > hop( > rowtime, > interval '2' second, > interval '60' minute > ), > uid > {code} > Start only Asql (Bsql stopped); the sink now holds key 656.19.173.34. > Then stop Asql with a savepoint to HDFS and delete key 656.19.173.34 (if the sink is Kafka, don't delete it). > Start Bsql from that savepoint: the sink holds both keys 656.19.173.34 and 6630519, i.e. Bsql fetched Asql's savepoint result. > I think the SQL operator UIDs are not unique. > Who can help me look into this problem? > My test data is > {code:java} > // code placeholder > { > "reqIp" : "656.19.173.34", > "rowtime" : 1537950912546, > "uid" : 6630519, > "uri" : "/web" > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10265) Configure checkpointing behavior for SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] winifredtang reassigned FLINK-10265: Assignee: winifredtang (was: vinoyang) > Configure checkpointing behavior for SQL Client > --- > > Key: FLINK-10265 > URL: https://issues.apache.org/jira/browse/FLINK-10265 > Project: Flink > Issue Type: New Feature > Components: SQL Client >Reporter: Timo Walther >Assignee: winifredtang >Priority: Major > > The SQL Client environment file should expose checkpointing related > properties: > - enable checkpointing > - checkpointing interval > - mode > - timeout > - etc. see {{org.apache.flink.streaming.api.environment.CheckpointConfig}} > Per-job selection of state backends and their configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10448) VALUES clause is translated into a separate operator per value
[ https://issues.apache.org/jira/browse/FLINK-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] winifredtang reassigned FLINK-10448: Assignee: winifredtang (was: vinoyang) > VALUES clause is translated into a separate operator per value > -- > > Key: FLINK-10448 > URL: https://issues.apache.org/jira/browse/FLINK-10448 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.1 >Reporter: Timo Walther >Assignee: winifredtang >Priority: Major > > It seems that a SQL VALUES clause uses one operator per value under certain > conditions which leads to a complicated job graph. Given that we need to > compile code for every operator in the open method and have other overhead as > well, this looks inefficient to me. > For example, the following query creates and unions 6 operators together: > {code} > SELECT * > FROM ( > VALUES > (1, 'Bob', CAST(0 AS BIGINT)), > (22, 'Alice', CAST(0 AS BIGINT)), > (42, 'Greg', CAST(0 AS BIGINT)), > (42, 'Greg', CAST(0 AS BIGINT)), > (42, 'Greg', CAST(0 AS BIGINT)), > (1, 'Bob', CAST(0 AS BIGINT))) > AS UserCountTable(user_id, user_name, user_count) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors
[ https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663102#comment-16663102 ] ASF GitHub Bot commented on FLINK-10600: yanghua opened a new pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors URL: https://github.com/apache/flink/pull/6924 ## What is the purpose of the change *This pull request provides End-to-end test cases for modern Kafka connectors* ## Brief change log - *Provide End-to-end test cases for modern Kafka connectors* ## Verifying this change This change added tests and can be verified as follows: - *Added end-to-end integration tests for modern kafka connector * ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide End-to-end test cases for modern Kafka connectors > - > > Key: FLINK-10600 > URL: https://issues.apache.org/jira/browse/FLINK-10600 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors
[ https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10600: --- Labels: pull-request-available (was: ) > Provide End-to-end test cases for modern Kafka connectors > - > > Key: FLINK-10600 > URL: https://issues.apache.org/jira/browse/FLINK-10600 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua opened a new pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors
yanghua opened a new pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors URL: https://github.com/apache/flink/pull/6924 ## What is the purpose of the change *This pull request provides End-to-end test cases for modern Kafka connectors* ## Brief change log - *Provide End-to-end test cases for modern Kafka connectors* ## Verifying this change This change added tests and can be verified as follows: - *Added end-to-end integration tests for the modern Kafka connectors* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-5832) Support for simple hive UDF
[ https://issues.apache.org/jira/browse/FLINK-5832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] winifredtang reassigned FLINK-5832: --- Assignee: winifredtang (was: vinoyang) > Support for simple hive UDF > --- > > Key: FLINK-5832 > URL: https://issues.apache.org/jira/browse/FLINK-5832 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: winifredtang >Priority: Major > Labels: pull-request-available > > The first step of FLINK-5802 is to support simple Hive UDF. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5802) Flink SQL calling Hive User-Defined Functions
[ https://issues.apache.org/jira/browse/FLINK-5802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] winifredtang reassigned FLINK-5802: --- Assignee: winifredtang > Flink SQL calling Hive User-Defined Functions > - > > Key: FLINK-5802 > URL: https://issues.apache.org/jira/browse/FLINK-5802 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: winifredtang >Priority: Major > Labels: features > > It's important to be able to call Hive UDFs from Flink SQL. A great many UDFs > have been written in Hive over the last ten years, and reusing them is essential. > This feature will reduce the cost of migration and bring more users to Flink. > Spark SQL already supports this feature: > https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/calling-udfs.html > The Hive UDFs here include both built-in and custom UDFs. Since much > business logic has been written in UDFs, the custom UDFs are more > important than the built-in ones. > Generally, there are three kinds of UDFs in Hive: UDF, UDTF and UDAF. > Here is the relevant Spark SQL documentation: > http://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive > > Spark code: > https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala > https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala -- This message was sent by Atlassian JIRA (v7.6.3#76005)
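The reuse path described in FLINK-5802 (and in Spark's hiveUDFs.scala) amounts to wrapping a legacy Hive-style UDF behind the scalar-function interface the SQL engine expects. The sketch below only illustrates that adapter idea in plain Java: `HiveUdf`, `ScalarFn`, and `HiveUdfWrapper` are hypothetical stand-ins, not the real Hive `UDF` or Flink `ScalarFunction` classes, and a real bridge would also have to translate Hive's ObjectInspector types.

```java
// Hypothetical sketch of wrapping a legacy Hive-style UDF so a SQL engine
// can invoke it through its own scalar-function interface.
// "HiveUdf" and "ScalarFn" are stand-ins, not the real Hive/Flink classes.
interface HiveUdf {
    Object evaluate(Object... args);
}

abstract class ScalarFn {
    public abstract Object eval(Object... args);
}

final class HiveUdfWrapper extends ScalarFn {
    private final HiveUdf delegate;

    HiveUdfWrapper(HiveUdf delegate) {
        this.delegate = delegate;
    }

    @Override
    public Object eval(Object... args) {
        // Delegate straight through; a real bridge would additionally
        // convert between Hive ObjectInspectors and the engine's types here.
        return delegate.evaluate(args);
    }
}
```

With such a wrapper, existing UDF logic is registered once and called from SQL unchanged, which is the migration-cost argument the issue makes.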
[jira] [Created] (FLINK-10673) Table API / SQL UIDs not the only one
Fan weiwen created FLINK-10673: -- Summary: Table API / SQL UIDs not the only one Key: FLINK-10673 URL: https://issues.apache.org/jira/browse/FLINK-10673 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.1, 1.5.4 Environment: flink 1.5.0 Reporter: Fan weiwen A job has two SQL queries; the source is Kafka and the sink is Redis (or another sink). Asql: {code:sql} select reqIp as factorContenta, count(*) as eCount, 60 * 60 as expire from kafka_source where uri is not null group by hop( rowtime, interval '2' second, interval '60' minute ), reqIp {code} Bsql: {code:sql} select uid as factorContentb, count(*) as eCount, 60 * 60 as expire from kafka_source where uri is not null group by hop( rowtime, interval '2' second, interval '60' minute ), uid {code} Steps to reproduce: start only Asql (Bsql stopped); the sink now contains the key 656.19.173.34. Stop Asql with a savepoint to HDFS, then delete the key 656.19.173.34 (if the sink is Kafka, there is nothing to delete). Start Bsql from that savepoint: you will find the sink contains both 656.19.173.34 and 6630519, i.e. Bsql has picked up the state from Asql's savepoint. I therefore think the UIDs generated for SQL jobs are not unique. Who can help me look into this problem? My test data is: {code:json} { "reqIp" : "656.19.173.34", "rowtime" : 1537950912546, "uid" : 6630519, "uri" : "/web" } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
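The behavior reported above is consistent with operator UIDs being derived purely from the job's structure. The sketch below is only an illustration of that failure mode (this is not Flink's actual UID algorithm; `OperatorNode` and its fields are invented for the example): two jobs whose aggregation operators share the same operator type, chain position, and window spec get the same generated UID, so savepoint state keyed by that UID is matched across the two jobs even though they group by different columns.

```java
import java.util.Objects;

// Illustrative only: a UID derived from structural properties of an
// operator. Two SQL jobs that differ solely in the grouped column still
// produce structurally identical aggregation operators, so their
// generated UIDs collide and savepoint state is restored across jobs.
final class OperatorNode {
    final String operatorType;
    final int positionInChain;
    final String windowSpec;

    OperatorNode(String operatorType, int positionInChain, String windowSpec) {
        this.operatorType = operatorType;
        this.positionInChain = positionInChain;
        this.windowSpec = windowSpec;
    }

    String generatedUid() {
        // The grouping key expression is NOT part of the hash, which is
        // exactly what makes Asql and Bsql indistinguishable here.
        return Integer.toHexString(
            Objects.hash(operatorType, positionInChain, windowSpec));
    }
}
```

In the DataStream API the usual remedy is to assign explicit, distinct UIDs per operator; for generated SQL plans the UID derivation itself would need to include the grouping expression.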
[jira] [Commented] (FLINK-5833) Support for Hive GenericUDF
[ https://issues.apache.org/jira/browse/FLINK-5833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663093#comment-16663093 ] vinoyang commented on FLINK-5833: - Hi [~clarkyzl], this issue has not been active for a long time. Are you still working on it? If not, I would be happy to take it over. > Support for Hive GenericUDF > --- > > Key: FLINK-5833 > URL: https://issues.apache.org/jira/browse/FLINK-5833 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang >Priority: Major > > The second step of FLINK-5802 is to support Hive's GenericUDF. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5832) Support for simple hive UDF
[ https://issues.apache.org/jira/browse/FLINK-5832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-5832: --- Assignee: vinoyang > Support for simple hive UDF > --- > > Key: FLINK-5832 > URL: https://issues.apache.org/jira/browse/FLINK-5832 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The first step of FLINK-5802 is to support simple Hive UDF. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10672) Task stuck while writing output to flink
[ https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ankur Goenka updated FLINK-10672: - Description: I am running a fairly complex pipeline with 200+ tasks. The pipeline works fine with small data (on the order of 10kb of input) but gets stuck with slightly larger data (300kb of input). The task gets stuck while writing the output to Flink; more specifically, it gets stuck while requesting a memory segment in the local buffer pool. The Task Manager UI shows that it has enough memory and memory segments to work with. The relevant stack trace is {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9] java.lang.Thread.State: TIMED_WAITING (on object monitor) at (C/C++) 0x7fef201c7dae (Unknown Source) at (C/C++) 0x7fef1f2aea07 (Unknown Source) at (C/C++) 0x7fef1f241cd3 (Unknown Source) at java.lang.Object.wait(Native Method) - waiting on <0xf6d56450> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247) - locked <0xf6d56450> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42) at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230) - locked <0xf6a60bd0> (a java.lang.Object) at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81) at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32) at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139) at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125) at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248) at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) at org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263) at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683) at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748){quote} The full stack trace and logs are attached. Please take a look and let me know if more information is needed.
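The TIMED_WAITING frame in LocalBufferPool.requestMemorySegment above is the classic bounded-pool blocking wait. The sketch below is a simplified stand-in for that pattern (the names and the timeout handling are illustrative, not Flink's actual LocalBufferPool code): a producer requesting a segment when none are available parks in wait() until a consumer recycles one, and parks forever if back-pressure never releases, which matches the hang described in the report.

```java
import java.util.ArrayDeque;

// Simplified stand-in for the blocking request seen in the stack trace.
// Not Flink's actual LocalBufferPool; only the wait/notify shape is shown.
class BufferPoolSketch {
    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();

    BufferPoolSketch(int segments, int segmentSize) {
        for (int i = 0; i < segments; i++) {
            availableSegments.add(new byte[segmentSize]);
        }
    }

    // Returns null on timeout so the caller can detect starvation instead
    // of parking forever, which is what the hung task in the report does.
    synchronized byte[] requestMemorySegment(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (availableSegments.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            wait(remaining); // TIMED_WAITING (on object monitor), as in the dump
        }
        return availableSegments.poll();
    }

    synchronized void recycle(byte[] segment) {
        availableSegments.add(segment);
        notifyAll(); // wake a blocked producer
    }
}
```

When every segment is held downstream and nothing recycles, the producer thread stays parked in the wait loop; the UI can still report "enough memory segments" globally while this one local pool is exhausted.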
[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink
[ https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662937#comment-16662937 ] Ankur Goenka commented on FLINK-10672: -- cc: [~mxm] [~robertwb] > Task stuck while writing output to flink > > > Key: FLINK-10672 > URL: https://issues.apache.org/jira/browse/FLINK-10672 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.4 > Environment: OS: Debuan rodente 4.17 > Flink version: 1.5.4 > ||Key||Value|| > |jobmanager.heap.mb|1024| > |jobmanager.rpc.address|localhost| > |jobmanager.rpc.port|6123| > |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter| > |metrics.reporter.jmx.port|9250-9260| > |metrics.reporters|jmx| > |parallelism.default|1| > |rest.port|8081| > |taskmanager.heap.mb|1024| > |taskmanager.numberOfTaskSlots|1| > |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26| > > h1. Overview > ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap > Size||Flink Managed Memory|| > |43501|1|0|12|62.9 GB|922 MB|642 MB| > h1. Memory > h2. JVM (Heap/Non-Heap) > ||Type||Committed||Used||Maximum|| > |Heap|922 MB|575 MB|922 MB| > |Non-Heap|68.8 MB|64.3 MB|-1 B| > |Total|991 MB|639 MB|922 MB| > h2. Outside JVM > ||Type||Count||Used||Capacity|| > |Direct|3,292|105 MB|105 MB| > |Mapped|0|0 B|0 B| > h1. Network > h2. Memory Segments > ||Type||Count|| > |Available|3,194| > |Total|3,278| > h1. Garbage Collection > ||Collector||Count||Time|| > |G1_Young_Generation|13|336| > |G1_Old_Generation|1|21| >Reporter: Ankur Goenka >Priority: Major > Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, > jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, > jstack_66985.log > > > I am running a fairly complex pipeline with 200+ tasks. > The pipeline works fine with small data (order of 10kb input) but gets stuck > with slightly larger data (300kb input). 
> > The task gets stuck while writing the output toFlink, more specifically it > gets stuck while requesting memory segment in local buffer pool. The Task > manager UI shows that it has enough memory and memory segments to work with. > The relevant stack trace is > {{"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 > tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]}} > {{ java.lang.Thread.State: TIMED_WAITING (on object monitor)}} > {{ at (C/C++) 0x7fef201c7dae (Unknown Source)}} > {{ at (C/C++) 0x7fef1f2aea07 (Unknown Source)}} > {{ at (C/C++) 0x7fef1f241cd3 (Unknown Source)}} > {{ at java.lang.Object.wait(Native Method)}} > {{ - waiting on <0xf6d56450> (a java.util.ArrayDeque)}} > {{ at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)}} > {{ - locked <0xf6d56450> (a java.util.ArrayDeque)}} > {{ at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)}} > {{ at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)}} > {{ at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)}} > {{ at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)}} > {{ at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)}} > {{ at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)}} > {{ at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)}} > {{ at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)}} > {{ at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)}} > {{ at > 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)}} > {{ at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)}} > {{ - locked <0xf6a60bd0> (a java.lang.Object)}} > {{ at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)}} > {{ at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)}} > {{ at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)}} > {{ at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)}} > {{ at > org.apache.beam.vendor.grpc.v1.io.grpc
[jira] [Created] (FLINK-10672) Task stuck while writing output to flink
Ankur Goenka created FLINK-10672:
------------------------------------

             Summary: Task stuck while writing output to flink
                 Key: FLINK-10672
                 URL: https://issues.apache.org/jira/browse/FLINK-10672
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.5.4
         Environment: OS: Debian rodente 4.17
Flink version: 1.5.4
||Key||Value||
|jobmanager.heap.mb|1024|
|jobmanager.rpc.address|localhost|
|jobmanager.rpc.port|6123|
|metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
|metrics.reporter.jmx.port|9250-9260|
|metrics.reporters|jmx|
|parallelism.default|1|
|rest.port|8081|
|taskmanager.heap.mb|1024|
|taskmanager.numberOfTaskSlots|1|
|web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
h1. Overview
||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap Size||Flink Managed Memory||
|43501|1|0|12|62.9 GB|922 MB|642 MB|
h1. Memory
h2. JVM (Heap/Non-Heap)
||Type||Committed||Used||Maximum||
|Heap|922 MB|575 MB|922 MB|
|Non-Heap|68.8 MB|64.3 MB|-1 B|
|Total|991 MB|639 MB|922 MB|
h2. Outside JVM
||Type||Count||Used||Capacity||
|Direct|3,292|105 MB|105 MB|
|Mapped|0|0 B|0 B|
h1. Network
h2. Memory Segments
||Type||Count||
|Available|3,194|
|Total|3,278|
h1. Garbage Collection
||Collector||Count||Time||
|G1_Young_Generation|13|336|
|G1_Old_Generation|1|21|
            Reporter: Ankur Goenka
         Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, jstack_66985.log

I am running a fairly complex pipeline with 200+ tasks.

The pipeline works fine with small data (order of 10kb input) but gets stuck with slightly larger data (300kb input).

The task gets stuck while writing the output to Flink; more specifically, it gets stuck while requesting a memory segment in the local buffer pool. The Task Manager UI shows that it has enough memory and memory segments to work with.

The relevant stack trace is:

{noformat}
"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at (C/C++) 0x7fef201c7dae (Unknown Source)
	at (C/C++) 0x7fef1f2aea07 (Unknown Source)
	at (C/C++) 0x7fef1f241cd3 (Unknown Source)
	at java.lang.Object.wait(Native Method)
	- waiting on <0xf6d56450> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
	- locked <0xf6d56450> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
	at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
	- locked <0xf6a60bd0> (a java.lang.Object)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
	at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
	at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
	at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
	at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
	at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.mes
{noformat}
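The blocked state in the stack trace above — a writer thread parked in Object.wait() on the buffer pool's ArrayDeque while the JVM heap has plenty of headroom — can be reproduced with a minimal sketch. This is plain Java, not Flink's actual LocalBufferPool: a pool bounded by its segment count blocks requesters until some segment is recycled, so a stalled consumer downstream hangs the writer no matter how much heap is free.

```java
import java.util.ArrayDeque;

// Minimal sketch (not Flink code): a bounded segment pool whose request path
// blocks in Object.wait() when every segment is handed out -- the same state
// the jstack dump shows in LocalBufferPool.requestMemorySegment().
class TinyBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    TinyBufferPool(int segments, int segmentSize) {
        for (int i = 0; i < segments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    // Parks the caller until a segment is recycled; free JVM heap does not
    // help, because the pool is bounded by its segment count, not by memory.
    synchronized byte[] requestMemorySegment() {
        while (available.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return available.poll();
    }

    synchronized void recycle(byte[] segment) {
        available.add(segment);
        notifyAll(); // wakes writers blocked in requestMemorySegment()
    }

    synchronized int availableCount() {
        return available.size();
    }

    public static void main(String[] args) {
        TinyBufferPool pool = new TinyBufferPool(1, 32);
        byte[] seg = pool.requestMemorySegment(); // succeeds immediately
        pool.recycle(seg);                        // without this, the next request would wait forever
        System.out.println("available=" + pool.availableCount());
    }
}
```

If nothing ever recycles a segment — for example a cyclic dependency between tasks or a consumer that stopped pulling — `requestMemorySegment()` waits indefinitely, which matches the reported hang.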
[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers
[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662923#comment-16662923 ] ASF GitHub Bot commented on FLINK-8354: --- alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-432846236 @yanghua, @aljoscha is anything else need to be done with this PR ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink Kafka connector ignores Kafka message headers > - > > Key: FLINK-8354 > URL: https://issues.apache.org/jira/browse/FLINK-8354 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector > Environment: Kafka 0.11.0.0 > Flink 1.4.0 > flink-connector-kafka-0.11_2.11 >Reporter: Mohammad Abareghi >Assignee: Aegeaner >Priority: Major > Labels: pull-request-available > > Kafka has introduced notion of Header for messages in version 0.11.0.0 > https://issues.apache.org/jira/browse/KAFKA-4208. > But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores > headers when consuming kafka messages. > It would be useful in some scenarios, such as distributed log tracing, to > support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-432846236 @yanghua, @aljoscha is anything else need to be done with this PR ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10542) Register Hive metastore as an external catalog in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated FLINK-10542: Description: Similar to FLINK-2167 but rather register Hive metastore as an external catalog in the {{TableEnvironment}}. After registration, Table API and SQL queries should be able to access all Hive tables. This might supersede the need of FLINK-2167 because Hive metastore stores a superset of tables available via hCat without an indirection. was: Similar to FLINK-2167 but rather register Hive metastore as an external ctalog in the {{TableEnvironment}}. After registration, Table API and SQL queries should be able to access all Hive tables. This might supersede the need of FLINK-2167 because Hive metastore stores a superset of tables available via hCat without an indirection. > Register Hive metastore as an external catalog in TableEnvironment > -- > > Key: FLINK-10542 > URL: https://issues.apache.org/jira/browse/FLINK-10542 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > Similar to FLINK-2167 but rather register Hive metastore as an external > catalog in the {{TableEnvironment}}. After registration, Table API and SQL > queries should be able to access all Hive tables. > This might supersede the need of FLINK-2167 because Hive metastore stores a > superset of tables available via hCat without an indirection. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10556) Integration with Apache Hive
[ https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated FLINK-10556: Attachment: (was: Proposal_ Integrate Flink with Hive Ecosystem.pdf) > Integration with Apache Hive > > > Key: FLINK-10556 > URL: https://issues.apache.org/jira/browse/FLINK-10556 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats, SQL Client, > Table API & SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf > > > This is an umbrella JIRA tracking all enhancement and issues related to > integrating Flink with Hive ecosystem. This is an outcome of a discussion in > the community, and thanks go to everyone that provided feedback and interest. > Specifically, we'd like to see the following features and capabilities > immediately in Flink: > # Metadata interoperability > # Data interoperability > # Data type compatibility > # Hive UDF support > # DDL/DML/Query language compatibility > For a longer term, we'd also like to add or improve: > # Compatible SQL service, client tools, JDBC/ODBC drivers > # Better task failure tolerance and task scheduling > # Support other user customizations in Hive (storage handlers, serdes, etc). > I will provide more details regarding the proposal in a doc shortly. Design > doc, if deemed necessary, will be provided in each related sub tasks under > this JIRA. > Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10556) Integration with Apache Hive
[ https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated FLINK-10556: Attachment: Proposal_ Integrate Flink with Hive Ecosystem.pdf > Integration with Apache Hive > > > Key: FLINK-10556 > URL: https://issues.apache.org/jira/browse/FLINK-10556 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats, SQL Client, > Table API & SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf > > > This is an umbrella JIRA tracking all enhancement and issues related to > integrating Flink with Hive ecosystem. This is an outcome of a discussion in > the community, and thanks go to everyone that provided feedback and interest. > Specifically, we'd like to see the following features and capabilities > immediately in Flink: > # Metadata interoperability > # Data interoperability > # Data type compatibility > # Hive UDF support > # DDL/DML/Query language compatibility > For a longer term, we'd also like to add or improve: > # Compatible SQL service, client tools, JDBC/ODBC drivers > # Better task failure tolerance and task scheduling > # Support other user customizations in Hive (storage handlers, serdes, etc). > I will provide more details regarding the proposal in a doc shortly. Design > doc, if deemed necessary, will be provided in each related sub tasks under > this JIRA. > Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10671) rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration
Cliff Resnick created FLINK-10671:
-------------------------------------

             Summary: rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration
                 Key: FLINK-10671
                 URL: https://issues.apache.org/jira/browse/FLINK-10671
             Project: Flink
          Issue Type: Bug
          Components: REST
    Affects Versions: 1.6.1
            Reporter: Cliff Resnick

Hi,

There seems to be a problem with the REST monitoring API:

/jobs/:jobid/savepoints/:triggerid

The problem is that when the savepoint represented by :triggerid is triggered with `cancel=true`, the status call above fails if the savepoint duration exceeds the `akka.ask.timeout` value. Below is a log in which I invoke "cancel with savepoint", then poll the above endpoint for status at 2-second intervals. akka.ask.timeout is set to twenty seconds. The error is repeatable at various values of akka.ask.timeout.

{noformat}
2018/10/24 19:42:25 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:27 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:29 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:31 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:33 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:35 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:37 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:39 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:41 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:43 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:45 Cancel with Savepoint may have failed: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
	at akka.dispatch.OnComplete.internal(Future.scala:258)
	at akka.dispatch.OnComplete.internal(Future.scala:256)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
	at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
	... 9 more
{noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
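The failure mode above — the status ask timing out while the savepoint is still running — can be illustrated with a small stand-alone sketch. This is plain Java, not Flink or Akka code; the method names and timings are made up for the illustration. A bounded "ask" gives up after its timeout even though the underlying operation later completes, so raising the timeout only widens the window rather than removing the race.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration (not Flink code): an ask-style call that fails with a timeout
// even though the work it is waiting on eventually completes, mirroring how
// the savepoint status ask can time out while the savepoint keeps running.
class AskTimeoutDemo {

    static String askWithTimeout(long workMillis, long timeoutMillis) {
        CompletableFuture<String> savepoint = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(workMillis); // stands in for a long-running savepoint
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "COMPLETED";
        });
        try {
            // The caller gives up after timeoutMillis, like an akka ask timeout.
            return savepoint.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "ASK_TIMEOUT"; // surfaced to the REST client as AskTimeoutException
        } catch (Exception e) {
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        System.out.println(askWithTimeout(300, 50));  // ask gives up before the work finishes
        System.out.println(askWithTimeout(10, 1000)); // work finishes within the timeout
    }
}
```

The structural fix for such an endpoint is to keep the long-running operation behind an asynchronous handle that the status call polls cheaply, instead of tying the status response to a single bounded ask.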
[jira] [Commented] (FLINK-6036) Let catalog support partition
[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662721#comment-16662721 ] Xuefu Zhang commented on FLINK-6036: Thank you, [~jinyu.zj]. PR looks good overall, and I left some minor comments for consideration. > Let catalog support partition > - > > Key: FLINK-6036 > URL: https://issues.apache.org/jira/browse/FLINK-6036 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: jingzhang >Assignee: jingzhang >Priority: Major > > Now catalog only support CRUD at database and table level. But in some kind > of catalog, for example for hive, we also need do CRUD operations at > partition level. > This issue aims to let catalog support partition. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition. URL: https://github.com/apache/flink/pull/6906#discussion_r227924642 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala ## @@ -120,4 +122,122 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog { override def listSubCatalogs(): JList[String] = synchronized { databases.keys.toList.asJava } + + /** +* Adds partition into an external Catalog table +* +* @param tableName table name +* @param partition partition description of partition which to create +* @param ignoreIfExists if partition already exists in the catalog, not throw exception and +* leave the existed partition if ignoreIfExists is true; +* else throw PartitionAlreadyExistException +* @throws TableNotExistException if table does not exist in the catalog yet +* @throws PartitionAlreadyExistException if partition exists in the catalog and +*ignoreIfExists is false +*/ + override def createPartition( +tableName: String, +partition: ExternalCatalogPartition, +ignoreIfExists: Boolean): Unit = synchronized { +val newPartSpec = partition.partitionSpec +val table = getTable(tableName) +val partitions = getPartitions(tableName, table) +if (partitions.contains(newPartSpec)) { + if (!ignoreIfExists) { +throw new PartitionAlreadyExistException(name, tableName, newPartSpec) + } +} else { + partitions.put(newPartSpec, partition) Review comment: We might want to validate if the partitionSpec is what the table is expecting. Otherwise, we might insert some partition who's partition columns are different from how the table is partitioned. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
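The validation xuefuz asks for could look like the following sketch. This is a hypothetical helper in plain Java (the PR's InMemoryExternalCatalog is Scala, and this class does not exist in the PR): before storing a partition, compare the spec's keys, in order, against the table's declared partition columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical helper (not from the PR): rejects partition specs whose
// columns differ from how the table is partitioned, as suggested in the
// review of createPartition.
class PartitionSpecs {

    static boolean matchesTableColumns(List<String> tablePartitionColumns,
                                       LinkedHashMap<String, String> partitionSpec) {
        // LinkedHashMap preserves insertion order, so key order is comparable
        // against the ordered list of declared partition columns.
        return new ArrayList<>(partitionSpec.keySet()).equals(tablePartitionColumns);
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        spec.put("year", "2018");
        spec.put("month", "10");
        System.out.println(matchesTableColumns(Arrays.asList("year", "month"), spec)); // true
        System.out.println(matchesTableColumns(Arrays.asList("year"), spec));          // false
    }
}
```

A `createPartition` implementation would call such a check first and throw an exception when it fails, so partitions with mismatched columns can never be inserted.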
[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition. URL: https://github.com/apache/flink/pull/6906#discussion_r227920287 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala ## @@ -120,4 +122,122 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog { override def listSubCatalogs(): JList[String] = synchronized { databases.keys.toList.asJava } + + /** +* Adds partition into an external Catalog table +* +* @param tableName table name +* @param partition partition description of partition which to create +* @param ignoreIfExists if partition already exists in the catalog, not throw exception and +* leave the existed partition if ignoreIfExists is true; +* else throw PartitionAlreadyExistException +* @throws TableNotExistException if table does not exist in the catalog yet +* @throws PartitionAlreadyExistException if partition exists in the catalog and +*ignoreIfExists is false +*/ + override def createPartition( +tableName: String, +partition: ExternalCatalogPartition, +ignoreIfExists: Boolean): Unit = synchronized { +val newPartSpec = partition.partitionSpec +val table = getTable(tableName) +val partitions = getPartitions(tableName, table) +if (partitions.contains(newPartSpec)) { + if (!ignoreIfExists) { +throw new PartitionAlreadyExistException(name, tableName, newPartSpec) + } +} else { + partitions.put(newPartSpec, partition) +} + } + + private def getPartitions(tableName: String, table: ExternalCatalogTable) + : mutable.HashMap[JLinkedHashMap[String, String], ExternalCatalogPartition] = table match { +case t: ExternalCatalogPartitionedTable => + partitions.getOrElseUpdate( +tableName, new mutable.HashMap[JLinkedHashMap[String, String], ExternalCatalogPartition]) +case _ => throw new UnsupportedOperationException( Review comment: It might be better to define an exception class called TableNotPartitioned similar to 
TableNotExistException, so this exception can be handled explicitly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition. URL: https://github.com/apache/flink/pull/6906#discussion_r227915200 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ## @@ -282,34 +296,63 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc this } + /** +* Specifies the partition columns for this external table. +*/ + def withPartitionColumnNames( +partitionColumnNames: java.util.LinkedHashSet[String]): ExternalCatalogTableBuilder = { +require(partitionColumnNames != null && !partitionColumnNames.isEmpty) +this.partitionColumnNames = Some(partitionColumnNames) +this + } + /** * Declares this external table as a table source and returns the * configured [[ExternalCatalogTable]]. * * @return External catalog table */ - def asTableSource(): ExternalCatalogTable = { -new ExternalCatalogTable( - isBatch, - isStreaming, - isSource = true, - isSink = false, - DescriptorProperties.toJavaMap(this)) - } + def asTableSource(): ExternalCatalogTable = this.partitionColumnNames match { + case Some(pc) => +new ExternalCatalogPartitionedTable( + isBatch, + isStreaming, + isSource = true, + isSink = false, + pc, + DescriptorProperties.toJavaMap(this) +) + case None => +new ExternalCatalogTable( + isBatch, + isStreaming, + isSource = true, + isSink = false, + DescriptorProperties.toJavaMap(this)) + } /** * Declares this external table as a table sink and returns the * configured [[ExternalCatalogTable]]. * * @return External catalog table */ - def asTableSink(): ExternalCatalogTable = { -new ExternalCatalogTable( - isBatch, - isStreaming, - isSource = false, - isSink = true, - DescriptorProperties.toJavaMap(this)) + def asTableSink(): ExternalCatalogTable = this.partitionColumnNames match { Review comment: I see a repeated pattern in the three asXXX methods. 
While it's not introduced in this PR, it might be good if we can introduce a help method that those asXXX methods call to minimize the repetition. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
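The de-duplication xuefuz suggests could take roughly this shape. This is a hypothetical Java mirror of the Scala `ExternalCatalogTableBuilder`; the class name, method names, and the returned strings (which stand in for the real `ExternalCatalogTable` instances) are all illustrative, not the PR's actual code.

```java
import java.util.List;

// Hypothetical sketch: the asXXX methods delegate to one private helper
// instead of each repeating the partitioned/non-partitioned match.
class CatalogTableBuilder {
    private List<String> partitionColumns; // null => table is not partitioned

    CatalogTableBuilder withPartitionColumnNames(List<String> columns) {
        if (columns == null || columns.isEmpty()) {
            throw new IllegalArgumentException("partition columns must be non-empty");
        }
        this.partitionColumns = columns;
        return this;
    }

    // Single place that decides which table variant to construct.
    private String build(boolean isSource, boolean isSink) {
        String kind = (partitionColumns == null)
                ? "ExternalCatalogTable"
                : "ExternalCatalogPartitionedTable";
        return kind + "(source=" + isSource + ", sink=" + isSink + ")";
    }

    String asTableSource()        { return build(true, false); }
    String asTableSink()          { return build(false, true); }
    String asTableSourceAndSink() { return build(true, true); }

    public static void main(String[] args) {
        System.out.println(new CatalogTableBuilder().asTableSource());
    }
}
```

Each public method then only states its source/sink flags, and any future change to the construction logic happens in exactly one place.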
[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition. URL: https://github.com/apache/flink/pull/6906#discussion_r227908967 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ## @@ -31,6 +31,32 @@ import org.apache.flink.table.api._ */ trait ExternalCatalog { + /** +* Gets the partition from external Catalog +* +* @param tableName table name +* @param partSpec partition specification +* @throws TableNotExistException if table does not exist in the catalog yet +* @throws PartitionNotExistException if partition does not exist in the catalog yet +* @return found partition +*/ + @throws[TableNotExistException] + @throws[PartitionNotExistException] + def getPartition( +tableName: String, +partSpec: JLinkedHashMap[String, String]): ExternalCatalogPartition + + /** +* Gets the partition specification list of a table from external catalog +* +* @param tableName table name +* @throws CatalogNotExistException if database does not exist in the catalog yet Review comment: This (CatalogNotExistException) seems not needed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
[ https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662592#comment-16662592 ] ASF GitHub Bot commented on FLINK-10656: isunjin commented on a change in pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader… URL: https://github.com/apache/flink/pull/6911#discussion_r227888205 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/TaskEventSender.java ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import org.apache.flink.runtime.event.TaskEvent; + +import java.io.IOException; + +/** + * The basic API for every reader. + */ +public interface TaskEventSender { + + void sendTaskEvent(TaskEvent event) throws IOException; Review comment: Yeah, maybe we just simply change the name of ```ReaderBase``` to ```IterationReader``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase > -- > > Key: FLINK-10656 > URL: https://issues.apache.org/jira/browse/FLINK-10656 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0 > > > The interface of org.apache.flink.runtime.io.network.api.reader.ReaderBase is > not very clean: its APIs are called only for iteration and event handling, > which is unrelated to the name ReaderBase. The functionality is > independent, so we propose to rename the interface and split it into two > isolated interfaces. > For more details, please look at the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
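The refactoring discussed in this thread splits the mixed-purpose `ReaderBase` into one interface for task events (`TaskEventSender`) and one for iterations. A minimal sketch of the idea — with simplified signatures and stand-in types, not the actual Flink classes — shows why callers benefit: each call site can depend only on the narrow capability it needs.

```java
import java.io.IOException;

// Simplified sketch of splitting one wide interface into two narrow ones.
// Interface names follow the PR's proposal; TaskEvent is simplified to String.
public class ReaderSplitSketch {

    interface TaskEventSender {
        void sendTaskEvent(String event) throws IOException;
    }

    interface IterationReader {
        void startNextSuperstep();
        boolean isFinished();
    }

    // A concrete reader can still offer both capabilities...
    static class MutableRecordReader implements TaskEventSender, IterationReader {
        private int superstep;

        public void sendTaskEvent(String event) { /* would forward to the input gate */ }
        public void startNextSuperstep() { superstep++; }
        public boolean isFinished() { return superstep >= 3; } // toy termination condition
    }

    // ...while callers depend only on the narrow interface they actually need.
    static int runIteration(IterationReader reader) {
        int steps = 0;
        while (!reader.isFinished()) {
            reader.startNextSuperstep();
            steps++;
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(runIteration(new MutableRecordReader())); // 3 supersteps in this toy setup
    }
}
```

This also matches the review discussion above: `isFinished()` lands on the iteration-facing interface because, per the comment, it is only used in the iteration scenario.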
[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
[ https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662587#comment-16662587 ] ASF GitHub Bot commented on FLINK-10656: isunjin commented on a change in pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader… URL: https://github.com/apache/flink/pull/6911#discussion_r227887183 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/IterationReader.java ## @@ -21,34 +21,21 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; -import java.io.IOException; - /** - * The basic API for every reader. + * Reader for iteration. */ -public interface ReaderBase { - - /** -* Returns whether the reader has consumed the input. -*/ - boolean isFinished(); - - // - // Task events - // - - void sendTaskEvent(TaskEvent event) throws IOException; - - void registerTaskEventListener(EventListener listener, Class eventType); - - // - // Iterations - // +public interface IterationReader { void setIterativeReader(); void startNextSuperstep(); boolean hasReachedEndOfSuperstep(); + /** +* Returns whether the reader has consumed the input. +*/ + boolean isFinished(); Review comment: well, true from literal, however, isFinished() is only used in iteration scenario. maybe we can also change the name of it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase > -- > > Key: FLINK-10656 > URL: https://issues.apache.org/jira/browse/FLINK-10656 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0 > > > The interface of org.apache.flink.runtime.io.network.api.reader.ReaderBase is > not very clean: its APIs are called only for iteration and event handling, > which is unrelated to the name ReaderBase. The functionality is > independent, so we propose to rename the interface and split it into two > isolated interfaces. > For more details, please look at the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10556) Integration with Apache Hive
[ https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662551#comment-16662551 ] Xuefu Zhang commented on FLINK-10556: - The proposal is attached. > Integration with Apache Hive > > > Key: FLINK-10556 > URL: https://issues.apache.org/jira/browse/FLINK-10556 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats, SQL Client, > Table API & SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf > > > This is an umbrella JIRA tracking all enhancement and issues related to > integrating Flink with Hive ecosystem. This is an outcome of a discussion in > the community, and thanks go to everyone that provided feedback and interest. > Specifically, we'd like to see the following features and capabilities > immediately in Flink: > # Metadata interoperability > # Data interoperability > # Data type compatibility > # Hive UDF support > # DDL/DML/Query language compatibility > For a longer term, we'd also like to add or improve: > # Compatible SQL service, client tools, JDBC/ODBC drivers > # Better task failure tolerance and task scheduling > # Support other user customizations in Hive (storage handlers, serdes, etc). > I will provide more details regarding the proposal in a doc shortly. Design > doc, if deemed necessary, will be provided in each related sub tasks under > this JIRA. > Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662539#comment-16662539 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227849935 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ## @@ -324,78 +297,98 @@ public void disconnectResourceManager() { boolean allowQueuedScheduling, Time allocationTimeout) { - log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute()); - - final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); - - if (slotSharingGroupId != null) { - // allocate slot with slot sharing - final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( - slotSharingGroupId, - id -> new SlotSharingManager( - id, - this, - providerAndOwner)); - - final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality; - - try { - if (task.getCoLocationConstraint() != null) { - multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot( - task.getCoLocationConstraint(), - multiTaskSlotManager, - slotProfile, - allowQueuedScheduling, - allocationTimeout); + return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> { Review comment: Shouldn't this method be only called by the new `Scheduler` which already wraps all external calls into the main thread? If yes, then we don't need to do it here again. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. The new Scheduler can now > identify slot sharing groups and interacts with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227871624 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ## @@ -136,12 +143,13 @@ private String jobManagerAddress; + private MainThreadExecutable jmMainThreadScheduledExecutor; + // @VisibleForTesting - protected SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) { + protected SlotPool(JobID jobId, SchedulingStrategy schedulingStrategy) { Review comment: This class still contains a lot of code which actually moved to the new `Scheduler`. Can we remove this code? Having this code still in this class makes it extremely hard to review. Of course, this would mean that we also need to adapt the `SlotPoolTests` which makes sense anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662540#comment-16662540 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227848647 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ## @@ -324,78 +297,98 @@ public void disconnectResourceManager() { boolean allowQueuedScheduling, Time allocationTimeout) { - log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute()); - - final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); - - if (slotSharingGroupId != null) { - // allocate slot with slot sharing - final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( - slotSharingGroupId, - id -> new SlotSharingManager( - id, - this, - providerAndOwner)); - - final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality; - - try { - if (task.getCoLocationConstraint() != null) { - multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot( - task.getCoLocationConstraint(), - multiTaskSlotManager, - slotProfile, - allowQueuedScheduling, - allocationTimeout); + return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> { Review comment: `CompletableFuture.supplyAsync()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. The new Scheduler can now > identify slot sharing groups and interacts with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
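The reviewer's suggestion in this thread is to replace the `CompletableFuture.completedFuture(null).thenComposeAsync(...)` pattern with `CompletableFuture.supplyAsync(...)`, which hands the computation to the target executor directly. A minimal comparison, using a plain single-thread executor as a stand-in for the JobMaster's main-thread executor (the result value `"slot"` is purely illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncSketch {

    // Pattern used in the PR: chain off an already-completed future to hop onto the executor.
    static CompletableFuture<String> viaThenComposeAsync(Executor executor) {
        return CompletableFuture.completedFuture(null)
            .thenComposeAsync(ignored -> CompletableFuture.completedFuture("slot"), executor);
    }

    // Reviewer's suggestion: supplyAsync expresses the same executor hand-off directly.
    static CompletableFuture<String> viaSupplyAsync(Executor executor) {
        return CompletableFuture.supplyAsync(() -> "slot", executor);
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println(viaThenComposeAsync(mainThread).get());
            System.out.println(viaSupplyAsync(mainThread).get());
        } finally {
            mainThread.shutdown();
        }
    }
}
```

Both variants run the supplier on the given executor; `supplyAsync` simply avoids the dummy completed future. The other comment in the thread goes further: if the new `Scheduler` already wraps all external calls into the main thread, the hop may be unnecessary here altogether.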
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872749 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ## @@ -145,17 +149,61 @@ * If the returned future must not be completed right away (a.k.a. the slot request * can be queued), allowQueuedScheduling must be set to true. * +* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot +* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to +* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and +* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}. +* * @param slotRequestId identifying the requested slot * @param scheduledUnit for which to allocate slot * @param slotProfile profile that specifies the requirements for the requested slot * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed) * @param timeout for the operation * @return Future which is completed with the allocated {@link LogicalSlot} */ + @Deprecated CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, @RpcTimeout Time timeout); + + /** +* Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot +* pool. +* +* @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool. +*/ + @Nonnull + List getAvailableSlotsInformation(); + + /** +* Allocates the available slot with the given allocation id under the given request id. This method returns +* {@code null} if no slot with the given allocation id is available. 
+* +* @param slotRequestId identifying the requested slot +* @param allocationID the allocation id of the requested available slot +* @return the previously available slot with the given allocation id or {@code null} if no such slot existed. +*/ + @Nullable + AllocatedSlot allocateAvailableSlot( Review comment: It's not so nice that we leak the `AllocatedSlot` which is supposed to be an internal class of the `SlotPool`. I think it would be better to have an interface which allows to do what's needed but, for example, does not expose the `releasePayload` call. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
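The concern raised here is that returning the pool-internal `AllocatedSlot` exposes lifecycle methods such as `releasePayload` to callers. One way to address it — sketched below with hypothetical names (`PhysicalSlotView` and its methods are illustrative, not an actual Flink type) — is to have the internal class implement a narrow read-only interface and return only that:

```java
// Sketch of hiding an internal class behind a narrow interface.
public class NarrowInterfaceSketch {

    // What the Scheduler is allowed to see: identity and placement, no lifecycle methods.
    interface PhysicalSlotView {
        String getAllocationId();
        int getPhysicalSlotNumber();
    }

    // The pool-internal class keeps releasePayload, but callers never see this type.
    static class AllocatedSlot implements PhysicalSlotView {
        private final String allocationId;
        private final int slotNumber;
        private boolean payloadReleased;

        AllocatedSlot(String allocationId, int slotNumber) {
            this.allocationId = allocationId;
            this.slotNumber = slotNumber;
        }

        public String getAllocationId() { return allocationId; }
        public int getPhysicalSlotNumber() { return slotNumber; }

        void releasePayload() { payloadReleased = true; } // internal-only: not on the view
        boolean isPayloadReleased() { return payloadReleased; }
    }

    // The gateway method returns only the narrow view, so releasePayload stays internal.
    static PhysicalSlotView allocateAvailableSlot(String allocationId) {
        return new AllocatedSlot(allocationId, 0);
    }
}
```

Callers can still identify and place the slot, but mutating its payload requires going back through the pool, which keeps slot lifecycle management in one place.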
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662546#comment-16662546 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227877459 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ## @@ -312,7 +313,7 @@ public void validateRunsInMainThread() { /** * Executor which executes runnables in the main thread context. */ - protected static class MainThreadExecutor implements Executor { + protected static class MainThreadExecutor implements MainThreadExecutable { Review comment: `MainThreadExecutable` was actually intended to be a marker interface for the `RpcServer`. I would recommend to let the `MainThreadExecutor` implement the `ScheduledExecutor`. That way, there is no need to change the `MainThreadExecutable`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. 
The new Scheduler can now > identify slot sharing groups and interacts with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
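The suggestion in this comment is to leave `MainThreadExecutable` as a marker-style interface and instead give the concrete `MainThreadExecutor` the richer executor capability. Sketched generically below — using plain `java.util.concurrent.Executor` in place of Flink's `ScheduledExecutor`, with simplified signatures:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorInterfaceSketch {

    // The marker-style interface stays untouched; no new methods are forced onto it.
    interface MainThreadExecutable {
        void runAsync(Runnable runnable);
    }

    // The concrete executor implements the standard Executor interface directly,
    // so code that needs an Executor can use it without widening MainThreadExecutable.
    static class MainThreadExecutor implements Executor {
        private final MainThreadExecutable gateway;

        MainThreadExecutor(MainThreadExecutable gateway) { this.gateway = gateway; }

        public void execute(Runnable command) { gateway.runAsync(command); }
    }

    public static void main(String[] args) {
        AtomicInteger ran = new AtomicInteger();
        MainThreadExecutable endpoint = Runnable::run; // toy "main thread" that runs tasks inline
        new MainThreadExecutor(endpoint).execute(ran::incrementAndGet);
        System.out.println(ran.get()); // 1
    }
}
```

The interface hierarchy stays clean: components that merely schedule work depend on `Executor` (or `ScheduledExecutor`), while `MainThreadExecutable` keeps its original role as the RPC server's marker.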
[jira] [Updated] (FLINK-10556) Integration with Apache Hive
[ https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated FLINK-10556: Attachment: Proposal_ Integrate Flink with Hive Ecosystem.pdf > Integration with Apache Hive > > > Key: FLINK-10556 > URL: https://issues.apache.org/jira/browse/FLINK-10556 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats, SQL Client, > Table API & SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf > > > This is an umbrella JIRA tracking all enhancement and issues related to > integrating Flink with Hive ecosystem. This is an outcome of a discussion in > the community, and thanks go to everyone that provided feedback and interest. > Specifically, we'd like to see the following features and capabilities > immediately in Flink: > # Metadata interoperability > # Data interoperability > # Data type compatibility > # Hive UDF support > # DDL/DML/Query language compatibility > For a longer term, we'd also like to add or improve: > # Compatible SQL service, client tools, JDBC/ODBC drivers > # Better task failure tolerance and task scheduling > # Support other user customizations in Hive (storage handlers, serdes, etc). > I will provide more details regarding the proposal in a doc shortly. Design > doc, if deemed necessary, will be provided in each related sub tasks under > this JIRA. > Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872341 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ## @@ -145,17 +149,61 @@ * If the returned future must not be completed right away (a.k.a. the slot request * can be queued), allowQueuedScheduling must be set to true. * +* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot +* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to +* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and +* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}. +* * @param slotRequestId identifying the requested slot * @param scheduledUnit for which to allocate slot * @param slotProfile profile that specifies the requirements for the requested slot * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed) * @param timeout for the operation * @return Future which is completed with the allocated {@link LogicalSlot} */ + @Deprecated CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, @RpcTimeout Time timeout); + + /** +* Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot +* pool. +* +* @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool. +*/ + @Nonnull + List getAvailableSlotsInformation(); Review comment: Let's make the return type a `Collection`. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662544#comment-16662544 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872252 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ## @@ -145,17 +149,61 @@ * If the returned future must not be completed right away (a.k.a. the slot request * can be queued), allowQueuedScheduling must be set to true. * +* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot +* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to +* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and +* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}. Review comment: What's preventing us from removing this method? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. 
This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. The new Scheduler can now > identify slot sharing groups and interacts with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872948 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ## @@ -90,7 +90,7 @@ * @param cause of the cancellation * @return Future which is completed once the slot request has been cancelled */ - CompletableFuture<Acknowledge> cancelSlotRequest( + Acknowledge cancelSlotRequest( Review comment: Can this become `void`?
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872252 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ## @@ -145,17 +149,61 @@ * If the returned future must not be completed right away (a.k.a. the slot request * can be queued), allowQueuedScheduling must be set to true. * +* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot +* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to +* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and +* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}. Review comment: What's preventing us from removing this method?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662545#comment-16662545 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872749 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ## @@ -145,17 +149,61 @@ * If the returned future must not be completed right away (a.k.a. the slot request * can be queued), allowQueuedScheduling must be set to true. * +* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot +* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to +* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and +* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}. +* * @param slotRequestId identifying the requested slot * @param scheduledUnit for which to allocate slot * @param slotProfile profile that specifies the requirements for the requested slot * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed) * @param timeout for the operation * @return Future which is completed with the allocated {@link LogicalSlot} */ + @Deprecated CompletableFuture<LogicalSlot> allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, @RpcTimeout Time timeout); + + /** +* Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot +* pool. +* +* @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool. +*/ + @Nonnull + List<SlotInfo> getAvailableSlotsInformation(); + + /** +* Allocates the available slot with the given allocation id under the given request id. This method returns +* {@code null} if no slot with the given allocation id is available. +* +* @param slotRequestId identifying the requested slot +* @param allocationID the allocation id of the requested available slot +* @return the previously available slot with the given allocation id or {@code null} if no such slot existed. +*/ + @Nullable + AllocatedSlot allocateAvailableSlot( Review comment: It's not so nice that we leak the `AllocatedSlot` which is supposed to be an internal class of the `SlotPool`. I think it would be better to have an interface which allows to do what's needed but, for example, does not expose the `releasePayload` call.
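The interface suggested in the review above can be sketched in plain Java. This is a toy illustration without Flink dependencies, and the names (`PhysicalSlotInfo`, the string-typed allocation id) are hypothetical, not from the PR:

```java
// Hypothetical narrow interface: exposes only what a caller such as the
// Scheduler needs, hiding internal lifecycle methods like releasePayload().
interface PhysicalSlotInfo {
    String getAllocationId();
}

// Stand-in for the SlotPool-internal class: it implements the public view,
// but its lifecycle method stays package-private and out of the interface.
class AllocatedSlot implements PhysicalSlotInfo {
    private final String allocationId;

    AllocatedSlot(String allocationId) {
        this.allocationId = allocationId;
    }

    @Override
    public String getAllocationId() {
        return allocationId;
    }

    // Internal cleanup; code holding only a PhysicalSlotInfo cannot reach it.
    void releasePayload(Throwable cause) {
    }
}

public class Main {
    public static void main(String[] args) {
        // The gateway would hand out the interface type, not the class.
        PhysicalSlotInfo slot = new AllocatedSlot("alloc-42");
        System.out.println(slot.getAllocationId());
    }
}
```

Returning the interface type from the gateway keeps `AllocatedSlot` an implementation detail of the `SlotPool`, which is the encapsulation the reviewer is asking for.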
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872881 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ## @@ -145,17 +149,61 @@ * If the returned future must not be completed right away (a.k.a. the slot request * can be queued), allowQueuedScheduling must be set to true. * +* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot +* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to +* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and +* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}. +* * @param slotRequestId identifying the requested slot * @param scheduledUnit for which to allocate slot * @param slotProfile profile that specifies the requirements for the requested slot * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed) * @param timeout for the operation * @return Future which is completed with the allocated {@link LogicalSlot} */ + @Deprecated CompletableFuture<LogicalSlot> allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, @RpcTimeout Time timeout); + + /** +* Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot +* pool. +* +* @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool. +*/ + @Nonnull + List<SlotInfo> getAvailableSlotsInformation(); + + /** +* Allocates the available slot with the given allocation id under the given request id. This method returns +* {@code null} if no slot with the given allocation id is available. +* +* @param slotRequestId identifying the requested slot +* @param allocationID the allocation id of the requested available slot +* @return the previously available slot with the given allocation id or {@code null} if no such slot existed. +*/ + @Nullable + AllocatedSlot allocateAvailableSlot( + @Nonnull SlotRequestId slotRequestId, + @Nonnull AllocationID allocationID); + + /** +* Request the allocation of a new slot from the resource manager. This method will not return a slot from the +* already available slots from the pool, but instead will add a new slot to that pool that is immediately allocated +* and returned. +* +* @param slotRequestId identifying the requested slot +* @param resourceProfile resource profile that specifies the resource requirements for the requested slot +* @param timeout timeout for the allocation procedure +* @return a newly allocated slot that was previously not available. +*/ + @Nonnull + CompletableFuture<AllocatedSlot> requestNewAllocatedSlot( Review comment: Same here with the `AllocatedSlot`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662547#comment-16662547 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227870899 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ## @@ -822,6 +835,29 @@ private SlotAndLocality pollAndAllocateSlot(SlotRequestId slotRequestId, SlotPro return slotFromPool; } + @Nullable + private AllocatedSlot allocateSlotWithID(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) { + AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID); + if (allocatedSlot != null) { + allocatedSlots.add(slotRequestId, allocatedSlot); + } + return allocatedSlot; + } + + @Override + @Nullable + public AllocatedSlot allocateAvailableSlot( Review comment: Could think of whether to make it an `Optional` to make it more explicit that this might not succeed.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662542#comment-16662542 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872881 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ## @@ -145,17 +149,61 @@ * If the returned future must not be completed right away (a.k.a. the slot request * can be queued), allowQueuedScheduling must be set to true. * +* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot +* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to +* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and +* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}. +* * @param slotRequestId identifying the requested slot * @param scheduledUnit for which to allocate slot * @param slotProfile profile that specifies the requirements for the requested slot * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed) * @param timeout for the operation * @return Future which is completed with the allocated {@link LogicalSlot} */ + @Deprecated CompletableFuture<LogicalSlot> allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, @RpcTimeout Time timeout); + + /** +* Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot +* pool. +* +* @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool. +*/ + @Nonnull + List<SlotInfo> getAvailableSlotsInformation(); + + /** +* Allocates the available slot with the given allocation id under the given request id. This method returns +* {@code null} if no slot with the given allocation id is available. +* +* @param slotRequestId identifying the requested slot +* @param allocationID the allocation id of the requested available slot +* @return the previously available slot with the given allocation id or {@code null} if no such slot existed. +*/ + @Nullable + AllocatedSlot allocateAvailableSlot( + @Nonnull SlotRequestId slotRequestId, + @Nonnull AllocationID allocationID); + + /** +* Request the allocation of a new slot from the resource manager. This method will not return a slot from the +* already available slots from the pool, but instead will add a new slot to that pool that is immediately allocated +* and returned. +* +* @param slotRequestId identifying the requested slot +* @param resourceProfile resource profile that specifies the resource requirements for the requested slot +* @param timeout timeout for the allocation procedure +* @return a newly allocated slot that was previously not available. +*/ + @Nonnull + CompletableFuture<AllocatedSlot> requestNewAllocatedSlot( Review comment: Same here with the `AllocatedSlot`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662548#comment-16662548 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227872948 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ## @@ -90,7 +90,7 @@ * @param cause of the cancellation * @return Future which is completed once the slot request has been cancelled */ - CompletableFuture<Acknowledge> cancelSlotRequest( + Acknowledge cancelSlotRequest( Review comment: Can this become `void`?
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227870899 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ## @@ -822,6 +835,29 @@ private SlotAndLocality pollAndAllocateSlot(SlotRequestId slotRequestId, SlotPro return slotFromPool; } + @Nullable + private AllocatedSlot allocateSlotWithID(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) { + AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID); + if (allocatedSlot != null) { + allocatedSlots.add(slotRequestId, allocatedSlot); + } + return allocatedSlot; + } + + @Override + @Nullable + public AllocatedSlot allocateAvailableSlot( Review comment: Could think of whether to make it an `Optional` to make it more explicit that this might not succeed.
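The `Optional` variant suggested in the review above can be illustrated with a toy, Flink-free stand-in for the pool (all names here are hypothetical, not from the PR):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy pool: returning Optional<T> instead of a @Nullable T forces the
// caller to handle the "no such slot available" case explicitly.
class SimpleSlotPool {
    private final Map<String, String> availableSlots = new HashMap<>();

    void offer(String allocationId, String slot) {
        availableSlots.put(allocationId, slot);
    }

    // Optional-returning variant of allocateAvailableSlot(...).
    Optional<String> allocateAvailableSlot(String allocationId) {
        // Map.remove() yields null when absent; Optional makes that explicit.
        return Optional.ofNullable(availableSlots.remove(allocationId));
    }
}

public class Main {
    public static void main(String[] args) {
        SimpleSlotPool pool = new SimpleSlotPool();
        pool.offer("alloc-1", "slot-1");

        System.out.println(pool.allocateAvailableSlot("alloc-1").isPresent());
        // The slot was already handed out, so the second attempt is empty.
        System.out.println(pool.allocateAvailableSlot("alloc-1").isPresent());
    }
}
```

At the call site, `Optional` turns a forgotten null check into a compile-visible decision (`map`, `orElseThrow`, `ifPresent`), which is the explicitness the reviewer is after.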
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662543#comment-16662543 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227871624 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ## @@ -136,12 +143,13 @@ private String jobManagerAddress; + private MainThreadExecutable jmMainThreadScheduledExecutor; + // @VisibleForTesting - protected SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) { + protected SlotPool(JobID jobId, SchedulingStrategy schedulingStrategy) { Review comment: This class still contains a lot of code which actually moved to the new `Scheduler`. Can we remove this code? Having this code still in this class makes it extremely hard to review. Of course, this would mean that we also need to adapt the `SlotPoolTests` which makes sense anyway.
[jira] [Commented] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException
[ https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662521#comment-16662521 ] ASF GitHub Bot commented on FLINK-10657: isunjin commented on issue #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException URL: https://github.com/apache/flink/pull/6912#issuecomment-432735636 > ``` > 00:58:26.666 [ERROR] src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java:[241] (javadoc) JavadocType: Missing a Javadoc comment. > ``` Thanks for review. > TPCHQuery3 fail with IllegalAccessException > --- > > Key: FLINK-10657 > URL: https://issues.apache.org/jira/browse/FLINK-10657 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Trivial > Labels: easy-fix, pull-request-available > Fix For: 1.7.0 > > > Similar with [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], > ShippingPriorityItem in example TPCHQuery3.java is set to private. This > causes an IllegalAccessException because of the reflection check in > dynamic class instantiation. Making it public resolves the problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] isunjin commented on issue #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException
isunjin commented on issue #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException URL: https://github.com/apache/flink/pull/6912#issuecomment-432735636 > ``` > 00:58:26.666 [ERROR] src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java:[241] (javadoc) JavadocType: Missing a Javadoc comment. > ``` Thanks for review.
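The underlying failure in FLINK-10657 is plain Java reflection behavior, reproducible without Flink: instantiating a private nested class reflectively from outside its declaring class throws `IllegalAccessException`, which is why making `ShippingPriorityItem` public fixes the example. A minimal sketch (the class and method names below are hypothetical):

```java
// Auxiliary top-level class: performs the reflective instantiation from
// outside the declaring class, the way a framework instantiates user types.
class Instantiator {
    static Object create(Class<?> clazz) throws Exception {
        return clazz.getDeclaredConstructor().newInstance();
    }
}

public class Main {
    // Mirrors the bug: a private nested class, even with a public constructor.
    private static class PrivateItem {
        public PrivateItem() {}
    }

    // Mirrors the fix: the nested class itself is public.
    public static class PublicItem {
        public PublicItem() {}
    }

    public static void main(String[] args) throws Exception {
        // Succeeds: public class, public no-arg constructor.
        System.out.println(Instantiator.create(PublicItem.class).getClass().getSimpleName());

        try {
            // Fails: the private class is not accessible to Instantiator.
            Instantiator.create(PrivateItem.class);
            System.out.println("instantiated");
        } catch (IllegalAccessException e) {
            System.out.println(e.getClass().getSimpleName());
        }
    }
}
```

Flink's serializers sit in a different package than the example job, so they hit exactly the failing branch; hence the rule of thumb that POJO/tuple subclasses used with Flink must be public.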
[jira] [Commented] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer
[ https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662491#comment-16662491 ] ASF GitHub Bot commented on FLINK-8995: --- tzulitai commented on a change in pull request #6909: [FLINK-8995][tests] Add keyed state that uses custom stateful seriali… URL: https://github.com/apache/flink/pull/6909#discussion_r227863415 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests.artificialstate; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A custom stateful serializer to test that serializers are not used concurrently. + */ +public class StatefulComplexPayloadSerializer extends TypeSerializer<ComplexPayload> { + + private static final long serialVersionUID = 8766687317209282373L; + + /** This holds the thread that currently has exclusive ownership over the serializer. */ + private final AtomicReference<Thread> currentOwnerThread; + + public StatefulComplexPayloadSerializer() { + this.currentOwnerThread = new AtomicReference<>(null); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<ComplexPayload> duplicate() { + return new StatefulComplexPayloadSerializer(); + } + + @Override + public ComplexPayload createInstance() { + throw new UnsupportedOperationException(); + } + + @Override + public ComplexPayload copy(ComplexPayload from) { + try { + if (currentOwnerThread.compareAndSet(null, Thread.currentThread())) { + return InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(from), Thread.currentThread().getContextClassLoader()); Review comment: Can avoid multiple invocations on `Thread.currentThread()`: ``` Thread currentThread = Thread.currentThread(); if (currentOwnerThread.compareAndSet(null, currentThread)) { return deserializeObject(serializeObject(from), 
currentThread.getContextClassLoader()); } ``` > Add a test operator with keyed state that uses custom, stateful serializer > -- > > Key: FLINK-8995 > URL: https://issues.apache.org/jira/browse/FLINK-8995 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > This test should figure out problems in places where multiple threads would > share the same serializer instead of properly duplicating it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tzulitai commented on a change in pull request #6909: [FLINK-8995][tests] Add keyed state that uses custom stateful seriali…
tzulitai commented on a change in pull request #6909: [FLINK-8995][tests] Add keyed state that uses custom stateful seriali… URL: https://github.com/apache/flink/pull/6909#discussion_r227863415 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.artificialstate; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A custom stateful serializer to test that serializers are not used concurrently. 
+ */ +public class StatefulComplexPayloadSerializer extends TypeSerializer<ComplexPayload> { + + private static final long serialVersionUID = 8766687317209282373L; + + /** This holds the thread that currently has exclusive ownership over the serializer. */ + private final AtomicReference<Thread> currentOwnerThread; + + public StatefulComplexPayloadSerializer() { + this.currentOwnerThread = new AtomicReference<>(null); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<ComplexPayload> duplicate() { + return new StatefulComplexPayloadSerializer(); + } + + @Override + public ComplexPayload createInstance() { + throw new UnsupportedOperationException(); + } + + @Override + public ComplexPayload copy(ComplexPayload from) { + try { + if (currentOwnerThread.compareAndSet(null, Thread.currentThread())) { + return InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(from), Thread.currentThread().getContextClassLoader()); Review comment: Can avoid multiple invocations on `Thread.currentThread()`: ``` Thread currentThread = Thread.currentThread(); if (currentOwnerThread.compareAndSet(null, currentThread)) { return deserializeObject(serializeObject(from), currentThread.getContextClassLoader()); } ```
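The ownership check at the core of this test serializer can be distilled into a Flink-free sketch: a compare-and-set on an `AtomicReference<Thread>` is what detects a serializer instance being shared across threads instead of being `duplicate()`d (`OwnershipGuard` is a hypothetical name for illustration):

```java
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java sketch of the concurrency detector: the first thread to CAS
// itself into the reference owns the guard; any other thread fails.
class OwnershipGuard {
    private final AtomicReference<Thread> currentOwner = new AtomicReference<>(null);

    // Returns true if the calling thread acquired (or already held) ownership.
    boolean tryAcquire() {
        Thread me = Thread.currentThread();
        return currentOwner.compareAndSet(null, me) || currentOwner.get() == me;
    }

    void release() {
        currentOwner.compareAndSet(Thread.currentThread(), null);
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        OwnershipGuard guard = new OwnershipGuard();
        System.out.println(guard.tryAcquire());

        // A second thread must fail while the first still holds ownership --
        // in the end-to-end test this signals a missing duplicate() call.
        final boolean[] acquiredByOther = new boolean[1];
        Thread other = new Thread(() -> acquiredByOther[0] = guard.tryAcquire());
        other.start();
        other.join();
        System.out.println(acquiredByOther[0]);

        guard.release();
    }
}
```

In the real serializer the failed acquisition would surface as an exception from `copy()`/`serialize()`, turning an otherwise silent thread-safety bug into a test failure.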
[jira] [Commented] (FLINK-10570) State grows unbounded when "within" constraint not applied
[ https://issues.apache.org/jira/browse/FLINK-10570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662481#comment-16662481 ] Dawid Wysakowicz commented on FLINK-10570: -- Fixed in master: fa085699ee5a8a90492952ae05bfc78ded3d1ec3 Fixed in 1.6: 8b8422c86ae2ef8b657c395cf3a10fc5f2180b84 > State grows unbounded when "within" constraint not applied > -- > > Key: FLINK-10570 > URL: https://issues.apache.org/jira/browse/FLINK-10570 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.6.1 >Reporter: Thomas Wozniakowski >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.0 > > > We have been running some failure monitoring using the CEP library. Simple > stuff that should probably have been implemented with a window, rather than > CEP, but we had already set the project up to use CEP elsewhere and it was > trivial to add this. > We ran the following pattern (on 1.4.2): > {code:java} > begin(PURCHASE_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(PurchaseEvent.class) > .times(100) > {code} > and then flat selected the responses if the failure ratio was over a certain > threshold. > With 1.6.1, the state size of the CEP operator for this pattern grows > unbounded, and eventually destroys the job with an OOM exception. We have > many CEP operators in this job but all the rest use a "within" call. > In 1.4.2, it seems events would be discarded once they were no longer in the > 100 most recent, now it seems they are held onto indefinitely. > We have a workaround (we're just going to add a "within" call to force the > CEP operator to discard old events), but it would be useful if we could have > the old behaviour back. > Please let me know if I can provide any more information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10570) State grows unbounded when "within" constraint not applied
[ https://issues.apache.org/jira/browse/FLINK-10570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-10570. Resolution: Fixed Fix Version/s: 1.7.0 1.6.3 > State grows unbounded when "within" constraint not applied > -- > > Key: FLINK-10570 > URL: https://issues.apache.org/jira/browse/FLINK-10570 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.6.1 >Reporter: Thomas Wozniakowski >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.0 > > > We have been running some failure monitoring using the CEP library. Simple > stuff that should probably have been implemented with a window, rather than > CEP, but we had already set the project up to use CEP elsewhere and it was > trivial to add this. > We ran the following pattern (on 1.4.2): > {code:java} > begin(PURCHASE_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(PurchaseEvent.class) > .times(100) > {code} > and then flat selected the responses if the failure ratio was over a certain > threshold. > With 1.6.1, the state size of the CEP operator for this pattern grows > unbounded, and eventually destroys the job with an OOM exception. We have > many CEP operators in this job but all the rest use a "within" call. > In 1.4.2, it seems events would be discarded once they were no longer in the > 100 most recent, now it seems they are held onto indefinitely. > We have a workaround (we're just going to add a "within" call to force the > CEP operator to discard old events), but it would be useful if we could have > the old behaviour back. > Please let me know if I can provide any more information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
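The workaround described in the ticket is to put an explicit time bound on the pattern (CEP's `within` clause) so the operator may discard events that can no longer complete a match. The state-bounding effect of such a constraint can be sketched in plain Java, independent of the CEP library; the class and field names here are illustrative, not Flink API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Sketch of time-bounded event retention: events older than windowMillis
 * relative to the latest event are pruned, so buffered state stays bounded
 * the way a CEP "within" constraint bounds partial matches. Without the
 * bound, every event would be retained indefinitely.
 */
public class BoundedEventBuffer {

    private final long windowMillis;
    private final Deque<Long> timestamps = new ArrayDeque<>();

    public BoundedEventBuffer(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Adds an event timestamp and evicts everything outside the time window. */
    public void add(long timestampMillis) {
        timestamps.addLast(timestampMillis);
        while (!timestamps.isEmpty()
                && timestamps.peekFirst() < timestampMillis - windowMillis) {
            timestamps.removeFirst();
        }
    }

    public int size() {
        return timestamps.size();
    }

    public static void main(String[] args) {
        BoundedEventBuffer buffer = new BoundedEventBuffer(1_000);
        for (long t = 0; t <= 10_000; t += 100) {
            buffer.add(t);
        }
        // Only the last second of events survives; an unbounded buffer would hold all 101.
        System.out.println(buffer.size());
    }
}
```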
[jira] [Updated] (FLINK-10670) Fix Correlate codegen error
[ https://issues.apache.org/jira/browse/FLINK-10670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10670: --- Labels: pull-request-available (was: ) > Fix Correlate codegen error > --- > > Key: FLINK-10670 > URL: https://issues.apache.org/jira/browse/FLINK-10670 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > > TableFunctionCollector should handle reuseInitCode and reuseMemberCode -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10670) Fix Correlate codegen error
[ https://issues.apache.org/jira/browse/FLINK-10670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662476#comment-16662476 ] ASF GitHub Bot commented on FLINK-10670: Xpray opened a new pull request #6923: [FLINK-10670] [table] Fix Correlate codegen error URL: https://github.com/apache/flink/pull/6923 ## What is the purpose of the change TableFunctionCollector should handle reuseInitStatements and reuseMemberStatements ## Brief change log * merge the statements of `CollectorCodeGenerator` and `FunctionCodeGenerator` ## Verifying this change This change added tests and can be verified as follows: Added test: org.apache.flink.table.runtime.stream.table.CorrelateITCase#testTableFunctionCollectorInit ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix Correlate codegen error > --- > > Key: FLINK-10670 > URL: https://issues.apache.org/jira/browse/FLINK-10670 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > > TableFunctionCollector should handle reuseInitCode and reuseMemberCode -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Xpray opened a new pull request #6923: [FLINK-10670] [table] Fix Correlate codegen error
Xpray opened a new pull request #6923: [FLINK-10670] [table] Fix Correlate codegen error URL: https://github.com/apache/flink/pull/6923 ## What is the purpose of the change TableFunctionCollector should handle reuseInitStatements and reuseMemberStatements ## Brief change log * merge the statements of `CollectorCodeGenerator` and `FunctionCodeGenerator` ## Verifying this change This change added tests and can be verified as follows: Added test: org.apache.flink.table.runtime.stream.table.CorrelateITCase#testTableFunctionCollectorInit ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10669) Exceptions & errors are not properly checked in logs in e2e tests
[ https://issues.apache.org/jira/browse/FLINK-10669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-10669. Resolution: Fixed > Exceptions & errors are not properly checked in logs in e2e tests > - > > Key: FLINK-10669 > URL: https://issues.apache.org/jira/browse/FLINK-10669 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10220) StreamSQL E2E test fails on travis
[ https://issues.apache.org/jira/browse/FLINK-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-10220. -- Resolution: Fixed > StreamSQL E2E test fails on travis > -- > > Key: FLINK-10220 > URL: https://issues.apache.org/jira/browse/FLINK-10220 > Project: Flink > Issue Type: Bug > Components: Table API & SQL, Tests >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Hequn Cheng >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > https://travis-ci.org/zentol/flink-ci/jobs/420972344 > {code} > [FAIL] 'Streaming SQL end-to-end test' failed after 1 minutes and 49 seconds! > Test exited with exit code 0 but the logs contained errors, exceptions or > non-empty .out files > 2018-08-27 07:34:36,311 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- window: > (TumblingGroupWindow('w$, 'rowtime, 2.millis)), select: ($SUM0(correct) > AS correct, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (correct, w$start AS > rowtime) -> to: Row -> Map -> Sink: Unnamed (1/1) > (97d055e4661ff3361a504626257d406d) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Exception occurred while processing valve output > watermark: > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:65) > at > 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllWindowFunction.apply(IncrementalAggregateAllWindowFunction.scala:62) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:65) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:37) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:46) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(St
[GitHub] yanghua commented on issue #6890: [FLINK-10603] Reduce kafka test duration
yanghua commented on issue #6890: [FLINK-10603] Reduce kafka test duration URL: https://github.com/apache/flink/pull/6890#issuecomment-432718022 excluded all but `FlinkKafkaProducer011ITCase`, test result : ``` 15:41:23.815 [INFO] flink-connector-kafka-0.8 .. SUCCESS [02:38 min] 15:41:23.815 [INFO] flink-connector-kafka-0.9 .. SUCCESS [02:52 min] 15:41:23.815 [INFO] flink-connector-kafka-0.10 . SUCCESS [03:13 min] 15:41:23.815 [INFO] flink-connector-kafka-0.11 . SUCCESS [04:30 min] ``` What do you think about the time duration, does it look normal? @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10603) Reduce kafka test duration
[ https://issues.apache.org/jira/browse/FLINK-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662466#comment-16662466 ] ASF GitHub Bot commented on FLINK-10603: yanghua commented on issue #6890: [FLINK-10603] Reduce kafka test duration URL: https://github.com/apache/flink/pull/6890#issuecomment-432718022 excluded all but `FlinkKafkaProducer011ITCase`, test result : ``` 15:41:23.815 [INFO] flink-connector-kafka-0.8 .. SUCCESS [02:38 min] 15:41:23.815 [INFO] flink-connector-kafka-0.9 .. SUCCESS [02:52 min] 15:41:23.815 [INFO] flink-connector-kafka-0.10 . SUCCESS [03:13 min] 15:41:23.815 [INFO] flink-connector-kafka-0.11 . SUCCESS [04:30 min] ``` What do you think about the time duration, does it look normal? @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce kafka test duration > -- > > Key: FLINK-10603 > URL: https://issues.apache.org/jira/browse/FLINK-10603 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The tests for the modern kafka connector take more than 10 minutes which is > simply unacceptable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10665) Port YARNSessionFIFOITCase#testJavaAPI to new codebase
[ https://issues.apache.org/jira/browse/FLINK-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662439#comment-16662439 ] ASF GitHub Bot commented on FLINK-10665: TisonKun commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c… URL: https://github.com/apache/flink/pull/6917#issuecomment-432711697 @zentol thanks for the review, add a hotfix :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port YARNSessionFIFOITCase#testJavaAPI to new codebase > -- > > Key: FLINK-10665 > URL: https://issues.apache.org/jira/browse/FLINK-10665 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{YARNSessionFIFOITCase#testJavaAPI}} to new codebase -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c…
TisonKun commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c… URL: https://github.com/apache/flink/pull/6917#issuecomment-432711697 @zentol thanks for the review, add a hotfix :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-8921) Split code generated call expression
[ https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li reassigned FLINK-8921: - Assignee: xueyu (was: Ruidong Li) > Split code generated call expression > - > > Key: FLINK-8921 > URL: https://issues.apache.org/jira/browse/FLINK-8921 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: xueyu >Priority: Major > Labels: pull-request-available > > In FLINK-8274 we introduced the possibility of splitting the generated code > into multiple methods in order to exceed the JVMs maximum method size (see > also https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9). > At the moment we only split methods by fields, however, this is not enough in > all case. We should also split expressions. I suggest to split the operands > of a {{RexCall}} in > {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a > certain threshold. However, this should happen as lazily as possible to keep > the runtime overhead low. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8921) Split code generated call expression
[ https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662436#comment-16662436 ] Ruidong Li commented on FLINK-8921: --- [~xueyu] thanks for your contribution, I have assigned this issue to you if you want to fix it. > Split code generated call expression > - > > Key: FLINK-8921 > URL: https://issues.apache.org/jira/browse/FLINK-8921 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > > In FLINK-8274 we introduced the possibility of splitting the generated code > into multiple methods in order to exceed the JVMs maximum method size (see > also https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9). > At the moment we only split methods by fields, however, this is not enough in > all case. We should also split expressions. I suggest to split the operands > of a {{RexCall}} in > {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a > certain threshold. However, this should happen as lazily as possible to keep > the runtime overhead low. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
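The JVM caps a single method body at 64 KB of bytecode (JVMS §4.9), which is why generated code must be split across methods once it grows past a threshold. The chunking idea behind such splitting can be sketched as follows; the names are illustrative and this is not Flink's actual `CodeGenerator` implementation:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of expression splitting: rather than emitting one huge method body,
 * chunks of expressions are moved into numbered helper methods once a
 * threshold is reached, keeping each generated method below the JVM's
 * 64 KB bytecode limit. The entry method just calls the helpers in order.
 */
public class SplitCodeGen {

    private final int maxExprsPerMethod;

    public SplitCodeGen(int maxExprsPerMethod) {
        this.maxExprsPerMethod = maxExprsPerMethod;
    }

    /** Generates source for a class with the expressions split across helper methods. */
    public String generate(List<String> expressions) {
        List<String> methods = new ArrayList<>();
        StringBuilder calls = new StringBuilder();
        for (int start = 0; start < expressions.size(); start += maxExprsPerMethod) {
            int end = Math.min(start + maxExprsPerMethod, expressions.size());
            int id = methods.size();
            StringBuilder body = new StringBuilder("  void split" + id + "() {\n");
            for (String expr : expressions.subList(start, end)) {
                body.append("    ").append(expr).append(";\n");
            }
            body.append("  }\n");
            methods.add(body.toString());
            calls.append("    split").append(id).append("();\n");
        }
        StringBuilder out = new StringBuilder("class Generated {\n");
        methods.forEach(out::append);
        out.append("  void eval() {\n").append(calls).append("  }\n}\n");
        return out.toString();
    }
}
```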
[jira] [Commented] (FLINK-10665) Port YARNSessionFIFOITCase#testJavaAPI to new codebase
[ https://issues.apache.org/jira/browse/FLINK-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662430#comment-16662430 ] ASF GitHub Bot commented on FLINK-10665: zentol commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c… URL: https://github.com/apache/flink/pull/6917#issuecomment-432709017 ``` 14:45:49.851 [INFO] There are 3 errors reported by Checkstyle 8.9 with /tools/maven/checkstyle.xml ruleset. 14:45:49.852 [ERROR] src/test/java/org/apache/flink/yarn/YARNITCase.java:[127] (javadoc) JavadocType: Missing a Javadoc comment. 14:45:49.852 [ERROR] src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[38] (imports) ImportOrder: Import org.apache.flink.yarn.util.YarnTestUtils appears after other imports that it should precede 14:45:49.852 [ERROR] src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[39] (imports) ImportOrder: 'org.apache.hadoop.fs.Path' should be separated from previous imports. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port YARNSessionFIFOITCase#testJavaAPI to new codebase > -- > > Key: FLINK-10665 > URL: https://issues.apache.org/jira/browse/FLINK-10665 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{YARNSessionFIFOITCase#testJavaAPI}} to new codebase -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c…
zentol commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c… URL: https://github.com/apache/flink/pull/6917#issuecomment-432709017 ``` 14:45:49.851 [INFO] There are 3 errors reported by Checkstyle 8.9 with /tools/maven/checkstyle.xml ruleset. 14:45:49.852 [ERROR] src/test/java/org/apache/flink/yarn/YARNITCase.java:[127] (javadoc) JavadocType: Missing a Javadoc comment. 14:45:49.852 [ERROR] src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[38] (imports) ImportOrder: Import org.apache.flink.yarn.util.YarnTestUtils appears after other imports that it should precede 14:45:49.852 [ERROR] src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:[39] (imports) ImportOrder: 'org.apache.hadoop.fs.Path' should be separated from previous imports. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10490) OperatorSnapshotUtil should probably use SavepointV2Serializer
[ https://issues.apache.org/jira/browse/FLINK-10490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662427#comment-16662427 ] ASF GitHub Bot commented on FLINK-10490: zentol commented on issue #6910: [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Seria… URL: https://github.com/apache/flink/pull/6910#issuecomment-432708555 The code loading the savepoint is aware of previous versions and starts a migration routine if a v1 savepoint is detected; the savepoint is loaded using the v1 serializer and migrated before it is passed to the task. I supposed our migration tests aren't affected since they don't make structural changes to jobs, which IIRC is the major limitation of the old format. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > OperatorSnapshotUtil should probably use SavepointV2Serializer > -- > > Key: FLINK-10490 > URL: https://issues.apache.org/jira/browse/FLINK-10490 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Priority: Major > Labels: pull-request-available > > {{OperatorSnapshotUtil}} is used for testing savepoint migration. This > utility internally still uses {{SavepointV1Serializer}} and I would assume > that it should use {{SavepointV2Serializer}}. I wonder if that means that > some newer cases are actually not covered in the migration tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
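The loading path described in the comment — detect an old-format savepoint, read it with the old serializer, then migrate before handing it to the task — is a standard version-dispatch pattern. A self-contained sketch of the idea, with illustrative interfaces rather than Flink's actual `SavepointSerializer` API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Sketch of version-dispatched loading: the serialized form starts with a
 * version number; data in an old version is read with its original format
 * and migrated to the current representation before use.
 */
public class VersionedLoader {

    static final int V1 = 1;
    static final int V2 = 2;

    /** Writes a payload prefixed with a format version. */
    static byte[] write(int version, String payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeUTF(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the version header and dispatches; v1 data is migrated on load. */
    static String load(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            String payload = in.readUTF();
            if (version == V1) {
                return "migrated:" + payload; // old format: migrate before use
            }
            if (version == V2) {
                return payload;              // current format: use as-is
            }
            throw new IllegalStateException("Unknown version " + version);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load(write(V1, "state")));
        System.out.println(load(write(V2, "state")));
    }
}
```

Because the version header is read first, new writers can always emit the latest format (as the ticket proposes for `OperatorSnapshotUtil`) while readers keep accepting older snapshots.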