[jira] [Comment Edited] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744769#comment-16744769 ] BoWang edited comment on FLINK-11344 at 1/17/19 7:55 AM: - [~hailong wang] Thanks for the comment. The execution attempt list indeed becomes a bit longer when failures occur, but constant restarting is rare, so a slightly longer list should be tolerable. Displaying failed attempts gives us a glimpse of the runtime information without querying the logs, including each attempt's running time and the TaskManager it ran on. Based on this simple information we can take action; for example, by *ordering by host* on the dashboard we can find a bad host on which many attempts fail and respond accordingly, e.g., add the machine to a blacklist. Thus, I think displaying all execution attempts does more good than harm.
> Display All Execution Attempt Information on Flink Web Dashboard > > > Key: FLINK-11344 > URL: https://issues.apache.org/jira/browse/FLINK-11344 > Project: Flink > Issue Type: Improvement > Components: Webfrontend > Reporter: BoWang > Assignee: BoWang > Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, only one execution attempt of each sub-task is shown on the web > dashboard, so only the successful attempt is shown when a failover occurs. This > may make it inconvenient to rapidly locate the failure causes of failed attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
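The triage flow described above (order all attempts by host, spot machines where many attempts fail, blacklist them) can be sketched as follows. This is a minimal illustration, not Flink's real data model: the `Attempt` class and the failure threshold are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: given a list of execution attempts (host + outcome),
// find hosts with enough failed attempts to be blacklist candidates.
public class BadHostFinder {

    public static final class Attempt {
        final String host;
        final boolean failed;
        public Attempt(String host, boolean failed) { this.host = host; this.failed = failed; }
    }

    // Count failed attempts per host and return hosts at or above the threshold.
    public static List<String> badHosts(List<Attempt> attempts, long threshold) {
        Map<String, Long> failuresPerHost = attempts.stream()
            .filter(a -> a.failed)
            .collect(Collectors.groupingBy(a -> a.host, LinkedHashMap::new, Collectors.counting()));
        return failuresPerHost.entrySet().stream()
            .filter(e -> e.getValue() >= threshold)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Attempt> attempts = Arrays.asList(
            new Attempt("host-a", true), new Attempt("host-a", true),
            new Attempt("host-b", true), new Attempt("host-b", false));
        System.out.println(badHosts(attempts, 2)); // [host-a]
    }
}
```

The point of showing all attempts on the dashboard is precisely that this per-host aggregation becomes possible by eye, without querying the logs.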
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248523663 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -170,6 +170,19 @@ class StreamTableEnvironmentTest extends TableTestBase { jTEnv.fromAppendStream(ds, "rt.rowtime, b, c, d, e, pt.proctime") } + @Test + def testAddTableFromUpsert(): Unit = { Review comment: Used to check `registerTable()` with `field.key`. Considering your comments below, I will revert changes in the file. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744746#comment-16744746 ] Tarush Grover commented on FLINK-10429: --- What's the current status of this ticket, i.e., which tasks are still open? I can pick up some of them if they are still open :) > Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination > Affects Versions: 1.7.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Major > > This epic tracks the redesign of scheduling in Flink. Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only at the > granularity of individual tasks, which makes holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use-cases like auto-scaling, > local-recovery, and resource-optimized batch. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing
[jira] [Updated] (FLINK-11376) flink cli -yn -ys has no effect if (yn * ys) < parallelism from env.setParallelism(parallelism)
[ https://issues.apache.org/jira/browse/FLINK-11376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shengjk1 updated FLINK-11376: - Attachment: Main222.java > flink cli -yn -ys has no effect if (yn * ys) < parallelism from env.setParallelism(parallelism) > - > > Key: FLINK-11376 > URL: https://issues.apache.org/jira/browse/FLINK-11376 > Project: Flink > Issue Type: Bug > Affects Versions: 1.7.1 > Environment: java: jdk1.8.0_151 > flink: flink-1.7.1 > CDH: CDH-5.13.1-1.cdh5.13.1.p0.2 > Reporter: shengjk1 > Priority: Major > Attachments: Main222.java, image-2019-01-17-14-25-34-206.png > > > As the title says: if (yn * ys) < parallelism from env.setParallelism(parallelism), then -yn and -ys have no effect. > My application is a Flink streaming job reading from Kafka. The topic has 3 > partitions, and the code calls setParallelism(3). When I submit the job with the CLI > flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test > -c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar > the application requests 4 CPU cores and 4 containers, as shown in the YARN web UI: > !image-2019-01-17-14-25-34-206.png! > But if the code does not call env.setParallelism(parallelism), or if > (yn*ys) > parallelism, then -yn and -ys do take effect. If the code calls > env.setParallelism(parallelism), the final application resources are multiples of yn > and ys: for example, with parallelism=10, yn=1, ys=5, the final > application resources are cpu cores=11, containers=3. > > To make the bug easy to reproduce, code is attached.
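The numbers in the report can be reproduced with simple arithmetic, assuming YARN allocates ceil(parallelism / ys) TaskManager containers with ys cores each, plus one container and one core for the application master. This allocation rule is an assumption inferred from the reported figures, not taken from Flink's source:

```java
// Hedged sketch of the resource arithmetic behind the reported YARN numbers.
public class YarnResourceEstimate {

    // Number of TaskManagers needed to host `parallelism` slots, `slotsPerTm` (ys) each.
    static int taskManagers(int parallelism, int slotsPerTm) {
        return (parallelism + slotsPerTm - 1) / slotsPerTm; // ceiling division
    }

    // Total containers: TaskManagers plus one application master (assumption).
    static int containers(int parallelism, int slotsPerTm) {
        return taskManagers(parallelism, slotsPerTm) + 1;
    }

    // Total CPU cores: one per slot on each TaskManager, plus one for the AM (assumption).
    static int cpuCores(int parallelism, int slotsPerTm) {
        return taskManagers(parallelism, slotsPerTm) * slotsPerTm + 1;
    }

    public static void main(String[] args) {
        // parallelism=3, ys=1: 4 containers, 4 cores -- matches the YARN UI screenshot.
        System.out.println(containers(3, 1) + " containers, " + cpuCores(3, 1) + " cores");
        // parallelism=10, ys=5: 3 containers, 11 cores -- matches the second example.
        System.out.println(containers(10, 5) + " containers, " + cpuCores(10, 5) + " cores");
    }
}
```

Under this model, `-yn` is simply overridden whenever the slot demand from `setParallelism()` exceeds yn * ys, which is consistent with the behavior the ticket describes.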
[jira] [Updated] (FLINK-11376) flink cli -yn -ys has no effect if (yn * ys) < parallelism from env.setParallelism(parallelism)
[ https://issues.apache.org/jira/browse/FLINK-11376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shengjk1 updated FLINK-11376: - Attachment: (was: Main222.java)
[jira] [Updated] (FLINK-11377) AbstractYarnClusterDescriptor's validClusterSpecification is not the final application resources from YARN if cli -yn -ys has no effect
[ https://issues.apache.org/jira/browse/FLINK-11377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shengjk1 updated FLINK-11377: - Description: When cli -yn -ys has no effect, AbstractYarnClusterDescriptor's validClusterSpecification does not reflect the final application resources allocated by YARN (for when cli -yn -ys has no effect, refer to https://issues.apache.org/jira/browse/FLINK-11376). The CLI: flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test -c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar AbstractYarnClusterDescriptor's log: org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification\{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1} But the YARN web UI shows allocated containers=4 and allocated cpu cores=4: !image-2019-01-17-14-57-24-060.png!
> AbstractYarnClusterDescriptor's validClusterSpecification is not the final > application resources from YARN if cli -yn -ys has no effect > > > Key: FLINK-11377 > URL: https://issues.apache.org/jira/browse/FLINK-11377 > Project: Flink > Issue Type: Bug > Affects Versions: 1.7.1 > Environment: java: jdk1.8.0_151 > flink: flink-1.7.1 > CDH: CDH-5.13.1-1.cdh5.13.1.p0.2 > Reporter: shengjk1 > Priority: Major > Attachments: image-2019-01-17-14-57-24-060.png > > > When cli -yn -ys has no effect, AbstractYarnClusterDescriptor's > validClusterSpecification does not reflect the final application resources allocated by YARN (for when cli > -yn -ys has no effect, refer to > https://issues.apache.org/jira/browse/FLINK-11376) > > The CLI: > flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test > -c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar > AbstractYarnClusterDescriptor's log: > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster > specification: ClusterSpecification\{masterMemoryMB=1024, > taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1} > But the YARN web UI shows: > allocated containers=4 and allocated cpu cores=4 > !image-2019-01-17-14-57-24-060.png!
[GitHub] hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-455063600 @pnowojski Thanks a lot for your comments. I have updated the PR according to your suggestions and rebased to the master. Looking forward to your new comments. Thank you! Best, Hequn
[jira] [Created] (FLINK-11376) flink cli -yn -ys has no effect if (yn * ys) < parallelism from env.setParallelism(parallelism)
shengjk1 created FLINK-11376: Summary: flink cli -yn -ys has no effect if (yn * ys) < parallelism from env.setParallelism(parallelism) Key: FLINK-11376 URL: https://issues.apache.org/jira/browse/FLINK-11376 Project: Flink Issue Type: Bug Affects Versions: 1.7.1 Environment: java: jdk1.8.0_151 flink: flink-1.7.1 CDH: CDH-5.13.1-1.cdh5.13.1.p0.2 Reporter: shengjk1 Attachments: Main222.java, image-2019-01-17-14-25-34-206.png As the title says: if (yn * ys) < parallelism from env.setParallelism(parallelism), then -yn and -ys have no effect; if (yn*ys) > parallelism, they do take effect. If the code calls env.setParallelism(parallelism), the final application resources are multiples of yn and ys: for example, with parallelism=10, yn=1, ys=5, the final application resources are cpu cores=11, containers=3. To make the bug easy to reproduce, code is attached.
[jira] [Created] (FLINK-11377) AbstractYarnClusterDescriptor's validClusterSpecification is not the final application resources from YARN if cli -yn -ys has no effect
shengjk1 created FLINK-11377: Summary: AbstractYarnClusterDescriptor's validClusterSpecification is not the final application resources from YARN if cli -yn -ys has no effect Key: FLINK-11377 URL: https://issues.apache.org/jira/browse/FLINK-11377 Project: Flink Issue Type: Bug Affects Versions: 1.7.1 Environment: java: jdk1.8.0_151 flink: flink-1.7.1 CDH: CDH-5.13.1-1.cdh5.13.1.p0.2 Reporter: shengjk1 Attachments: image-2019-01-17-14-57-24-060.png When cli -yn -ys has no effect, AbstractYarnClusterDescriptor's validClusterSpecification does not reflect the final application resources allocated by YARN (for when cli -yn -ys has no effect, refer to https://issues.apache.org/jira/browse/FLINK-11376). The CLI: flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test -c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar AbstractYarnClusterDescriptor's log: org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification\{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1} But the YARN web UI shows: !image-2019-01-17-14-57-24-060.png!
[GitHub] TisonKun commented on issue #7516: [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase
TisonKun commented on issue #7516: [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase URL: https://github.com/apache/flink/pull/7516#issuecomment-455059869 Travis fails with: >23:21:27.113 [INFO] Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 273.772 s - in org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase 23:21:27.524 [INFO] 23:21:27.524 [INFO] Results: 23:21:27.524 [INFO] 23:21:27.524 [ERROR] Errors: 23:21:27.524 [ERROR] KafkaITCase.testCancelingEmptyTopic:85->KafkaConsumerTestBase.runCancelingOnEmptyInputTest:1124->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2 » TestTimedOut 23:21:27.524 [INFO] 23:21:27.524 [ERROR] Tests run: 47, Failures: 0, Errors: 1, Skipped: 0 I think this failure is unrelated to this change.
[GitHub] eaglewatcherwb commented on issue #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution …
eaglewatcherwb commented on issue #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution … URL: https://github.com/apache/flink/pull/7436#issuecomment-455054189 > Thanks for the contribution @eaglewatcherwb . > > I don't think we should introduce incompatibilities with that change. Having two different versions of an interface is a problem in the first place. If it occurs, the solution would be simply to leave only one version of the proxy. > > @eaglewatcherwb I don't think we should change the `SocketWindowWordCount` example at all. I would prefer to see a proper unit test for those changes. You can check `org.apache.flink.runtime.classloading.ClassLoaderTest#testMessageDecodingWithUnavailableClass` for inspiration. @dawidwys Thanks for the comment. Regarding "leave only one version of the proxy": do you mean that `ClassLoaderObjectInputStream.resolveProxyClass` should not call `super.resolveProxyClass` and should just return `null` when `classLoader == null`?
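The mechanism under discussion can be sketched as an `ObjectInputStream` subclass that resolves dynamic proxy interfaces against a user class loader and falls back to the default JDK resolution when no loader is set. This is a self-contained illustration of the `resolveProxyClass` hook, not the actual `ClassLoaderObjectInputStream` implementation from the PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Sketch of resolving a dynamic proxy class against a specific class loader.
public class ProxyResolvingObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    public ProxyResolvingObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
        if (classLoader == null) {
            // No user class loader configured: fall back to default JDK resolution.
            return super.resolveProxyClass(interfaces);
        }
        Class<?>[] resolved = new Class<?>[interfaces.length];
        for (int i = 0; i < interfaces.length; i++) {
            resolved[i] = Class.forName(interfaces[i], false, classLoader);
        }
        return Proxy.getProxyClass(classLoader, resolved);
    }

    // Serializable handler used only for the round-trip demo below.
    public static class EchoHandler implements InvocationHandler, Serializable {
        public Object invoke(Object proxy, Method method, Object[] args) {
            return "ok";
        }
    }

    public static void main(String[] args) throws Exception {
        Object proxy = Proxy.newProxyInstance(
            EchoHandler.class.getClassLoader(), new Class<?>[]{Runnable.class}, new EchoHandler());
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(proxy);
        oos.close();
        try (ObjectInputStream in = new ProxyResolvingObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()), ClassLoader.getSystemClassLoader())) {
            Object restored = in.readObject();
            System.out.println(restored instanceof Runnable);
        }
    }
}
```

The question in the thread is exactly about the `classLoader == null` branch: whether to delegate to `super.resolveProxyClass` (as above) or to short-circuit it.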
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248533068 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase { (jTEnv, ds) } + private def prepareKeyedSchemaExpressionParser: +(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]) = { + +val jStreamExecEnv = mock(classOf[JStreamExecEnv]) + when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime) +val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv) + +val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG) + .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]] +val dsType = new TupleTypeInfo(Types.BOOLEAN, sType) + .asInstanceOf[TupleTypeInfo[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]] +val ds = mock(classOf[DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]]) +when(ds.getType).thenReturn(dsType) Review comment: ditto
[jira] [Updated] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly
[ https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-11375: - Description: In SlotPool, the AvailableSlots structure is lock free, so all access to it should happen in the main thread of the SlotPool; accordingly, all public methods are called through the SlotPoolGateway, except for releaseSlot, which is called directly by the SlotSharingManager. This may cause a ConcurrentModificationException. 2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], fields:(f0)) -> correlate: table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), select: item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED.
2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices meet exception, need to fail global execution graph
java.lang.reflect.UndeclaredThrowableException
	at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
	at org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
	at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
	at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
	at org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
	at java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
	... 23 more
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643)
	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
	at
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248525458 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase { (jTEnv, ds) } + private def prepareKeyedSchemaExpressionParser: +(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]) = { + +val jStreamExecEnv = mock(classOf[JStreamExecEnv]) Review comment: I will revert the changes in this file. It not only introduces the confusion you raised, but the behavior can also be tested in our `FromUpsertStreamTest`.
[jira] [Created] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly
shuai.xu created FLINK-11375: Summary: Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly Key: FLINK-11375 URL: https://issues.apache.org/jira/browse/FLINK-11375 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.7.1 Reporter: shuai.xu In SlotPool, the AvailableSlots structure is lock free, so all access to it should happen in the main thread of the SlotPool; accordingly, all public methods are called through the SlotPoolGateway, except for releaseSlot, which is called directly by the SlotSharingManager. This may cause a ConcurrentModificationException.
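The invariant this ticket describes, i.e. a lock-free structure is only safe if every access is funnelled through one "main thread", can be sketched with a single-thread executor standing in for the SlotPoolGateway. The class and method names below are illustrative, not Flink's real API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a deliberately non-thread-safe slot list whose every
// access is routed through a single "main thread" executor, mimicking how
// SlotPool methods are meant to go through the SlotPoolGateway. A caller that
// mutated `availableSlots` directly from its own thread (as SlotSharingManager
// does in the ticket) could trigger a ConcurrentModificationException.
public class SingleThreadedSlotPool {
    private final List<String> availableSlots = new ArrayList<>(); // not thread-safe on purpose
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    public CompletableFuture<Void> addSlot(String slotId) {
        return CompletableFuture.runAsync(() -> availableSlots.add(slotId), mainThread);
    }

    public CompletableFuture<Void> releaseSlot(String slotId) {
        // The mutation runs on the main thread, never on the caller's thread.
        return CompletableFuture.runAsync(() -> availableSlots.remove(slotId), mainThread);
    }

    public CompletableFuture<Integer> size() {
        return CompletableFuture.supplyAsync(availableSlots::size, mainThread);
    }

    public void shutdown() {
        mainThread.shutdown();
    }
}
```

With this routing in place, all reads and writes to `availableSlots` are serialized on one thread, so no external synchronization on the list itself is needed.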
[GitHub] Myasuka commented on a change in pull request #7515: [FLINK-11313][checkpoint] Introduce LZ4 compression for keyed state in full checkpoints and savepoints
Myasuka commented on a change in pull request #7515: [FLINK-11313][checkpoint] Introduce LZ4 compression for keyed state in full checkpoints and savepoints URL: https://github.com/apache/flink/pull/7515#discussion_r248531370 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -894,12 +898,13 @@ public void disableAutoTypeRegistration() { this.autoTypeRegistrationEnabled = false; } - public boolean isUseSnapshotCompression() { Review comment: @zentol hmm, to keep backward compatibility, I have to let `ExecutionConfig` within `flink-core` module know the snappy compression type. It seems I also have to move all `StreamCompressionDecorator`s into the `flink-core` module. I'll then update my PR based on this.
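The backward-compatibility concern can be sketched as follows: keep the old boolean snapshot-compression flag working while introducing a compression-type enum, with `true` continuing to mean snappy. The names below are illustrative; the actual PR may structure this differently:

```java
// Hedged sketch of bridging a legacy boolean flag to a compression-type enum.
public class SnapshotCompressionConfig {
    public enum CompressionType { NONE, SNAPPY, LZ4 }

    private CompressionType compressionType = CompressionType.NONE;

    // Old boolean setter kept for backward compatibility: true keeps meaning snappy.
    @Deprecated
    public void setUseSnapshotCompression(boolean useSnapshotCompression) {
        this.compressionType = useSnapshotCompression ? CompressionType.SNAPPY : CompressionType.NONE;
    }

    // Old boolean getter: any compression at all reports as "enabled".
    @Deprecated
    public boolean isUseSnapshotCompression() {
        return compressionType != CompressionType.NONE;
    }

    public void setSnapshotCompressionType(CompressionType type) {
        this.compressionType = type;
    }

    public CompressionType getSnapshotCompressionType() {
        return compressionType;
    }
}
```

This is why the enum (and whatever maps it to a concrete `StreamCompressionDecorator`) has to live where `ExecutionConfig` can see it, hence the move into `flink-core` mentioned in the comment.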
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248531053 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala ## @@ -95,4 +95,41 @@ object StreamTestData { data.+=(((3, 3), "three")) env.fromCollection(data) } + + def getSmall3TupleUpsertStream(env: StreamExecutionEnvironment): + DataStream[(Boolean, (Int, Long, String))] = { +val data = new mutable.MutableList[(Boolean, (Int, Long, String))] +data.+=((true, (1, 1L, "Hi"))) +data.+=((true, (2, 2L, "Hello"))) +data.+=((true, (3, 2L, "Hello world"))) +env.fromCollection(data) + } + + def get3TupleUpsertStream(env: StreamExecutionEnvironment): Review comment: Oh, it should be used in the next commit. I will correct this.
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248530953 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala ## @@ -46,6 +46,20 @@ class TableSinkValidationTest extends TableTestBase { env.execute() } + @Test(expected = classOf[TableException]) + def testAppendSinkOnUpdatingTable2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val t = tEnv.fromUpsertStream( + StreamTestData.getSmall3TupleUpsertStream(env), 'id.key, 'num, 'text) + +t.writeToSink(new TestAppendSink) + +// must fail because table is not append-only +env.execute() Review comment: Good point! I think we can use `streamTestUtil` to test the sink logic.
[jira] [Assigned] (FLINK-11366) Check and port TaskManagerMetricsTest to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-11366: Assignee: Yun Tang > Check and port TaskManagerMetricsTest to new code base if necessary > --- > > Key: FLINK-11366 > URL: https://issues.apache.org/jira/browse/FLINK-11366 > Project: Flink > Issue Type: Sub-task > Components: Tests > Reporter: Till Rohrmann > Assignee: Yun Tang > Priority: Major > > Check and port {{TaskManagerMetricsTest}} to new code base if necessary.
[jira] [Assigned] (FLINK-11363) Check and remove TaskManagerConfigurationTest
[ https://issues.apache.org/jira/browse/FLINK-11363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-11363: Assignee: Yun Tang > Check and remove TaskManagerConfigurationTest > - > > Key: FLINK-11363 > URL: https://issues.apache.org/jira/browse/FLINK-11363 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Yun Tang >Priority: Major > > Check whether {{TaskManagerConfigurationTest}} contains any relevant tests > for the new code base and then remove this test. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11374) See more failover and can filter by time range
[ https://issues.apache.org/jira/browse/FLINK-11374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744657#comment-16744657 ] lining commented on FLINK-11374: Hi [~till.rohrmann], what do you think of this? > See more failover and can filter by time range > -- > > Key: FLINK-11374 > URL: https://issues.apache.org/jira/browse/FLINK-11374 > Project: Flink > Issue Type: Improvement > Components: REST, Webfrontend >Reporter: lining >Assignee: lining >Priority: Major > > Currently, failover only shows a limited number of the most recent task > failures. If a task has failed many times, we cannot see the earlier > failovers. Can we add a filter by time to see the failovers that contain the > task attempt failure messages? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11374) See more failover and can filter by time range
[ https://issues.apache.org/jira/browse/FLINK-11374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lining updated FLINK-11374: --- Summary: See more failover and can filter by time range (was: Failover add time range filter) > See more failover and can filter by time range > -- > > Key: FLINK-11374 > URL: https://issues.apache.org/jira/browse/FLINK-11374 > Project: Flink > Issue Type: Improvement > Components: REST, Webfrontend >Reporter: lining >Assignee: lining >Priority: Major > > Currently, failover only shows a limited number of the most recent task > failures. If a task has failed many times, we cannot see the earlier > failovers. Can we add a filter by time to see the failovers that contain the > task attempt failure messages? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11374) Failover add time range filter
lining created FLINK-11374: -- Summary: Failover add time range filter Key: FLINK-11374 URL: https://issues.apache.org/jira/browse/FLINK-11374 Project: Flink Issue Type: Improvement Components: REST, Webfrontend Reporter: lining Assignee: lining Currently, failover only shows a limited number of the most recent task failures. If a task has failed many times, we cannot see the earlier failovers. Can we add a filter by time to see the failovers that contain the task attempt failure messages? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
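The requested improvement boils down to a timestamp predicate over the recorded attempt failures. The following is a minimal, hypothetical sketch of that REST-side filtering; `FailoverEntry`, the field names, and `filterByTimeRange` are illustrative stand-ins, not part of Flink's actual REST model:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the requested filter: keep only failover entries
// whose timestamp falls inside the closed interval [from, to].
public class FailoverFilter {

    static final class FailoverEntry {
        final long timestamp;
        final String failureMessage;

        FailoverEntry(long timestamp, String failureMessage) {
            this.timestamp = timestamp;
            this.failureMessage = failureMessage;
        }
    }

    static List<FailoverEntry> filterByTimeRange(List<FailoverEntry> entries, long from, long to) {
        return entries.stream()
            .filter(e -> e.timestamp >= from && e.timestamp <= to)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FailoverEntry> history = Arrays.asList(
            new FailoverEntry(100L, "attempt 0 failed"),
            new FailoverEntry(250L, "attempt 1 failed"),
            new FailoverEntry(400L, "attempt 2 failed"));
        // Only the middle failure falls inside [200, 300].
        System.out.println(filterByTimeRange(history, 200L, 300L).size());
    }
}
```

With such a predicate, older failures would remain reachable even when the UI limits how many recent entries it displays by default.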
[GitHub] libenchao opened a new pull request #7518: [hotfix] [docs] fix typo in debugging classloading doc
libenchao opened a new pull request #7518: [hotfix] [docs] fix typo in debugging classloading doc URL: https://github.com/apache/flink/pull/7518 ## What is the purpose of the change Fix typo in debugging classloading doc. ## Brief change log Fix typo ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248527085 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT b as b1, c, proctime as proctime1, rowtime as rowtime1 FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +unaryNode( + "DataStreamUpsertToRetraction", + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ), + term("keys", "b"), + term("select", "a", "b", "c", "proctime", "rowtime") +), +term("select", "b AS b1", "c", "proctime AS proctime1", "rowtime AS rowtime1")) +streamUtil.verifySql(sql, expected, true) + } + + @Test + def 
testCalcCannotTransposeUpsertToRetraction() = { Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744645#comment-16744645 ] TisonKun commented on FLINK-11370: -- [~lining] if you start working on this issue, I'd be glad to help review. I haven't actually started working on it, so it's OK with me if you take it over. > Check and port ZooKeeperLeaderElectionITCase to new code base if necessary > -- > > Key: FLINK-11370 > URL: https://issues.apache.org/jira/browse/FLINK-11370 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: TisonKun >Priority: Major > > Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if > necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248526967 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { Review comment: `testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose` is used to test materialize logic and transpose(calc can be pushed down through UpsertToRetraction). `testCalcCannotTransposeUpsertToRetraction` is used to test transpose logic(calc can not be pushed down). However, we should not test transpose logic in this commit? I will remove `testCalcCannotTransposeUpsertToRetraction` in this commit and add it later. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lining reassigned FLINK-11370: -- Assignee: lining (was: TisonKun) > Check and port ZooKeeperLeaderElectionITCase to new code base if necessary > -- > > Key: FLINK-11370 > URL: https://issues.apache.org/jira/browse/FLINK-11370 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: lining >Priority: Major > > Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if > necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-10882) Misleading job/task state for scheduled jobs
[ https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lining updated FLINK-10882: --- Comment: was deleted (was: [~Zentol] what's your point?) > Misleading job/task state for scheduled jobs > > > Key: FLINK-10882 > URL: https://issues.apache.org/jira/browse/FLINK-10882 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Priority: Major > Attachments: list_view.png, task_view.png > > > Submitting a job when not enough resources are available currently > causes the job to stay in a {{CREATE/SCHEDULED}} state. > There are 2 issues with how this is presented in the UI. > The {{Running Jobs}} page incorrectly states that the job is running. > (see list_view attachment) > EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING > state. > The state display for individual tasks either > # States the task is in a CREATED state, when it is actually SCHEDULED > # States the task is in a CREATED state, but the count for all state boxes is > zero. > (see task_view attachment) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-10882) Misleading job/task state for scheduled jobs
[ https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lining updated FLINK-10882: --- Comment: was deleted (was: See the code: org.apache.flink.runtime.jobgraph.JobStatus currently defines RUNNING to mean that some tasks are scheduled or running, some may be pending, and some may be finished. Should we add a new status?) > Misleading job/task state for scheduled jobs > > > Key: FLINK-10882 > URL: https://issues.apache.org/jira/browse/FLINK-10882 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Priority: Major > Attachments: list_view.png, task_view.png > > > Submitting a job when not enough resources are available currently > causes the job to stay in a {{CREATE/SCHEDULED}} state. > There are 2 issues with how this is presented in the UI. > The {{Running Jobs}} page incorrectly states that the job is running. > (see list_view attachment) > EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING > state. > The state display for individual tasks either > # States the task is in a CREATED state, when it is actually SCHEDULED > # States the task is in a CREATED state, but the count for all state boxes is > zero. > (see task_view attachment) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lining reassigned FLINK-11370: -- Assignee: TisonKun (was: lining) > Check and port ZooKeeperLeaderElectionITCase to new code base if necessary > -- > > Key: FLINK-11370 > URL: https://issues.apache.org/jira/browse/FLINK-11370 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: TisonKun >Priority: Major > > Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if > necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248525458 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase { (jTEnv, ds) } + private def prepareKeyedSchemaExpressionParser: +(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]) = { + +val jStreamExecEnv = mock(classOf[JStreamExecEnv]) Review comment: I will remove the changes in this file. They not only introduce the confusion you raised; the logic can also be covered by our `FromUpsertStreamTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11357) Check and port LeaderChangeJobRecoveryTest to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744638#comment-16744638 ] lining commented on FLINK-11357: Hi [~till.rohrmann], can you assign this to me? > Check and port LeaderChangeJobRecoveryTest to new code base if necessary > > > Key: FLINK-11357 > URL: https://issues.apache.org/jira/browse/FLINK-11357 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Priority: Major > > Check and port {{LeaderChangeJobRecoveryTest}} to new code base if necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248523663 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -170,6 +170,19 @@ class StreamTableEnvironmentTest extends TableTestBase { jTEnv.fromAppendStream(ds, "rt.rowtime, b, c, d, e, pt.proctime") } + @Test + def testAddTableFromUpsert(): Unit = { Review comment: Used to check `registerTable()` with `field.key` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248523576 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala ## @@ -183,6 +183,17 @@ object UpdatingPlanChecker { lJoinKeys.zip(rJoinKeys) ) } + +case l: DataStreamUpsertToRetraction => + val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex +.filter(e => l.keyIndexes.contains(e._2)) +.map(_._1) + Some(uniqueKeyNames.map(e => (e, e))) + +case scan: UpsertStreamScan => Review comment: The `visite` in `RelVisitor` returns void. We need to return an `Option[Seq[(String, String)]]` here This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
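The Scala snippet in the review above keeps only the field names whose position appears in `keyIndexes`, then pairs each name with itself (`Some(uniqueKeyNames.map(e => (e, e)))`). The following is a standalone sketch of that index-based filtering; the class and method names are illustrative, not the actual `UpdatingPlanChecker` code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative re-implementation: select the field names at the key
// positions and emit (name, name) pairs, mirroring the Scala snippet.
public class UniqueKeyNames {

    static List<String[]> uniqueKeyPairs(List<String> fieldNames, Set<Integer> keyIndexes) {
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (keyIndexes.contains(i)) {
                String name = fieldNames.get(i);
                pairs.add(new String[] {name, name});
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Field layout from the upsert examples in this thread: 'id.key, 'num, 'text
        List<String> fields = Arrays.asList("id", "num", "text");
        Set<Integer> keys = new HashSet<>(Arrays.asList(0));
        for (String[] p : uniqueKeyPairs(fields, keys)) {
            System.out.println(p[0] + " -> " + p[1]);
        }
    }
}
```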
[jira] [Assigned] (FLINK-11365) Check and port TaskManagerFailureRecoveryITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] boshu Zheng reassigned FLINK-11365: --- Assignee: boshu Zheng > Check and port TaskManagerFailureRecoveryITCase to new code base if necessary > - > > Key: FLINK-11365 > URL: https://issues.apache.org/jira/browse/FLINK-11365 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: boshu Zheng >Priority: Major > > Check and port {{TaskManagerFailureRecoveryITCase}} to new code base if > necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248522927 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/UpsertStreamScan.scala ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex.RexNode +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.expressions.Cast +import org.apache.flink.table.plan.schema.{RowSchema, UpsertStreamTable} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo + +/** + * Flink RelNode which matches along with DataStreamSource. Different from [[AppendStreamScan]], + * [[UpsertStreamScan]] is used to handle upsert streams from source. 
+ */ +class UpsertStreamScan( Review comment: It seems a common base class is better. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-11361) Check and port RecoveryITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu reassigned FLINK-11361: Assignee: Congxian Qiu > Check and port RecoveryITCase to new code base if necessary > --- > > Key: FLINK-11361 > URL: https://issues.apache.org/jira/browse/FLINK-11361 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Congxian Qiu >Priority: Major > > Check and port {{RecoveryITCase}} to new code base if necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11352) Check and port JobManagerHACheckpointRecoveryITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu reassigned FLINK-11352: Assignee: Congxian Qiu > Check and port JobManagerHACheckpointRecoveryITCase to new code base if > necessary > - > > Key: FLINK-11352 > URL: https://issues.apache.org/jira/browse/FLINK-11352 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Congxian Qiu >Priority: Major > > Check and port {{JobManagerHACheckpointRecoveryITCase}} to new code base if > necessary -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744617#comment-16744617 ] hailong wang commented on FLINK-11344: -- Won't the display list become long if the job keeps restarting? Also, the previous attempt will be identical to the current attempt if there are errors in the job, so we can still locate the failure reasons. > Display All Execution Attempt Information on Flink Web Dashboard > > > Key: FLINK-11344 > URL: https://issues.apache.org/jira/browse/FLINK-11344 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, only one Execution Attempt of each sub-task is shown in the web > dashboard; thus, only the succeeded Attempt is shown when failover occurs. This > may make it inconvenient to rapidly locate the failure reasons of failed Attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11373) CliFrontend cuts off reason for error messages
[ https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf reassigned FLINK-11373: - Assignee: leesf > CliFrontend cuts off reason for error messages > -- > > Key: FLINK-11373 > URL: https://issues.apache.org/jira/browse/FLINK-11373 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Maximilian Michels >Assignee: leesf >Priority: Minor > Labels: starter > > The CliFrontend seems to only print the first message in the stack trace and > not any of its causes. > {noformat} > bin/flink run /non-existing/path > Could not build the program from JAR file. > Use the help option (-h or --help) to get help on the command. > {noformat} > Note that the underlying cause of this message is a FileNotFoundException. > Consider changing > a) the error message for this particular case > b) the way the stack trace messages are trimmed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
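The fix direction in b) above can be illustrated with a small standalone sketch: walking a Throwable's cause chain surfaces the FileNotFoundException that the CLI output currently drops. The `rootCause` helper is hypothetical, not CliFrontend's actual API:

```java
// Sketch of surfacing the root cause of a wrapped exception. The helper
// name and class are illustrative only.
public class RootCauseExample {

    /** Returns the deepest cause in the chain (or the throwable itself). */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Mirrors the report: a generic wrapper hides a FileNotFoundException.
        Throwable wrapped = new RuntimeException(
            "Could not build the program from JAR file.",
            new java.io.FileNotFoundException("/non-existing/path"));
        System.out.println(rootCause(wrapped));
    }
}
```

Printing the root-cause message alongside the top-level one would tell the user immediately that the JAR path does not exist.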
[GitHub] sunhaibotb commented on a change in pull request #7403: [FLINK-11256] Replace the reference of StreamNode object with ID in S…
sunhaibotb commented on a change in pull request #7403: [FLINK-11256] Replace the reference of StreamNode object with ID in S… URL: https://github.com/apache/flink/pull/7403#discussion_r248518981 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ## @@ -130,7 +122,7 @@ public boolean equals(Object o) { @Override public String toString() { - return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber + return "(" + sourceId + " -> " + targetId + ", typeNumber=" + typeNumber Review comment: I discussed with @sunjincheng121 and decided to add _sourceOperatorName_ and _targetOperatorName_. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhuzhurk commented on issue #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…
zhuzhurk commented on issue #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#issuecomment-455015548 Thanks Andrey(@azagrebin) and Till(@tillrohrmann) for the reviewing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial
dianfu commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial URL: https://github.com/apache/flink/pull/7502#issuecomment-455011043 @sunjincheng121 Thanks a lot for the review, merge and the kind suggestion. Will do that next time. :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248513031 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = { Review comment: Good catch! > First you didn't recursively call the RelTimeIndicatorConverter on the upsertToRetraction input I think you are right. It is better to visit the input for `LogicalUpsertToRetraction`. I will update the PR according to your suggestion. However, it seems impossible for us to add a test for this: the `LogicalUpsertToRetraction` sits right after the source, and there is no case in which we need to materialize time indicators in the source. The `visit()` method in `RelTimeIndicatorConverter` returns the scan directly. > Secondly, we should solve this in some more generic way. Agree. I suggest doing it in another PR; it is self-contained. Also, this PR is a bit big, and I would rather not add new features to it. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
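The recursion being discussed — convert a node's input first, then rebuild the node on top of the converted input — can be sketched abstractly. `Node` and `convert` here are illustrative stand-ins, not Calcite's `RelShuttle` API:

```java
// Illustrative sketch of a shuttle-style converter: the input is visited
// before the node itself, so conversions propagate bottom-up from the scan.
public class VisitInputExample {

    static class Node {
        final String name;
        final Node input; // null for a leaf node (e.g. a table scan)

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Converts bottom-up; the "'" suffix marks a converted node. */
    static Node convert(Node node) {
        if (node == null) {
            return null;
        }
        Node convertedInput = convert(node.input); // recurse into the input first
        return new Node(node.name + "'", convertedInput);
    }

    public static void main(String[] args) {
        Node scan = new Node("Scan", null);
        Node upsertToRetraction = new Node("UpsertToRetraction", scan);
        Node result = convert(upsertToRetraction);
        System.out.println(result.name + " <- " + result.input.name);
    }
}
```

Skipping the recursive call, as in the reviewed code, would leave the input subtree unconverted even though the node itself is rewritten.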
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r248513031 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = { Review comment: Good catch! > First you didn't recursively call the RelTimeIndicatorConverter on the upsertToRetraction input The `LogicalUpsertToRetraction` sits right after the source, and there is no case in which we need to materialize time indicators in the source; currently, `visit()` returns the scan directly in `RelTimeIndicatorConverter`. But I think you are right: it is better to visit the input for `LogicalUpsertToRetraction`. However, it seems impossible for us to add a test for this. > Secondly, we should solve this in some more generic way. Agree. I suggest doing it in another PR; it is self-contained. This PR is a bit big, and I would rather not add new features to it. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11159) Allow configuration whether to fall back to savepoints for restore
[ https://issues.apache.org/jira/browse/FLINK-11159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744582#comment-16744582 ] vinoyang commented on FLINK-11159: -- From my personal point of view, setting the default to false is a good choice: it is compatible with the default behavior of the old version and does not surprise users, while exposing the new behavior as an optimization option. > Allow configuration whether to fall back to savepoints for restore > -- > > Key: FLINK-11159 > URL: https://issues.apache.org/jira/browse/FLINK-11159 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Major > > Ever since FLINK-3397, upon failure, Flink would restart from the latest > checkpoint/savepoint, whichever is more recent. With the introduction of > local recovery and the knowledge that a RocksDB checkpoint restore would just > copy the files, it may be time to reconsider and make this configurable: > In certain situations, it may be faster to restore from the latest checkpoint > only (even if there is a more recent savepoint) and reprocess the data > in between. On the downside, though, that may not be correct, because it might > break side effects if the savepoint was the latest one; e.g. consider this > chain: {{chk1 -> chk2 -> sp … restore chk2 -> …}}. Then all side effects > between {{chk2 -> sp}} would be reproduced. > Making this configurable will allow users to set whatever they need in order > to get the lowest recovery time in Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
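The trade-off described in the issue can be made concrete with a small, self-contained sketch: given a chain chk1 -> chk2 -> sp, the proposed option decides whether the newer savepoint or the latest checkpoint wins at restore time. `RestorePointSelector` and `preferCheckpoint` are illustrative names under stated assumptions, not Flink's actual API:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// With preferCheckpoint = false (the suggested default), restore from the
// most recent checkpoint OR savepoint. With preferCheckpoint = true, ignore
// savepoints and restore from the latest checkpoint, which may be faster but
// replays (and thus re-triggers side effects for) the data after it.
class RestorePointSelector {
    static final class RestorePoint {
        final long id;            // monotonically increasing: higher = more recent
        final boolean isSavepoint;

        RestorePoint(long id, boolean isSavepoint) {
            this.id = id;
            this.isSavepoint = isSavepoint;
        }
    }

    static Optional<RestorePoint> select(List<RestorePoint> points, boolean preferCheckpoint) {
        return points.stream()
            .filter(p -> !preferCheckpoint || !p.isSavepoint)
            .max(Comparator.comparingLong(p -> p.id));
    }
}
```

For the chain in the issue (chk1, chk2, then sp), the default picks sp, while the opt-in behavior picks chk2 and accepts that side effects between chk2 and sp are reproduced.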
[GitHub] allenxwang commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
allenxwang commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-455003915 This is a very useful feature for us. Any chance this would be merged soon?
[GitHub] sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial
sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial URL: https://github.com/apache/flink/pull/7502#issuecomment-455000790 Fixed in master: 09eff88a72a46541d624a08fd66bc342099e0c81 Fixed in release-1.7: e341b3411e1706413b39e4301c97456489005044 The changes were not cherry-picked into release-1.6, because the doc structure there is not the same. BTW @dianfu, I suggest creating the JIRA before opening the fix PR next time. Great thanks. :-) Bests, Jincheng
[GitHub] asfgit closed pull request #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial
asfgit closed pull request #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial URL: https://github.com/apache/flink/pull/7502
[GitHub] sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial
sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial URL: https://github.com/apache/flink/pull/7502#issuecomment-454996428 @dianfu thanks for the update! LGTM. +1 to merge.
[GitHub] KarmaGYZ commented on issue #7517: [FLINK-11359][test] Check and port LegacyAvroExternalJarProgramITCase to new…
KarmaGYZ commented on issue #7517: [FLINK-11359][test] Check and port LegacyAvroExternalJarProgramITCase to new… URL: https://github.com/apache/flink/pull/7517#issuecomment-454986842 cc @tillrohrmann
[jira] [Updated] (FLINK-11359) Check and port LegacyAvroExternalJarProgramITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11359: --- Labels: pull-request-available (was: ) > Check and port LegacyAvroExternalJarProgramITCase to new code base if > necessary > --- > > Key: FLINK-11359 > URL: https://issues.apache.org/jira/browse/FLINK-11359 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Yangze Guo >Priority: Major > Labels: pull-request-available > > Check and port {{LegacyAvroExternalJarProgramITCase}} to new code base if > necessary.
[GitHub] KarmaGYZ opened a new pull request #7517: [FLINK-11359][test] Check and port LegacyAvroExternalJarProgramITCase to new…
KarmaGYZ opened a new pull request #7517: [FLINK-11359][test] Check and port LegacyAvroExternalJarProgramITCase to new… URL: https://github.com/apache/flink/pull/7517 … code base if necessary

## What is the purpose of the change

Check and port LegacyAvroExternalJarProgramITCase to the new code base if necessary.

## Brief change log

`LegacyAvroExternalJarProgramITCase` contains only `testExternalProgram`, which is the same as the test in `AvroExternalJarProgramITCase`, so we can simply remove the legacy version altogether.

## Verifying this change

This change is already covered by existing tests, such as `AvroExternalJarProgramITCase#testExternalProgram`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
[jira] [Assigned] (FLINK-11359) Check and port LegacyAvroExternalJarProgramITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo reassigned FLINK-11359: -- Assignee: Yangze Guo > Check and port LegacyAvroExternalJarProgramITCase to new code base if > necessary > --- > > Key: FLINK-11359 > URL: https://issues.apache.org/jira/browse/FLINK-11359 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Yangze Guo >Priority: Major > > Check and port {{LegacyAvroExternalJarProgramITCase}} to new code base if > necessary.
[GitHub] AlphaGarden closed pull request #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's…
AlphaGarden closed pull request #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's… URL: https://github.com/apache/flink/pull/7312
[GitHub] AlphaGarden commented on issue #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's…
AlphaGarden commented on issue #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's… URL: https://github.com/apache/flink/pull/7312#issuecomment-454978509 @aljoscha May I ask why the design and implementation of loading log and stdout files for the JobManager were made different from the TaskManagers' in the first place? Is there any distinct behavior between these two components when loading log and stdout files? Thanks in advance.
[GitHub] GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase
GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase URL: https://github.com/apache/flink/pull/7509#discussion_r248478213 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ## @@ -18,198 +18,205 @@ package org.apache.flink.yarn; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.deployment.ClusterDeploymentException; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; - -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.util.OperatingSystem; +import 
org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint; +import org.apache.flink.yarn.testjob.YarnTestJob; +import org.apache.flink.yarn.util.YarnTestUtils; + import org.apache.curator.test.TestingServer; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; +import javax.annotation.Nonnull; +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.EnumSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; import static org.junit.Assume.assumeTrue; /** * Tests that verify correct HA behavior. 
*/ public class YARNHighAvailabilityITCase extends YarnTestBase { - private static TestingServer zkServer; - - private static ActorSystem actorSystem; + @ClassRule + public static final TemporaryFolder FOLDER = new TemporaryFolder(); - private static final int numberApplicationAttempts = 3; + private static final String LOG_DIR = "flink-yarn-tests-ha"; + private static final Duration TIMEOUT = Duration.ofSeconds(200L); - @Rule - public TemporaryFolder temp = new TemporaryFolder(); + private static TestingServer zkServer; + private static String storageDir; @BeforeClass - public static void setup() { - actorSystem = AkkaUtils.createDefaultActorSystem(); - - try { - zkServer = new TestingServer(); - zkServer.start(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Could not start ZooKeeper testing cluster."); - } + public static void setup() throws Exception { + zkServer = new TestingServer(); - YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha"); -
[GitHub] GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase
GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase URL: https://github.com/apache/flink/pull/7509#discussion_r248476938 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
[GitHub] GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase
GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase URL: https://github.com/apache/flink/pull/7509#discussion_r248477826 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
[GitHub] GJL commented on a change in pull request #7512: [FLINK-11294][tests] Remove legacy JobInfo usage in valid tests
GJL commented on a change in pull request #7512: [FLINK-11294][tests] Remove legacy JobInfo usage in valid tests URL: https://github.com/apache/flink/pull/7512#discussion_r248471274 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ## @@ -114,7 +112,7 @@ public void testPutAndRemoveJobGraph() throws Exception { verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId)); // Update (same ID) - jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1); + jobGraph = createSubmittedJobGraph(jobGraph.getJobId()); jobGraphs.putJobGraph(jobGraph); // Verify updated Review comment: ok understood
[jira] [Updated] (FLINK-11360) Check and remove LocalFlinkMiniClusterITCase
[ https://issues.apache.org/jira/browse/FLINK-11360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11360: --- Labels: pull-request-available (was: ) > Check and remove LocalFlinkMiniClusterITCase > > > Key: FLINK-11360 > URL: https://issues.apache.org/jira/browse/FLINK-11360 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > > Check tests in {{LocalFlinkMiniClusterITCase}} whether they also apply to the > {{MiniCluster}} and port if necessary. Afterwards remove > {{LocalFlinkMiniClusterITCase}}.
[GitHub] TisonKun opened a new pull request #7516: [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase
TisonKun opened a new pull request #7516: [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase URL: https://github.com/apache/flink/pull/7516

## What is the purpose of the change

Check whether the tests in LocalFlinkMiniClusterITCase also apply to the MiniCluster and port them if necessary. Afterwards, remove LocalFlinkMiniClusterITCase.

## Brief change log

`LocalFlinkMiniClusterITCase` contains only `testLocalFlinkMiniClusterWithMultipleTaskManagers`, and `MiniClusterITCase` did not yet cover the multi-TM case, so this PR adds it. The implementation simply tests the positive case: a job requiring `numOfTMs * slotPerTM` slots can run with the corresponding number of TMs and slots. Negative cases were tested manually.

## Verifying this change

This change added tests and can be verified as follows: `MiniClusterITCase#runJobWithSingleRpcService` and `MiniClusterITCase#runJobWithMultipleRpcServices` now cover the multi-TM case.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

cc @tillrohrmann
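The capacity condition behind the positive case above (a job fits if and only if its parallelism does not exceed `numOfTMs * slotPerTM`) can be sketched in a few lines. `SlotCapacity` is a hypothetical helper for illustration, not part of Flink's MiniCluster API:

```java
// Illustrative capacity check behind the test: a job can be scheduled on the
// MiniCluster only if its parallelism does not exceed the total slot count.
class SlotCapacity {
    static int totalSlots(int numTaskManagers, int slotsPerTaskManager) {
        return numTaskManagers * slotsPerTaskManager;
    }

    static boolean canRun(int parallelism, int numTaskManagers, int slotsPerTaskManager) {
        return parallelism <= totalSlots(numTaskManagers, slotsPerTaskManager);
    }
}
```

For example, with 3 TMs of 2 slots each, a job of parallelism 6 fits exactly (the positive case the PR tests), while parallelism 7 would be a negative case.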
[jira] [Updated] (FLINK-10822) Configurable MetricQueryService interval
[ https://issues.apache.org/jira/browse/FLINK-10822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10822: --- Labels: pull-request-available (was: ) > Configurable MetricQueryService interval > > > Key: FLINK-10822 > URL: https://issues.apache.org/jira/browse/FLINK-10822 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Chesnay Schepler >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > The {{MetricQueryService}} is used for transmitting metrics from TaskManagers > to the JobManager, in order to expose them via REST and, by extension, the > WebUI. > By default the JM will poll metrics at most every 10 seconds. This has an > adverse effect on the duration of our end-to-end tests, which for example > query metrics via the REST API to determine whether the cluster has started. > If during the first poll no TM is available, it will take another 10 seconds > for updated information to become available. > By making this interval configurable we could thus reduce the test duration. > Additionally, this could serve as a switch to disable the > {{MetricQueryService}}.
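The throttling behavior this option makes configurable can be sketched with a minimal, clock-injected fetcher. `ThrottledFetcher` is an illustrative stand-in under stated assumptions, not the actual `MetricFetcher` implementation:

```java
import java.util.function.LongSupplier;

// Hedged sketch: the fetcher only triggers a new metrics fetch if at least
// `updateIntervalMs` milliseconds have passed since the last one (the
// previously hard-coded value was 10 seconds). The clock is injected so the
// gating logic is testable.
class ThrottledFetcher {
    private final long updateIntervalMs;
    private final LongSupplier clock;
    private long lastFetchMs = Long.MIN_VALUE;
    private int fetchCount = 0;

    ThrottledFetcher(long updateIntervalMs, LongSupplier clock) {
        this.updateIntervalMs = updateIntervalMs;
        this.clock = clock;
    }

    // Returns true if a fetch was actually triggered.
    boolean update() {
        long now = clock.getAsLong();
        if (lastFetchMs == Long.MIN_VALUE || now - lastFetchMs >= updateIntervalMs) {
            lastFetchMs = now;
            fetchCount++;
            return true;
        }
        return false;
    }

    int getFetchCount() {
        return fetchCount;
    }
}
```

Lowering the interval makes a request arriving shortly after cluster startup see fresh TM information sooner, which is exactly the end-to-end-test speedup the issue describes.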
[GitHub] asfgit closed pull request #7459: [FLINK-10822] Make MetricFetcher update interval configurable
asfgit closed pull request #7459: [FLINK-10822] Make MetricFetcher update interval configurable URL: https://github.com/apache/flink/pull/7459
[jira] [Resolved] (FLINK-10822) Configurable MetricQueryService interval
[ https://issues.apache.org/jira/browse/FLINK-10822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10822. --- Resolution: Fixed Fixed via 474fe8e8ae063e250f7f8f4eddcd799b15f8b69b > Configurable MetricQueryService interval > > > Key: FLINK-10822 > URL: https://issues.apache.org/jira/browse/FLINK-10822 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Chesnay Schepler >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.8.0 > > > The {{MetricQueryService}} is used for transmitting metrics from TaskManagers > to the JobManager, in order to expose them via REST and, by extension, the > WebUI. > By default the JM will poll metrics at most every 10 seconds. This has an > adverse effect on the duration of our end-to-end tests, which for example > query metrics via the REST API to determine whether the cluster has started. > If during the first poll no TM is available, it will take another 10 seconds > for updated information to become available. > By making this interval configurable we could thus reduce the test duration. > Additionally, this could serve as a switch to disable the > {{MetricQueryService}}.
[GitHub] tillrohrmann commented on issue #7459: [FLINK-10822] Make MetricFetcher update interval configurable
tillrohrmann commented on issue #7459: [FLINK-10822] Make MetricFetcher update interval configurable URL: https://github.com/apache/flink/pull/7459#issuecomment-454957193 @zentol I like the idea. Shall we do this as a follow-up, since I've just merged the PR?
[jira] [Assigned] (FLINK-11351) Port JobManagerCleanupITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-11351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-11351: - Assignee: Till Rohrmann > Port JobManagerCleanupITCase to new code base > - > > Key: FLINK-11351 > URL: https://issues.apache.org/jira/browse/FLINK-11351 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > > Port {{JobManagerCleanupITCase}} to new code base.
[jira] [Assigned] (FLINK-11349) Port CoordinatorShutdownTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-11349: - Assignee: Till Rohrmann > Port CoordinatorShutdownTest to new code base > - > > Key: FLINK-11349 > URL: https://issues.apache.org/jira/browse/FLINK-11349 > Project: Flink > Issue Type: Sub-task >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Port {{CoordinatorShutdownTest#testCoordinatorShutsDownOnFailure}} and > {{CoordinatorShutdownTest#testCoordinatorShutsDownOnSuccess}} to new code > base.
[GitHub] lamber-ken closed pull request #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method
lamber-ken closed pull request #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method URL: https://github.com/apache/flink/pull/7510
[GitHub] TisonKun commented on issue #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests
TisonKun commented on issue #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests URL: https://github.com/apache/flink/pull/7303#issuecomment-454949528 @zentol I see your concern above. To nudge along the removal of the legacy `Scheduler`, since all of its remaining usages are coupled with `Instance`, I can think of two approaches: 1. Keep this PR and the thread WIP, and notify a committer when all the work (Scheduler and Instance) is finished. 2. Split this removal up test case by test case, e.g., "Port PointwisePatternTest to new codebase". This follows Till's sorting scheme, and we can complete the whole job in fine-grained parts. In addition, for most of these test cases the porting work is entirely about replacing `Scheduler`. What do you think?
[GitHub] TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests
TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests URL: https://github.com/apache/flink/pull/7303#discussion_r248454800 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ## @@ -155,21 +153,12 @@ public void testRestartAutomatically() throws Exception { public void testCancelWhileRestarting() throws Exception { // We want to manually control the restart and delay RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(); - Tuple2 executionGraphInstanceTuple = createExecutionGraph(restartStrategy); - ExecutionGraph executionGraph = executionGraphInstanceTuple.f0; - Instance instance = executionGraphInstanceTuple.f1; - // Kill the instance and wait for the job to restart - instance.markDead(); - - Deadline deadline = TestingUtils.TESTING_DURATION().fromNow(); - - while (deadline.hasTimeLeft() && - executionGraph.getState() != JobStatus.RESTARTING) { - - Thread.sleep(100); - } + SlotProvider slotProvider = new TestingSlotProvider(ignore -> new CompletableFuture<>()); + ExecutionGraph executionGraph = createSimpleExecutionGraph(restartStrategy, slotProvider); + assertEquals(JobStatus.CREATED, executionGraph.getState()); + executionGraph.scheduleForExecution(); assertEquals(JobStatus.RESTARTING, executionGraph.getState()); Review comment: This is because the SlotProvider returns a non-completed future, and the ExecutionGraph is configured to disallow queued scheduling. Thus, on `scheduleForExecution`, it throws `IllegalArgumentException("The slot allocation future has not been completed yet.")`, which fails the graph and then triggers the restart. But it is tricky. Maybe I should allow queued scheduling and fail a future to emulate a real failure.
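The review comment above describes the mechanism: with queued scheduling disallowed, an uncompleted slot future is itself treated as an error, which fails the job and hands control to the restart strategy. A minimal, self-contained illustration of that check (simplified, hypothetical names — not Flink's actual scheduling code):

```java
import java.util.concurrent.CompletableFuture;

/**
 * Simplified sketch of the behavior described in the review comment:
 * when queued scheduling is disallowed, a slot future that is not yet
 * completed is rejected immediately, which fails the job and triggers
 * the restart strategy.
 */
public class SlotAllocationCheck {

    static <T> T checkCompleted(CompletableFuture<T> slotFuture, boolean allowQueuedScheduling) {
        if (!allowQueuedScheduling && !slotFuture.isDone()) {
            // This is the exception path that puts the test's ExecutionGraph into RESTARTING.
            throw new IllegalArgumentException("The slot allocation future has not been completed yet.");
        }
        // For a completed future this returns the slot; otherwise null (queued case).
        return slotFuture.getNow(null);
    }
}
```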
[GitHub] TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests
TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests URL: https://github.com/apache/flink/pull/7303#discussion_r248449108 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ## @@ -90,12 +90,12 @@ public void testAssignSlotSharingGroup() { new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy(), - new Scheduler(TestingUtils.defaultExecutionContext())); + new TestingSlotProvider(ignored -> new CompletableFuture<>())); eg.attachJobGraph(vertices); // verify that the vertices are all in the same slot sharing group - SlotSharingGroup group1 = null; - SlotSharingGroup group2 = null; + SlotSharingGroup group1; Review comment: Gotcha.
[GitHub] TisonKun commented on a change in pull request #7245: [FLINK-11069] [utils] Remove FutureUtil
TisonKun commented on a change in pull request #7245: [FLINK-11069] [utils] Remove FutureUtil URL: https://github.com/apache/flink/pull/7245#discussion_r248448673 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ## @@ -158,27 +160,31 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception { // Test try { // Submit producer tasks - List> results = Lists.newArrayListWithCapacity( + List> results = Lists.newArrayListWithCapacity( parallelism + 1); for (int i = 0; i < parallelism; i++) { - results.add(executor.submit(partitionProducers[i])); + results.add(CompletableFuture.supplyAsync( + CheckedSupplier.unchecked(partitionProducers[i]::call), executor)); } // Submit consumer for (int i = 0; i < parallelism; i++) { - results.add(executor.submit( - new TestLocalInputChannelConsumer( - i, - parallelism, - numberOfBuffersPerChannel, - networkBuffers.createBufferPool(parallelism, parallelism), - partitionManager, - new TaskEventDispatcher(), - partitionIds))); + results.add(CompletableFuture.supplyAsync( + CheckedSupplier.unchecked( + new TestLocalInputChannelConsumer( Review comment: done
[GitHub] zentol commented on a change in pull request #7515: [FLINK-11313][checkpoint] Introduce LZ4 compression for keyed state in full checkpoints and savepoints
zentol commented on a change in pull request #7515: [FLINK-11313][checkpoint] Introduce LZ4 compression for keyed state in full checkpoints and savepoints URL: https://github.com/apache/flink/pull/7515#discussion_r248439222 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -894,12 +898,13 @@ public void disableAutoTypeRegistration() { this.autoTypeRegistrationEnabled = false; } - public boolean isUseSnapshotCompression() { Review comment: The class is annotated with `@Public`, which means you cannot remove a `public` method. This method and its behavior must be preserved.
[GitHub] zentol commented on issue #7506: [FLINK-11347] Optimize the ParquetAvroWriters factory
zentol commented on issue #7506: [FLINK-11347] Optimize the ParquetAvroWriters factory URL: https://github.com/apache/flink/pull/7506#issuecomment-454927640 Related test failures: ``` 18:30:12.583 [INFO] Running org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase 18:30:15.884 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 3.298 s <<< FAILURE! - in org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase 18:30:15.884 [ERROR] testWriteParquetAvroReflect(org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase) Time elapsed: 0.371 s <<< ERROR! org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields. at org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroReflect(ParquetStreamingFileSinkITCase.java:140) Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema at org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroReflect(ParquetStreamingFileSinkITCase.java:140) 18:30:15.885 [ERROR] testWriteParquetAvroSpecific(org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase) Time elapsed: 0.085 s <<< ERROR! org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields. 
at org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroSpecific(ParquetStreamingFileSinkITCase.java:83) Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema at org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroSpecific(ParquetStreamingFileSinkITCase.java:83) 18:30:15.885 [ERROR] testWriteParquetAvroGeneric(org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase) Time elapsed: 0.013 s <<< ERROR! org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields. at org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroGeneric(ParquetStreamingFileSinkITCase.java:110) Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema at org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroGeneric(ParquetStreamingFileSinkITCase.java:110) ```
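The failures above are the classic `NotSerializableException` pattern: a function field holds a non-serializable object (here `org.apache.avro.Schema$RecordSchema`). A common fix is to store a serializable representation and rebuild the object lazily after deserialization. The sketch below uses `java.security.MessageDigest` as a stand-in for the non-serializable field so the example stays dependency-free; the class and field names are illustrative:

```java
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Sketch of the "serializable representation + transient lazy field" fix.
 * MessageDigest is not Serializable, so we keep only the algorithm name
 * (a String) in serialized form and rebuild the digest on first use.
 */
public class LazyFieldSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String algorithm;          // serializable representation
    private transient MessageDigest digest;  // non-serializable, rebuilt lazily

    public LazyFieldSketch(String algorithm) {
        this.algorithm = algorithm;
    }

    MessageDigest digest() throws NoSuchAlgorithmException {
        if (digest == null) {
            // after deserialization the transient field is null: re-create it
            digest = MessageDigest.getInstance(algorithm);
        }
        return digest;
    }
}
```

For the Avro case specifically, the analogous approach would be to keep `schema.toString()` (the JSON form) and re-parse it with `new Schema.Parser().parse(json)` when first needed.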
[jira] [Commented] (FLINK-11196) Extend S3 EntropyInjector to use key replacement (instead of key removal) when creating checkpoint metadata files
[ https://issues.apache.org/jira/browse/FLINK-11196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744345#comment-16744345 ] Steven Zhen Wu commented on FLINK-11196: [~StephanEwen] can you take a look at the Jira and PR from [~markcho] ? > Extend S3 EntropyInjector to use key replacement (instead of key removal) > when creating checkpoint metadata files > - > > Key: FLINK-11196 > URL: https://issues.apache.org/jira/browse/FLINK-11196 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.7.0 >Reporter: Mark Cho >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > We currently use S3 entropy injection when writing out checkpoint data. > We also use external checkpoints so that we can resume from a checkpoint > metadata file later. > The current implementation of S3 entropy injector makes it difficult to > locate the checkpoint metadata files since in the newer versions of Flink, > `state.checkpoints.dir` configuration controls where the metadata and state > files are written, instead of having two separate paths (one for metadata, > one for state files). > With entropy injection, we replace the entropy marker in the path specified > by `state.checkpoints.dir` with entropy (for state files) or we strip out the > marker (for metadata files). > > We need to extend the entropy injection so that we can replace the entropy > marker with a predictable path (instead of removing it) so that we can do a > prefix query for just the metadata files. > By not using the entropy key replacement (defaults to empty string), you get > the same behavior as it is today (entropy marker removed).
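The proposal above can be sketched with plain string substitution. This is not Flink's actual `EntropyInjector` API — class and method names are illustrative — but it shows the behavioral change: state files get random entropy, while metadata files get a configurable fixed replacement whose default (empty string) preserves today's marker-removal behavior:

```java
import java.util.Random;

/**
 * Illustrative sketch of the FLINK-11196 proposal (not Flink's real API).
 * State files: the entropy marker is replaced with random characters to
 * spread keys across S3 prefixes. Metadata files: the marker is replaced
 * with a fixed, predictable string so metadata can be found via a prefix
 * query; an empty replacement reproduces the current behavior.
 */
public class EntropyPathSketch {
    private static final String MARKER = "_entropy_";

    static String stateFilePath(String template, Random rnd) {
        // random hex segment, e.g. "_entropy_" -> "4f9c"
        return template.replace(MARKER, Integer.toHexString(rnd.nextInt(0x10000)));
    }

    static String metadataPath(String template, String replacement) {
        // default "" == today's behavior (marker stripped)
        return template.replace(MARKER, replacement);
    }
}
```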
[jira] [Commented] (FLINK-11195) Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and Hadoop Configuration
[ https://issues.apache.org/jira/browse/FLINK-11195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744346#comment-16744346 ] Steven Zhen Wu commented on FLINK-11195: [~StephanEwen] can you take a look at this Jira and PR from [~markcho]? > Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and > Hadoop Configuration > > > Key: FLINK-11195 > URL: https://issues.apache.org/jira/browse/FLINK-11195 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.7.0 >Reporter: Mark Cho >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, the `createHadoopFileSystem` method does not take any parameters. > In order to delegate FileSystem creation to the Hadoop FileSystem.get method, we > need to pass a URI and a Hadoop Configuration to this abstract method. > We use a custom version of PrestoS3FileSystem by plugging in our > FileSystemFactory, similar to the `flink-filesystems/flink-s3-fs-presto` project. > However, we would like to delegate our FS creation to Hadoop.
[GitHub] zentol commented on issue #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method
zentol commented on issue #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method URL: https://github.com/apache/flink/pull/7510#issuecomment-454891145 why?
[GitHub] zentol removed a comment on issue #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method
zentol removed a comment on issue #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method URL: https://github.com/apache/flink/pull/7510#issuecomment-454891145 why?
[jira] [Created] (FLINK-11373) CliFrontend cuts off reason for error messages
Maximilian Michels created FLINK-11373: -- Summary: CliFrontend cuts off reason for error messages Key: FLINK-11373 URL: https://issues.apache.org/jira/browse/FLINK-11373 Project: Flink Issue Type: Bug Components: Startup Shell Scripts Affects Versions: 1.7.1, 1.6.3, 1.5.6 Reporter: Maximilian Michels The CliFrontend seems to print only the first message of the stack trace and not any of its causes. {noformat} bin/flink run /non-existing/path Could not build the program from JAR file. Use the help option (-h or --help) to get help on the command. {noformat} Note that the underlying cause of this message is a FileNotFoundException. Consider changing a) the error message for this particular case b) the way the stack trace messages are trimmed
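The fix hinted at in the report amounts to walking the exception's cause chain instead of printing only the top-level message. A minimal, self-contained sketch (not Flink's actual CliFrontend code; names are illustrative):

```java
/**
 * Sketch: print the full cause chain so that e.g. the underlying
 * FileNotFoundException surfaces for "bin/flink run /non-existing/path"
 * instead of only "Could not build the program from JAR file.".
 */
public class CauseChainSketch {

    static String describe(Throwable t) {
        StringBuilder sb = new StringBuilder(String.valueOf(t.getMessage()));
        // walk getCause() until the root cause is reached
        for (Throwable cause = t.getCause(); cause != null; cause = cause.getCause()) {
            sb.append("\nCaused by: ").append(cause);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Throwable root = new java.io.FileNotFoundException("/non-existing/path");
        Throwable wrapper = new RuntimeException("Could not build the program from JAR file.", root);
        System.out.println(describe(wrapper));
    }
}
```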
[jira] [Commented] (FLINK-11274) Scala 2.12 Kryo serialization bug
[ https://issues.apache.org/jira/browse/FLINK-11274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744281#comment-16744281 ] Zhenhao Li commented on FLINK-11274: Hi Aljoscha, thanks for checking. I am 100% sure that I resolved it by changing the version number from 1.7.1 to 1.7.0. Since reporting this issue, I have done a major structural refactor of the codebase and updated the code for performance optimization. I no longer call the filter method on SortedSet anymore. I just tried to reproduce the issue. This time everything still worked when I changed the version to 1.7.1. That was indeed very puzzling to me when it occurred for the first time. I forgot to mention that I was running everything inside IntelliJ IDEA. It could as well be a bug there. Feel free to close this one. > Scala 2.12 Kryo serialization bug > - > > Key: FLINK-11274 > URL: https://issues.apache.org/jira/browse/FLINK-11274 > Project: Flink > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: Flink 1.7.1 > Scala 2.12.8 >Reporter: Zhenhao Li >Priority: Major > > The following code works well for serializing Scala classes, e.g., > SortedSet[T], without problem in 1.7.0. > ``` > env.getConfig.registerTypeWithKryoSerializer( > classOf[ClosureSerializer.Closure], > classOf[ClosureSerializer] > ) > ``` > However, in 1.7.1 the following error occurs when checkpointing. 
> ``` > Serialization trace: > cmp$2 (scala.math.Ordering$$anon$6) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > com.twitter.chill.SortedSetSerializer.read(SortedSetSerializer.scala:38) > at > com.twitter.chill.SortedSetSerializer.read(SortedSetSerializer.scala:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) > at > org.apache.flink.streaming.api.scala.function.StatefulFunction.applyWithState(StatefulFunction.scala:41) > at > org.apache.flink.streaming.api.scala.function.StatefulFunction.applyWithState$(StatefulFunction.scala:40) > at > org.apache.flink.streaming.api.scala.KeyedStream$$anon$3.applyWithState(KeyedStream.scala:591) > at > org.apache.flink.streaming.api.scala.KeyedStream$$anon$3.flatMap(KeyedStream.scala:596) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) > at > 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassNotFoundException: > io.connecterra.stateful.AggregationSlidingWindowStateUpdater$$$Lambda$506/497325684 > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) > ... 24 common frames omitted > ```
[GitHub] Myasuka opened a new pull request #7515: Lz4 compression
Myasuka opened a new pull request #7515: Lz4 compression URL: https://github.com/apache/flink/pull/7515 ## What is the purpose of the change LZ4 is a popular lightweight compression algorithm, which has better performance than Snappy in many cases and is also [recommended by RocksDB](https://github.com/facebook/rocksdb/wiki/Compression#configuration). Based on this, I introduce LZ4 in addition to the existing Snappy compression for keyed state in full checkpoints and savepoints. ## Brief change log - Introduce new `CompressionType` interface and move `StreamCompressionDecorator` related classes to `flink-core` module. - Introduce new enum `CompressionTypes` class and new `LZ4StreamCompressionDecorator` class in `flink-runtime` module. - Bump `KeyedBackendSerializationProxy` to a newer version for the new compression type. - Migrated existing tests to use LZ4 compression. ## Verifying this change This change added tests and can be verified as follows: - Extended unit tests `SerializationProxiesTest` and `StateSnapshotCompressionTest` for the newly added compression type. - Migrated `EventTimeWindowCheckpointingITCase` and `RescalingITCase` IT cases to use LZ4 compression. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, adds the lz4-java dependency. - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no, but changed the `KeyedBackendSerializationProxy` - The runtime per-record code paths (performance sensitive): no, should not affect topology task performance. - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs
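The `StreamCompressionDecorator` idea behind this PR — wrap the raw snapshot stream with a compressing/decompressing stream — can be sketched without the lz4-java dependency. Below, `java.util.zip` (Deflate) stands in for LZ4 so the sketch stays dependency-free; the class names are illustrative, not Flink's real implementations:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/**
 * Sketch of the decorator pattern used for snapshot compression: a decorator
 * wraps the raw checkpoint stream on both the write and the read path. The
 * PR adds an LZ4 variant; Deflate is used here only as a stand-in codec.
 */
abstract class StreamCompressionDecoratorSketch {
    abstract OutputStream decorateWithCompression(OutputStream raw) throws IOException;
    abstract InputStream decorateWithCompression(InputStream raw) throws IOException;
}

class DeflateCompressionDecorator extends StreamCompressionDecoratorSketch {
    @Override
    OutputStream decorateWithCompression(OutputStream raw) {
        return new DeflaterOutputStream(raw);
    }

    @Override
    InputStream decorateWithCompression(InputStream raw) {
        return new InflaterInputStream(raw);
    }
}
```

An LZ4 variant would follow the same shape, substituting the lz4-java block streams for the Deflate ones.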
[jira] [Updated] (FLINK-11349) Port CoordinatorShutdownTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11349: --- Labels: pull-request-available (was: ) > Port CoordinatorShutdownTest to new code base > - > > Key: FLINK-11349 > URL: https://issues.apache.org/jira/browse/FLINK-11349 > Project: Flink > Issue Type: Sub-task >Reporter: Till Rohrmann >Priority: Major > Labels: pull-request-available > > Port {{CoordinatorShutdownTest#testCoordinatorShutsDownOnFailure}} and > {{CoordinatorShutdownTest#testCoordinatorShutsDownOnSuccess}} to new code > base.
[GitHub] tillrohrmann opened a new pull request #7514: [FLINK-11349][tests] Port CoordinatorShutdownTest to new code base
tillrohrmann opened a new pull request #7514: [FLINK-11349][tests] Port CoordinatorShutdownTest to new code base URL: https://github.com/apache/flink/pull/7514 ## What is the purpose of the change The relevant tests of the CoordinatorShutdownTest have been moved to the ExecutionGraphCheckpointCoordinatorTest which executes the same test just without spawning an actual cluster. ## Brief change log - `testCoordinatorShutsDownOnFailure` is subsumed by `ExecutionGraphCheckpointCoordinatorTest#testShutdownCheckpointCoordinatorOnFailure` - `testCoordinatorShutsDownOnSuccess` is moved to `ExecutionGraphCheckpointCoordinatorTest#testShutdownCheckpointCoordinatorOnFinished` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Created] (FLINK-11372) Incorrect delegation of compatibility checks to new snapshots in CollectionSerializerConfigSnapshot
Tzu-Li (Gordon) Tai created FLINK-11372: --- Summary: Incorrect delegation of compatibility checks to new snapshots in CollectionSerializerConfigSnapshot Key: FLINK-11372 URL: https://issues.apache.org/jira/browse/FLINK-11372 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.8.0 In {{CollectionSerializerConfigSnapshot}}:
{code}
@Override
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) {
	if (newSerializer instanceof ListSerializer) {
		ListSerializer newListSerializer = (ListSerializer) newSerializer;
		ListSerializerSnapshot listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);

		@SuppressWarnings("unchecked")
		TypeSerializerSchemaCompatibility result =
			(TypeSerializerSchemaCompatibility) listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
		return result;
	} else {
		return super.resolveSchemaCompatibility(newSerializer);
	}
}
{code}
Compatibility checking of {{ListSerializer}} is delegated to the new list serializer snapshot class, {{ListSerializerSnapshot}}. However, it is incorrect to let the delegate wrap the new serializer (and therefore the new nested element serializer). By doing that, we're essentially checking compatibility of the new serializer with itself, whereas it should be checking compatibility with the restored serializer.
[GitHub] zentol commented on issue #7459: [FLINK-10822] Make MetricFetcher update interval configurable
zentol commented on issue #7459: [FLINK-10822] Make MetricFetcher update interval configurable URL: https://github.com/apache/flink/pull/7459#issuecomment-454854517 @tillrohrmann What do you think about disabling the fetcher completely if the interval is configured to 0?
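The suggestion above — a configurable update interval where 0 disables fetching entirely — can be sketched with a scheduled executor. This is a hypothetical sketch, not Flink's actual `MetricFetcher` code; class and method names are illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of a fetcher with a configurable update interval, where an
 * interval of 0 (or less) acts as a switch that disables fetching.
 */
public class MetricFetcherSketch {
    private final long updateIntervalMillis;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public MetricFetcherSketch(long updateIntervalMillis) {
        this.updateIntervalMillis = updateIntervalMillis;
    }

    /** Returns true if periodic fetching was actually started. */
    public boolean start(Runnable fetchOnce) {
        if (updateIntervalMillis <= 0) {
            return false; // interval 0: fetcher completely disabled
        }
        scheduler.scheduleWithFixedDelay(fetchOnce, 0, updateIntervalMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```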
[jira] [Updated] (FLINK-11350) Remove JobClientActorRecoveryITCase
[ https://issues.apache.org/jira/browse/FLINK-11350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11350: --- Labels: pull-request-available (was: ) > Remove JobClientActorRecoveryITCase > --- > > Key: FLINK-11350 > URL: https://issues.apache.org/jira/browse/FLINK-11350 > Project: Flink > Issue Type: Sub-task >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > > Remove {{JobClientActorRecoveryITCase}} since it only tests legacy code.
[GitHub] TisonKun commented on a change in pull request #7447: [FLINK-11294] [test] Remove legacy JobInfo usage in valid tests
TisonKun commented on a change in pull request #7447: [FLINK-11294] [test] Remove legacy JobInfo usage in valid tests URL: https://github.com/apache/flink/pull/7447#discussion_r248355439 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ## @@ -114,7 +112,7 @@ public void testPutAndRemoveJobGraph() throws Exception { verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId)); // Update (same ID) - jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1); + jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), "Updated JobName"); Review comment: Also responded at #7512; if we want to verify the update exactly, we should check something that differs between these two JobGraphs.
[GitHub] tillrohrmann opened a new pull request #7513: [FLINK-11350][tests] Delete JobClientActorRecoveryITCase
tillrohrmann opened a new pull request #7513: [FLINK-11350][tests] Delete JobClientActorRecoveryITCase URL: https://github.com/apache/flink/pull/7513 ## What is the purpose of the change The JobClientActorRecoveryITCase is no longer needed since it only tests the legacy code path. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] TisonKun commented on a change in pull request #7512: [FLINK-11294][tests] Remove legacy JobInfo usage in valid tests
TisonKun commented on a change in pull request #7512: [FLINK-11294][tests] Remove legacy JobInfo usage in valid tests URL: https://github.com/apache/flink/pull/7512#discussion_r248354857 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ## @@ -114,7 +112,7 @@ public void testPutAndRemoveJobGraph() throws Exception { verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId)); // Update (same ID) - jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1); + jobGraph = createSubmittedJobGraph(jobGraph.getJobId()); jobGraphs.putJobGraph(jobGraph); // Verify updated Review comment: `verifyJobGraphs` verifies the JobID and JobName; if we don't change either of them, we should remove that verification. Otherwise, we had better actually check the update.
[GitHub] azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#issuecomment-454843390 Thanks @tillrohrmann ! I pushed the update to address the comments.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r248348274

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java

@@ -61,6 +80,116 @@ static void transferAllStateDataToDirectory(
 	downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
 }

+	/**
+	 * Upload all the files to checkpoint fileSystem using specified number of threads.
+	 *
+	 * @param files The files will be uploaded to checkpoint filesystem.
+	 * @param numberOfSnapshottingThreads The number of threads used to upload the files.
+	 * @param checkpointStreamFactory The checkpoint streamFactory used to create outputstream.
+	 * @param closeableRegistry
+	 *
+	 * @throws Exception Thrown if can not upload all the files.
+	 */
+	public static Map uploadFilesToCheckpointFs(
+		@Nonnull Map files,
+		int numberOfSnapshottingThreads,
+		CheckpointStreamFactory checkpointStreamFactory,
+		CloseableRegistry closeableRegistry) throws Exception {
+
+		Map handles = new HashMap<>();
+
+		ExecutorService executorService = createExecutorService(numberOfSnapshottingThreads);

Review comment:
@klion26 The upload and download parts are quite independent. The base class could contain the executor, the number of threads, the closeable registry, and a close method (to shut down the executor once, instead of shutting it down every time). The upload and download classes can extend it and share the executor. The downloader could then be registered with the closeable registry. For upload, we can of course shut it down immediately after the restore is done. I agree that it would make sense to keep the threads up, and for code reuse as well.
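As a rough illustration of the refactoring the reviewer suggests (a base class owning the shared executor and a `close` method, with upload and download subclasses reusing it), a minimal sketch might look like the following. All class and method names here are hypothetical, not the actual Flink API:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Base class owning the thread pool shared by upload and download.
abstract class StateDataTransfer implements Closeable {
    protected final ExecutorService executorService;

    protected StateDataTransfer(int numberOfThreads) {
        this.executorService = Executors.newFixedThreadPool(numberOfThreads);
    }

    // Run all transfer tasks on the shared executor and wait for completion.
    protected <T> List<T> runAll(List<Callable<T>> tasks) throws Exception {
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(executorService.submit(task));
        }
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            results.add(future.get());
        }
        return results;
    }

    // Shut the executor down once, instead of per transfer; in the real code
    // this is where registration with the CloseableRegistry would hook in.
    @Override
    public void close() {
        executorService.shutdownNow();
    }
}

// Upload side reuses the base executor; a downloader subclass would look the same.
class StateUploader extends StateDataTransfer {
    StateUploader(int numberOfThreads) {
        super(numberOfThreads);
    }

    List<String> uploadAll(List<String> files) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String file : files) {
            // Real code would write the file to the checkpoint output stream
            // and return a state handle; here we just tag the name.
            tasks.add(() -> "uploaded:" + file);
        }
        return runAll(tasks);
    }
}
```

The point of the shape is that the executor's lifecycle lives in one place: subclasses only describe their transfer tasks, and `close()` is called exactly once when the transfer object is released.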
[jira] [Assigned] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun reassigned FLINK-11370: Assignee: TisonKun > Check and port ZooKeeperLeaderElectionITCase to new code base if necessary > -- > > Key: FLINK-11370 > URL: https://issues.apache.org/jira/browse/FLINK-11370 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: TisonKun >Priority: Major > > Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if > necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11371) Close the AvroParquetReader after use
[ https://issues.apache.org/jira/browse/FLINK-11371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11371: --- Labels: pull-request-available (was: ) > Close the AvroParquetReader after use > - > > Key: FLINK-11371 > URL: https://issues.apache.org/jira/browse/FLINK-11371 > Project: Flink > Issue Type: Improvement > Components: Formats >Affects Versions: 1.7.1 >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > The AvroParquetReader is not being closed
[GitHub] Fokko opened a new pull request #7511: [FLINK-11371] The AvroParquetReader is not being closed
Fokko opened a new pull request #7511: [FLINK-11371] The AvroParquetReader is not being closed
URL: https://github.com/apache/flink/pull/7511

https://issues.apache.org/jira/browse/FLINK-11371

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Created] (FLINK-11371) Close the AvroParquetReader after use
Fokko Driesprong created FLINK-11371: Summary: Close the AvroParquetReader after use Key: FLINK-11371 URL: https://issues.apache.org/jira/browse/FLINK-11371 Project: Flink Issue Type: Improvement Components: Formats Affects Versions: 1.7.1 Reporter: Fokko Driesprong Assignee: Fokko Driesprong Fix For: 1.8.0 The AvroParquetReader is not being closed
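The standard fix for a reader that is never closed is the try-with-resources statement, which guarantees `close()` runs when the block exits, even on an exception. The sketch below demonstrates the pattern with a stand-in class (`FakeParquetReader` is hypothetical; the real change would apply the same pattern to `AvroParquetReader`, which implements `Closeable`):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for AvroParquetReader, used only to show the pattern.
class FakeParquetReader implements AutoCloseable {
    static final AtomicBoolean CLOSED = new AtomicBoolean(false);

    String read() {
        return "record";
    }

    @Override
    public void close() {
        CLOSED.set(true);
    }
}

public class ReaderCloseSketch {
    static String readOne() {
        // try-with-resources calls close() when the block exits,
        // even if read() throws.
        try (FakeParquetReader reader = new FakeParquetReader()) {
            return reader.read();
        }
    }

    public static void main(String[] args) {
        String record = readOne();
        System.out.println(record + ", closed=" + FakeParquetReader.CLOSED.get());
    }
}
```

Without try-with-resources (or an explicit `finally` block), a reader opened mid-method leaks its underlying file handle whenever an exception skips the close call.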
[jira] [Assigned] (FLINK-11369) Check and port ZooKeeperHAJobManagerTest to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun reassigned FLINK-11369: Assignee: TisonKun > Check and port ZooKeeperHAJobManagerTest to new code base if necessary > -- > > Key: FLINK-11369 > URL: https://issues.apache.org/jira/browse/FLINK-11369 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: TisonKun >Priority: Major > > Check and port {{ZooKeeperHAJobManagerTest}} to new code base if necessary.