[GitHub] [flink] flinkbot edited a comment on pull request #13977: [Mirror] [FLINK-19238] Sanity check for RocksDB arena block size
flinkbot edited a comment on pull request #13977:
URL: https://github.com/apache/flink/pull/13977#issuecomment-723460604

## CI report:

* 39cc0db6968c005405244145a452dbada545cba9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9267) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9295)

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893

## CI report:

* cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
* 873f52f02d69a32b1f78822fa6e0fe850833807f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9291)
* 292f7debbab2b02b317b2578421edb9eed5cdb48 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9294)
[GitHub] [flink] carp84 commented on pull request #13977: [Mirror] [FLINK-19238] Sanity check for RocksDB arena block size
carp84 commented on pull request #13977:
URL: https://github.com/apache/flink/pull/13977#issuecomment-723542250

The pre-commit build with 39cc0db failed due to FLINK-19882; rerunning to get a green build.
[jira] [Commented] (FLINK-19882) E2E: SQLClientHBaseITCase crash
[ https://issues.apache.org/jira/browse/FLINK-19882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227929#comment-17227929 ]

Yu Li commented on FLINK-19882:
---

Another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9267=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

> E2E: SQLClientHBaseITCase crash
> ---
>
> Key: FLINK-19882
> URL: https://issues.apache.org/jira/browse/FLINK-19882
> Project: Flink
> Issue Type: Test
> Components: Connectors / HBase
> Reporter: Jingsong Lee
> Priority: Critical
> Labels: test-stability
> Fix For: 1.12.0
>
> INSTANCE:
> [https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/8563/logs/141]
> {code:java}
> 2020-10-29T09:43:24.0088180Z [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (end-to-end-tests) on project flink-end-to-end-tests-hbase: There are test failures.
> 2020-10-29T09:43:24.0088792Z [ERROR]
> 2020-10-29T09:43:24.0089518Z [ERROR] Please refer to /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire-reports for the individual test results.
> 2020-10-29T09:43:24.0090427Z [ERROR] Please refer to dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
> 2020-10-29T09:43:24.0090914Z [ERROR] The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-29T09:43:24.0093105Z [ERROR] Command was /bin/sh -c cd /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire 2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp surefire_67897497331523564186tmp
> 2020-10-29T09:43:24.0094488Z [ERROR] Error occurred in starting fork, check output in log
> 2020-10-29T09:43:24.0094797Z [ERROR] Process Exit Code: 143
> 2020-10-29T09:43:24.0095033Z [ERROR] Crashed tests:
> 2020-10-29T09:43:24.0095321Z [ERROR] org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-10-29T09:43:24.0095828Z [ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-29T09:43:24.0097838Z [ERROR] Command was /bin/sh -c cd /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire 2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp surefire_67897497331523564186tmp
> 2020-10-29T09:43:24.0098966Z [ERROR] Error occurred in starting fork, check output in log
> 2020-10-29T09:43:24.0099266Z [ERROR] Process Exit Code: 143
> 2020-10-29T09:43:24.0099502Z [ERROR] Crashed tests:
> 2020-10-29T09:43:24.0099789Z [ERROR] org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-10-29T09:43:24.0100331Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669)
> 2020-10-29T09:43:24.0100883Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282)
> 2020-10-29T09:43:24.0101774Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245)
> 2020-10-29T09:43:24.0102360Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 2020-10-29T09:43:24.0103004Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 2020-10-29T09:43:24.0103737Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 2020-10-29T09:43:24.0104301Z [ERROR] at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 2020-10-29T09:43:24.0104828Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 2020-10-29T09:43:24.0105334Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 2020-10-29T09:43:24.0105826Z [ERROR] at >
[GitHub] [flink] carp84 commented on pull request #13977: [Mirror] [FLINK-19238] Sanity check for RocksDB arena block size
carp84 commented on pull request #13977:
URL: https://github.com/apache/flink/pull/13977#issuecomment-723542122

@flinkbot run azure
[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893

## CI report:

* cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
* 873f52f02d69a32b1f78822fa6e0fe850833807f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9291)
* 292f7debbab2b02b317b2578421edb9eed5cdb48 UNKNOWN
[jira] [Closed] (FLINK-19990) MultipleInputNodeCreationProcessor#isChainableSource should consider DataStreamScanProvider
[ https://issues.apache.org/jira/browse/FLINK-19990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed FLINK-19990.
Fix Version/s: 1.12.0
Assignee: Caizhi Weng
Resolution: Fixed

master (1.12): 83174261b33f942f6fd7475d7115ccd430f27bab

> MultipleInputNodeCreationProcessor#isChainableSource should consider DataStreamScanProvider
> ---
>
> Key: FLINK-19990
> URL: https://issues.apache.org/jira/browse/FLINK-19990
> Project: Flink
> Issue Type: Bug
> Reporter: Caizhi Weng
> Assignee: Caizhi Weng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> {{MultipleInputNodeCreationProcessor#isChainableSource}} now only considers {{SourceProvider}}. However, a {{DataStreamScanProvider}} providing a {{DataStream}} with a {{SourceTransformation}} is also a chainable source, and we need to take them into consideration.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] JingsongLi merged pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider
JingsongLi merged pull request #13942:
URL: https://github.com/apache/flink/pull/13942
[jira] [Commented] (FLINK-20045) ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"
[ https://issues.apache.org/jira/browse/FLINK-20045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227925#comment-17227925 ]

Yang Wang commented on FLINK-20045:
---

I am having a look at this failed test.

> ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"
>
> Key: FLINK-20045
> URL: https://issues.apache.org/jira/browse/FLINK-20045
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9251=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0
> {code}
> 2020-11-07T10:34:07.5063203Z [ERROR] testZooKeeperLeaderElectionRetrieval(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) Time elapsed: 202.445 s <<< ERROR!
> 2020-11-07T10:34:07.5064331Z java.util.concurrent.TimeoutException: Contender was not elected as the leader within 200000ms
> 2020-11-07T10:34:07.5064946Z at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:153)
> 2020-11-07T10:34:07.5065762Z at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:139)
> 2020-11-07T10:34:07.5066565Z at org.apache.flink.runtime.leaderelection.TestingLeaderBase.waitForLeader(TestingLeaderBase.java:48)
> 2020-11-07T10:34:07.5067185Z at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval(ZooKeeperLeaderElectionTest.java:144)
> {code}
[GitHub] [flink] flinkbot edited a comment on pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider
flinkbot edited a comment on pull request #13942:
URL: https://github.com/apache/flink/pull/13942#issuecomment-722358038

## CI report:

* c2172fbea08608c4d073643da6c3d23cf895fa27 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9287)
[jira] [Created] (FLINK-20047) DefaultLeaderRetrievalService should only notify the LeaderRetrievalListener when leader truly changed
Yang Wang created FLINK-20047:
-

Summary: DefaultLeaderRetrievalService should only notify the LeaderRetrievalListener when leader truly changed
Key: FLINK-20047
URL: https://issues.apache.org/jira/browse/FLINK-20047
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Yang Wang
Fix For: 1.12.0

Currently, in {{DefaultLeaderRetrievalService}} we notify the {{LeaderRetrievalListener}} even when the leader has not truly changed. In the Kubernetes HA service, we can get many leader-information notification events for the same leader, because the ConfigMap is updated periodically. We should filter out these useless events and not notify the {{LeaderRetrievalListener}}.
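The fix FLINK-20047 describes (forwarding a notification only when the leader information actually changed) can be sketched as a small deduplicating wrapper. The class and method names below are hypothetical illustrations, not Flink's actual `LeaderRetrievalListener` API:

```java
import java.util.Objects;

/**
 * Hypothetical sketch: wraps a listener and forwards leader notifications
 * only when the (address, sessionId) pair differs from the last one seen.
 */
class DeduplicatingLeaderListener {

    /** Minimal stand-in for Flink's LeaderRetrievalListener. */
    public interface Listener {
        void notifyLeaderAddress(String leaderAddress, String leaderSessionId);
    }

    private final Listener delegate;
    private String lastAddress;
    private String lastSessionId;
    private int forwarded = 0;

    DeduplicatingLeaderListener(Listener delegate) {
        this.delegate = delegate;
    }

    /** Called on every ConfigMap update; forwards only genuine leader changes. */
    public synchronized void onLeaderInformation(String address, String sessionId) {
        if (Objects.equals(address, lastAddress) && Objects.equals(sessionId, lastSessionId)) {
            return; // same leader as before: drop the redundant event
        }
        lastAddress = address;
        lastSessionId = sessionId;
        forwarded++;
        delegate.notifyLeaderAddress(address, sessionId);
    }

    public synchronized int forwardedCount() {
        return forwarded;
    }
}
```

A wrapper like this would sit between the ConfigMap watch callback and the real listener, so periodic ConfigMap updates carrying unchanged leader information never reach it.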
[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery
curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519263971

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
## @@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
 } else {
     graph.setStateBackend(stateBackend);
     graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+    graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
     graph.setScheduleMode(ScheduleMode.EAGER);
+
+    if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+        checkApproximateLocalRecoveryCompatibility();
+        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+    } else {
+        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+    }
 }
 }

+private void checkApproximateLocalRecoveryCompatibility() {
+    checkState(
+        !checkpointConfig.isUnalignedCheckpointsEnabled(),
+        "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");

Review comment:
I will also add a test for this.
[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery
curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519263840

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
## @@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
 } else {
     graph.setStateBackend(stateBackend);
     graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+    graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
     graph.setScheduleMode(ScheduleMode.EAGER);
+
+    if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+        checkApproximateLocalRecoveryCompatibility();
+        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+    } else {
+        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+    }
 }
 }

+private void checkApproximateLocalRecoveryCompatibility() {
+    checkState(
+        !checkpointConfig.isUnalignedCheckpointsEnabled(),
+        "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");

Review comment:
I've tried out different ways to do the check. It is slightly different from the unaligned checkpoint check, because the PipelinedRegionScheduler is configured at the JobMaster level, while approximate local recovery is configured at the job level. The check should happen in the same place where either PIPELINED_REGION_SCHEDULING or LEGACY_SCHEDULING is decided, i.e. where the SchedulingStrategyFactory type is decided.

1. In JobGraph, the most reasonable place to put the config "isApproximateLocalRecoveryEnabled" seems to be `JobCheckpointingSettings#CheckpointCoordinatorConfiguration`, similar to the unaligned checkpoint's config. However `CheckpointCoordinatorConfiguration`, as its name suggests, is for the CheckpointCoordinator and will be serialized to the CheckpointCoordinator. But in fact the CheckpointCoordinator does not need `isApproximateLocalRecoveryEnabled` for anything, and putting it there breaks a lot of tests, so at this point it is probably not the best place to put it.

2. So I put `isApproximateLocalRecoveryEnabled` in a similar place as `scheduleMode` in JobGraph, and it will be removed together with `scheduleMode` later. This flag is only used to make sure approximate local recovery is not used together with JobManagerOptions.SCHEDULING_STRATEGY set to region.

3. If JobManagerOptions.SCHEDULING_STRATEGY is set to legacy, the EAGER strategy is enforced in `StreamGraphGenerator#configureStreamGraph`:

```
} else {
    graph.setStateBackend(stateBackend);
    graph.setScheduleMode(ScheduleMode.EAGER);

    if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
        checkApproximateLocalRecoveryCompatibility();
        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
    } else {
        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
    }
}
```
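The compatibility check being discussed is a simple mutual-exclusion precondition between two feature flags. A minimal standalone sketch (the helper class and boolean parameters here are hypothetical; Flink's real `checkState` lives in `org.apache.flink.util.Preconditions`):

```java
/** Hypothetical sketch of a mutual-exclusion check between two feature flags. */
class FeatureCompatibility {

    /** Minimal stand-in for Preconditions.checkState. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** Rejects enabling approximate local recovery together with unaligned checkpoints. */
    static void checkApproximateLocalRecoveryCompatibility(
            boolean approximateLocalRecoveryEnabled, boolean unalignedCheckpointsEnabled) {
        if (approximateLocalRecoveryEnabled) {
            checkState(
                !unalignedCheckpointsEnabled,
                "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
        }
    }
}
```

The check is a no-op unless the feature being guarded is enabled, which is why it only runs inside the `isApproximateLocalRecoveryEnabled()` branch in the diff above.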
[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
flinkbot edited a comment on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837

## CI report:

* 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN
* a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285)
* 8de0f73bc4abfc12f7996a26b7753431fcf885d3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9292)
[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893

## CI report:

* cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
* 873f52f02d69a32b1f78822fa6e0fe850833807f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9291)
[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery
curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519257405

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
## @@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
 } else {
     graph.setStateBackend(stateBackend);
     graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+    graph.setAllVerticesInSameSlotSharingGroupByDefault(true);

Review comment:
It seems to have come from a code merge (for the sorted merge shuffle), but for some reason it was removed in a later version. It may have been counted as my change since I resolved the merge manually. Thanks for catching this. I will remove it!
[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery
curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519257405

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
## @@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
 } else {
     graph.setStateBackend(stateBackend);
     graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+    graph.setAllVerticesInSameSlotSharingGroupByDefault(true);

Review comment:
It seems to have come from a code merge (for the sorted merge shuffle), but for some reason it was removed in a later version. Thanks for catching this. I will remove it.
[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893

## CI report:

* cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
* 873f52f02d69a32b1f78822fa6e0fe850833807f UNKNOWN
[GitHub] [flink] lirui-apache commented on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723530163

Thanks @StephanEwen and @JingsongLi for the review. I have updated the PR and addressed your comments. Please have another look, thanks!
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519256075

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
## @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator> implements SplitEnumerator> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+    private final SplitEnumeratorContext enumeratorContext;
+    private final LinkedHashMap readersAwaitingSplit;
+    private final FileSplitAssigner splitAssigner;
+    private final long discoveryInterval;
+
+    private final JobConf jobConf;
+    private final ObjectPath tablePath;
+
+    private final ContinuousPartitionFetcher fetcher;
+    private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext;
+
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+    // the maximum partition read offset seen so far
+    private volatile T currentReadOffset;
+    // the partitions that have been processed for a given read offset
+    private final Set> seenPartitions;
+
+    public ContinuousHiveSplitEnumerator(
+            SplitEnumeratorContext enumeratorContext,
+            T currentReadOffset,
+            Collection> seenPartitions,
+            FileSplitAssigner splitAssigner,
+            long discoveryInterval,
+            JobConf jobConf,
+            ObjectPath tablePath,
+            ContinuousPartitionFetcher fetcher,
+            HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext) {
+        this.enumeratorContext = enumeratorContext;
+        this.currentReadOffset = currentReadOffset;
+        this.seenPartitions = new HashSet<>(seenPartitions);
+        this.splitAssigner = splitAssigner;
+        this.discoveryInterval = discoveryInterval;
+        this.jobConf = jobConf;
+        this.tablePath = tablePath;
+        this.fetcher = fetcher;
+        this.fetcherContext = fetcherContext;
+        readersAwaitingSplit = new LinkedHashMap<>();
+    }
+
+    @Override
+    public void start() {
+        try {
+            fetcherContext.open();
+            enumeratorContext.callAsync(
+                this::monitorAndGetSplits,
+                this::handleNewSplits,
+                discoveryInterval,
+
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519256019

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
## @@ -46,29 +58,98 @@
 	private static final long serialVersionUID = 1L;
 
+	private final JobConfWrapper jobConfWrapper;
+	private final List<String> partitionKeys;
+	private final ContinuousPartitionFetcher fetcher;
+	private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext;
+	private final ObjectPath tablePath;
+
 	HiveSource(
 			JobConf jobConf,
+			ObjectPath tablePath,
 			CatalogTable catalogTable,
 			List<HiveTablePartition> partitions,
 			@Nullable Long limit,
 			String hiveVersion,
 			boolean useMapRedReader,
-			boolean isStreamingSource,
+			@Nullable ContinuousEnumerationSettings continuousEnumerationSettings,

Review comment:
       I added the builder, but the builder constructor still takes quite a few parameters. That's because the BulkFormat needs to be created in the constructor. And since BulkFormat is determined during construction, it makes no sense to allow modifying `tablePath` or `partitions` later through the builder.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
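The constraint described in that comment — inputs that determine the read format are fixed in the builder's constructor, while everything else stays a fluent setter — can be sketched as follows. All class, field, and method names here are illustrative stand-ins, not the PR's actual `HiveSource` builder API:

```java
import java.util.List;

// Sketch of a builder whose *required* inputs (the ones the read format is
// derived from) are constructor arguments, so they cannot be mutated after
// construction, while optional knobs remain fluent setters.
class HiveSourceSketch {
    final String tablePath;
    final List<String> partitions;
    final Long limit;               // optional, may stay null
    final boolean useMapRedReader;  // optional, defaults to false

    private HiveSourceSketch(Builder b) {
        this.tablePath = b.tablePath;
        this.partitions = b.partitions;
        this.limit = b.limit;
        this.useMapRedReader = b.useMapRedReader;
    }

    static class Builder {
        private final String tablePath;        // fixed: format depends on it
        private final List<String> partitions; // fixed for the same reason
        private Long limit;
        private boolean useMapRedReader;

        Builder(String tablePath, List<String> partitions) {
            this.tablePath = tablePath;
            this.partitions = partitions;
        }

        Builder limit(long limit) { this.limit = limit; return this; }
        Builder useMapRedReader(boolean b) { this.useMapRedReader = b; return this; }
        HiveSourceSketch build() { return new HiveSourceSketch(this); }
    }
}
```

This mirrors the trade-off in the comment: the constructor argument list shrinks but cannot be empty, because some inputs must be known before construction completes.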
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519255792

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
## @@ -46,29 +58,98 @@
 	private static final long serialVersionUID = 1L;
 
+	private final JobConfWrapper jobConfWrapper;
+	private final List<String> partitionKeys;
+	private final ContinuousPartitionFetcher fetcher;
+	private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext;
+	private final ObjectPath tablePath;
+
 	HiveSource(
 			JobConf jobConf,
+			ObjectPath tablePath,
 			CatalogTable catalogTable,
 			List<HiveTablePartition> partitions,
 			@Nullable Long limit,
 			String hiveVersion,
 			boolean useMapRedReader,
-			boolean isStreamingSource,
+			@Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
+			ContinuousPartitionFetcher fetcher,
+			HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext,
 			RowType producedRowType) {
 		super(
 				new org.apache.flink.core.fs.Path[1],
 				new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
-				DEFAULT_SPLIT_ASSIGNER,
+				continuousEnumerationSettings == null ? DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new,

Review comment:
       I think ideally we need to implement an assigner that assigns splits in order, but is also locality-aware. Guess we can leave that for the future.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
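The "assign splits in order" half of the idea discussed above can be modeled by a trivial FIFO assigner. This is a simplified stand-in, not Flink's real `FileSplitAssigner` interface (which works with `FileSourceSplit` and host arrays); the locality-aware variant the reviewer mentions would additionally prefer splits whose data is local to the requesting host before falling back to FIFO order:

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;

// Simplified stand-in for a split assigner that hands out splits strictly in
// discovery order, ignoring the requesting host (no locality awareness).
class OrderedSplitAssigner {
    private final Queue<String> splits = new ArrayDeque<>();

    OrderedSplitAssigner(Collection<String> initialSplits) {
        splits.addAll(initialSplits);
    }

    // host is accepted but unused; a locality-aware variant would scan for a
    // split stored on the requesting host first, then fall back to poll().
    Optional<String> getNext(String host) {
        return Optional.ofNullable(splits.poll());
    }

    void addSplits(Collection<String> newSplits) {
        splits.addAll(newSplits);
    }

    Collection<String> remainingSplits() {
        return new ArrayDeque<>(splits);
    }
}
```

The tension is visible even in this sketch: serving locality first reorders the queue, so "in order" and "locality-aware" pull in opposite directions, which is why the comment defers the combined assigner.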
[jira] [Commented] (FLINK-19699) PrometheusReporterEndToEndITCase crashes with exit code 143
[ https://issues.apache.org/jira/browse/FLINK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227911#comment-17227911 ]

Zhu Zhu commented on FLINK-19699:
---------------------------------

Another instance:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9256&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529

> PrometheusReporterEndToEndITCase crashes with exit code 143
> -----------------------------------------------------------
>
>                 Key: FLINK-19699
>                 URL: https://issues.apache.org/jira/browse/FLINK-19699
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Metrics
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Assignee: Robert Metzger
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.12.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7814&view=logs&j=68a897ab-3047-5660-245a-cce8f83859f6&t=16ca2cca-2f63-5cce-12d2-d519b930a729]
> {code}
> 2020-10-18T23:46:04.9667443Z [ERROR] The forked VM terminated without
> properly saying goodbye. VM crash or System.exit called?
> 2020-10-18T23:46:04.9669237Z [ERROR] Command was /bin/sh -c cd > /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target > && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m > -Dmvn.forkNumber=2 -XX:+UseG1GC -jar > /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar > > /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire > 2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp > surefire_41970585275084524978tmp > 2020-10-18T23:46:04.9670440Z [ERROR] Error occurred in starting fork, check > output in log > 2020-10-18T23:46:04.9671283Z [ERROR] Process Exit Code: 143 > 2020-10-18T23:46:04.9671614Z [ERROR] Crashed tests: > 2020-10-18T23:46:04.9672025Z [ERROR] > org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase > 2020-10-18T23:46:04.9672649Z [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM > terminated without properly saying goodbye. VM crash or System.exit called? 
> 2020-10-18T23:46:04.9674834Z [ERROR] Command was /bin/sh -c cd > /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target > && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m > -Dmvn.forkNumber=2 -XX:+UseG1GC -jar > /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar > > /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire > 2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp > surefire_41970585275084524978tmp > 2020-10-18T23:46:04.9676153Z [ERROR] Error occurred in starting fork, check > output in log > 2020-10-18T23:46:04.9676556Z [ERROR] Process Exit Code: 143 > 2020-10-18T23:46:04.9676882Z [ERROR] Crashed tests: > 2020-10-18T23:46:04.9677288Z [ERROR] > org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase > 2020-10-18T23:46:04.9677827Z [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669) > 2020-10-18T23:46:04.9678408Z [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282) > 2020-10-18T23:46:04.9678965Z [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245) > 2020-10-18T23:46:04.9679575Z [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183) > 2020-10-18T23:46:04.9680983Z [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011) > 2020-10-18T23:46:04.9681749Z [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857) > 2020-10-18T23:46:04.9682246Z [ERROR] at > org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132) > 2020-10-18T23:46:04.9682728Z [ERROR] at > 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) > 2020-10-18T23:46:04.9683179Z [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) > 2020-10-18T23:46:04.9683609Z [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) > 2020-10-18T23:46:04.9684102Z [ERROR] at > org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116) > 2020-10-18T23:46:04.9684639Z [ERROR] at >
[jira] [Commented] (FLINK-19882) E2E: SQLClientHBaseITCase crash
[ https://issues.apache.org/jira/browse/FLINK-19882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227910#comment-17227910 ]

Zhu Zhu commented on FLINK-19882:
---------------------------------

Another instance:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9268&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529

> E2E: SQLClientHBaseITCase crash
> -------------------------------
>
>                 Key: FLINK-19882
>                 URL: https://issues.apache.org/jira/browse/FLINK-19882
>             Project: Flink
>          Issue Type: Test
>          Components: Connectors / HBase
>            Reporter: Jingsong Lee
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.12.0
>
>
> INSTANCE:
> [https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/8563/logs/141]
> {code:java}
> 2020-10-29T09:43:24.0088180Z [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (end-to-end-tests)
> on project flink-end-to-end-tests-hbase: There are test failures.
> 2020-10-29T09:43:24.0088792Z [ERROR]
> 2020-10-29T09:43:24.0089518Z [ERROR] Please refer to
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire-reports
> for the individual test results.
> 2020-10-29T09:43:24.0090427Z [ERROR] Please refer to dump files (if any
> exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
> 2020-10-29T09:43:24.0090914Z [ERROR] The forked VM terminated without
> properly saying goodbye. VM crash or System.exit called?
> 2020-10-29T09:43:24.0093105Z [ERROR] Command was /bin/sh -c cd > /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target > && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m > -Dmvn.forkNumber=2 -XX:+UseG1GC -jar > /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar > > /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire > 2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp > surefire_67897497331523564186tmp > 2020-10-29T09:43:24.0094488Z [ERROR] Error occurred in starting fork, check > output in log > 2020-10-29T09:43:24.0094797Z [ERROR] Process Exit Code: 143 > 2020-10-29T09:43:24.0095033Z [ERROR] Crashed tests: > 2020-10-29T09:43:24.0095321Z [ERROR] > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase > 2020-10-29T09:43:24.0095828Z [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM > terminated without properly saying goodbye. VM crash or System.exit called? 
> 2020-10-29T09:43:24.0097838Z [ERROR] Command was /bin/sh -c cd > /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target > && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m > -Dmvn.forkNumber=2 -XX:+UseG1GC -jar > /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar > > /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire > 2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp > surefire_67897497331523564186tmp > 2020-10-29T09:43:24.0098966Z [ERROR] Error occurred in starting fork, check > output in log > 2020-10-29T09:43:24.0099266Z [ERROR] Process Exit Code: 143 > 2020-10-29T09:43:24.0099502Z [ERROR] Crashed tests: > 2020-10-29T09:43:24.0099789Z [ERROR] > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase > 2020-10-29T09:43:24.0100331Z [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669) > 2020-10-29T09:43:24.0100883Z [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282) > 2020-10-29T09:43:24.0101774Z [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245) > 2020-10-29T09:43:24.0102360Z [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183) > 2020-10-29T09:43:24.0103004Z [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011) > 2020-10-29T09:43:24.0103737Z [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857) > 2020-10-29T09:43:24.0104301Z [ERROR] at > org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132) > 2020-10-29T09:43:24.0104828Z [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) > 
2020-10-29T09:43:24.0105334Z [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) > 2020-10-29T09:43:24.0105826Z [ERROR] at >
[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
flinkbot edited a comment on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837 ## CI report: * 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN * a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285) * 8de0f73bc4abfc12f7996a26b7753431fcf885d3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193 ## CI report: * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN * 3eb68994c05caed91cef945800d7cb586d617663 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9286) * 1d4db76b21faf3bb2835be6e529f1d2478237272 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9289) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-20037) Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...)
[ https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-20037. --- Fix Version/s: 1.12.0 Resolution: Fixed Fixed in 1.12.0: eeb69fc9694489f200b94f70edcfc52ba0343718 > Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...) > --- > > Key: FLINK-20037 > URL: https://issues.apache.org/jira/browse/FLINK-20037 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: yandufeng >Assignee: yandufeng >Priority: Trivial > Labels: pull-request-available, starter > Fix For: 1.12.0 > > > there is a comment problem in fromValues(AbstractDataType rowType, > Object... values) method of TableEnvironment, i think second column name is > "name" not "f1". this is my first issue, if there is a problem, please > understand. > * Examples: > * {@code > * tEnv.fromValues( > * DataTypes.ROW( > * DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), > * DataTypes.FIELD("name", DataTypes.STRING()) > * ), > * row(1, "ABC"), > * row(2L, "ABCDE") > * ) > * } > * will produce a Table with a schema as follows: > * {@code > * root > * |-- id: DECIMAL(10, 2) > * |-- f1: STRING > * } -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wuchong merged pull request #13976: [FLINK-20037][Table SQL / API]update comment problem in fromValues(AbstractDataType rowType, Obj…
wuchong merged pull request #13976: URL: https://github.com/apache/flink/pull/13976 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193 ## CI report: * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN * 3eb68994c05caed91cef945800d7cb586d617663 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9286) * 1d4db76b21faf3bb2835be6e529f1d2478237272 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-20037) Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...)
[ https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-20037: Summary: Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...) (was: there is a comment problem in fromValues(AbstractDataType rowType, Object... values) method of TableEnvironment) > Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...) > --- > > Key: FLINK-20037 > URL: https://issues.apache.org/jira/browse/FLINK-20037 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: yandufeng >Assignee: yandufeng >Priority: Trivial > Labels: pull-request-available, starter > > there is a comment problem in fromValues(AbstractDataType rowType, > Object... values) method of TableEnvironment, i think second column name is > "name" not "f1". this is my first issue, if there is a problem, please > understand. > * Examples: > * {@code > * tEnv.fromValues( > * DataTypes.ROW( > * DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), > * DataTypes.FIELD("name", DataTypes.STRING()) > * ), > * row(1, "ABC"), > * row(2L, "ABCDE") > * ) > * } > * will produce a Table with a schema as follows: > * {@code > * root > * |-- id: DECIMAL(10, 2) > * |-- f1: STRING > * } -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-20037) there is a comment problem in fromValues(AbstractDataType rowType, Object... values) method of TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-20037: --- Assignee: yandufeng > there is a comment problem in fromValues(AbstractDataType rowType, > Object... values) method of TableEnvironment > --- > > Key: FLINK-20037 > URL: https://issues.apache.org/jira/browse/FLINK-20037 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: yandufeng >Assignee: yandufeng >Priority: Trivial > Labels: pull-request-available, starter > > there is a comment problem in fromValues(AbstractDataType rowType, > Object... values) method of TableEnvironment, i think second column name is > "name" not "f1". this is my first issue, if there is a problem, please > understand. > * Examples: > * {@code > * tEnv.fromValues( > * DataTypes.ROW( > * DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), > * DataTypes.FIELD("name", DataTypes.STRING()) > * ), > * row(1, "ABC"), > * row(2L, "ABCDE") > * ) > * } > * will produce a Table with a schema as follows: > * {@code > * root > * |-- id: DECIMAL(10, 2) > * |-- f1: STRING > * } -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-20041) Support Watermark push down for kafka connector
[ https://issues.apache.org/jira/browse/FLINK-20041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-20041. --- Fix Version/s: 1.12.0 Assignee: Shengkai Fang Resolution: Fixed Implemented in master (1.12.0): 24d612bd4337f1443f379fa3bac2b6050b3d32c8 > Support Watermark push down for kafka connector > --- > > Key: FLINK-20041 > URL: https://issues.apache.org/jira/browse/FLINK-20041 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka, Table SQL / API >Affects Versions: 1.12.0 >Reporter: Shengkai Fang >Assignee: Shengkai Fang >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > Support watermark push down for kafka connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wuchong merged pull request #13975: [FLINK-20041][connector kafka] Support watermark push down for kafka …
wuchong merged pull request #13975: URL: https://github.com/apache/flink/pull/13975 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider
flinkbot edited a comment on pull request #13942: URL: https://github.com/apache/flink/pull/13942#issuecomment-722358038 ## CI report: * ea8e4df517dcb1779d4959fc54917cbcac76c07f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9271) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9195) * c2172fbea08608c4d073643da6c3d23cf895fa27 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9287) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519249864

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
## @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}.
+ */
+public class ContinuousHivePendingSplitsCheckpointSerializer
+		implements SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+	private static final int VERSION = 1;
+
+	private final PendingSplitsCheckpointSerializer<HiveSourceSplit> superSerDe;
+
+	public ContinuousHivePendingSplitsCheckpointSerializer(SimpleVersionedSerializer<HiveSourceSplit> splitSerDe) {
+		superSerDe = new PendingSplitsCheckpointSerializer<>(splitSerDe);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public byte[] serialize(PendingSplitsCheckpoint<HiveSourceSplit> checkpoint) throws IOException {
+		Preconditions.checkArgument(checkpoint.getClass() == ContinuousHivePendingSplitsCheckpoint.class,
+				"Only supports %s", ContinuousHivePendingSplitsCheckpoint.class.getName());
+
+		ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = (ContinuousHivePendingSplitsCheckpoint) checkpoint;
+		PendingSplitsCheckpoint<HiveSourceSplit> superCP = PendingSplitsCheckpoint.fromCollectionSnapshot(
+				hiveCheckpoint.getSplits(), hiveCheckpoint.getAlreadyProcessedPaths());
+		byte[] superBytes = superSerDe.serialize(superCP);
+		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+		try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
+			outputStream.writeInt(superBytes.length);
+			outputStream.write(superBytes);
+			outputStream.writeObject(hiveCheckpoint.getCurrentReadOffset());

Review comment:
       Do you mean we need a SerDe interface for the read offset?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
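The framing used in `serialize` above — the wrapped checkpoint's bytes written with a length prefix, followed by the extra read-offset payload — can be shown in isolation. This sketch uses `DataOutputStream` and deliberately simplified types (`byte[]` standing in for the nested checkpoint, `long` for the read offset), not the PR's `ObjectOutputStream`-based code or its real checkpoint classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Length-prefixed framing: write the nested serializer's bytes preceded by
// their length, then the extra payload, so the reader knows where the nested
// blob ends before reading what follows it.
class FramingSketch {
    static byte[] frame(byte[] nested, long readOffset) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(nested.length); // length prefix for the nested bytes
            out.write(nested);
            out.writeLong(readOffset);   // extra payload after the blob
        }
        return bos.toByteArray();
    }

    static long readOffset(byte[] framed) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed))) {
            byte[] nested = new byte[in.readInt()];
            in.readFully(nested); // skip over the nested checkpoint bytes
            return in.readLong();
        }
    }
}
```

The length prefix is what lets the outer deserializer hand the nested blob back to the wrapped serializer unchanged, which is the property the review discussion is probing.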
[GitHub] [flink] flinkbot edited a comment on pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider
flinkbot edited a comment on pull request #13942: URL: https://github.com/apache/flink/pull/13942#issuecomment-722358038 ## CI report: * ea8e4df517dcb1779d4959fc54917cbcac76c07f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9271) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9195) * c2172fbea08608c4d073643da6c3d23cf895fa27 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
flinkbot edited a comment on pull request #13845: URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990 ## CI report: * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN * d4c46ab021f7d464004ca5f58589b25a5334f147 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9284) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193 ## CI report: * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN * 3eb68994c05caed91cef945800d7cb586d617663 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9286) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193 ## CI report: * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN * c57d805bea826af82626be82c5487e7259d3eaf9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9262) * 3eb68994c05caed91cef945800d7cb586d617663 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519247124

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
## @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator<T extends Comparable<T>>
+		implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+	private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
+	private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+	private final FileSplitAssigner splitAssigner;
+	private final long discoveryInterval;
+
+	private final JobConf jobConf;
+	private final ObjectPath tablePath;
+
+	private final ContinuousPartitionFetcher<Partition, T> fetcher;
+	private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+	private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+	// the maximum partition read offset seen so far
+	private volatile T currentReadOffset;
+	// the partitions that have been processed for a given read offset
+	private final Set<List<String>> seenPartitions;
+
+	public ContinuousHiveSplitEnumerator(
+			SplitEnumeratorContext<HiveSourceSplit> enumeratorContext,
+			T currentReadOffset,
+			Collection<List<String>> seenPartitions,
+			FileSplitAssigner splitAssigner,
+			long discoveryInterval,
+			JobConf jobConf,
+			ObjectPath tablePath,
+			ContinuousPartitionFetcher<Partition, T> fetcher,
+			HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
+		this.enumeratorContext = enumeratorContext;
+		this.currentReadOffset = currentReadOffset;
+		this.seenPartitions = new HashSet<>(seenPartitions);
+		this.splitAssigner = splitAssigner;
+		this.discoveryInterval = discoveryInterval;
+		this.jobConf = jobConf;
+		this.tablePath = tablePath;
+		this.fetcher = fetcher;
+		this.fetcherContext = fetcherContext;
+		readersAwaitingSplit = new LinkedHashMap<>();
+	}
+
+	@Override
+	public void start() {
+		try {
+			fetcherContext.open();
+			enumeratorContext.callAsync(
+					this::monitorAndGetSplits,
+					this::handleNewSplits,
+					discoveryInterval,
+
[GitHub] [flink] RocMarshal commented on pull request #13750: [FLINK-19394][docs-zh] Translate the 'Monitoring Checkpointing' page of 'Debugging & Monitoring' into Chinese
RocMarshal commented on pull request #13750: URL: https://github.com/apache/flink/pull/13750#issuecomment-723521553 @dianfu Could you help me merge this PR if there is nothing inappropriate? Thank you.
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519246308 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.event.RequestSplitEvent; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.filesystem.ContinuousPartitionFetcher; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A continuously monitoring {@link SplitEnumerator} for hive source. 
+ */ +public class ContinuousHiveSplitEnumerator> implements SplitEnumerator> { + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class); + + private final SplitEnumeratorContext enumeratorContext; + private final LinkedHashMap readersAwaitingSplit; + private final FileSplitAssigner splitAssigner; + private final long discoveryInterval; + + private final JobConf jobConf; + private final ObjectPath tablePath; + + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + + private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); Review comment: We do. Because the `monitorAndGetSplits` is scheduled asynchronously by `SplitEnumeratorContext::callAsync`. So there can be concurrent calls of, say, `monitorAndGetSplits` and `snapshotState`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
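The reviewer's point — `monitorAndGetSplits` runs on a worker thread via `SplitEnumeratorContext::callAsync` while `snapshotState` runs on the coordinator thread, so the shared offset/partition state must be guarded by the `ReadWriteLock` — can be illustrated with a dependency-free sketch. This is a hypothetical simplification, not the actual Flink class: the generic offset type is replaced by a plain `long` and the method names are stand-ins.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified sketch of the locking pattern under review: mutable enumerator
// state is touched both by the async discovery callback and by snapshotState(),
// so a read/write lock keeps each snapshot internally consistent.
public class EnumeratorStateSketch {
    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
    // Stands in for the generic read offset T of the real enumerator.
    private long currentReadOffset;
    private final Set<String> seenPartitions = new HashSet<>();

    // Called from the async worker thread (scheduled via callAsync in the real code).
    void recordDiscoveredPartition(String partition, long offset) {
        stateLock.writeLock().lock();
        try {
            seenPartitions.add(partition);
            currentReadOffset = Math.max(currentReadOffset, offset);
        } finally {
            stateLock.writeLock().unlock();
        }
    }

    // Called from the coordinator thread; must observe a consistent
    // (offset, partitions) pair, never a half-applied discovery round.
    long[] snapshotState() {
        stateLock.readLock().lock();
        try {
            return new long[] {currentReadOffset, seenPartitions.size()};
        } finally {
            stateLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        EnumeratorStateSketch e = new EnumeratorStateSketch();
        e.recordDiscoveredPartition("pt=2020-11-08", 42L);
        long[] snap = e.snapshotState();
        System.out.println(snap[0] + " " + snap[1]);  // prints "42 1"
    }
}
```

The write lock makes each discovery round atomic with respect to snapshots, so a checkpoint can never record a new read offset together with a stale partition set.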
[jira] [Commented] (FLINK-20037) there is a comment problem in fromValues(AbstractDataType<?> rowType, Object... values) method of TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227896#comment-17227896 ] yandufeng commented on FLINK-20037: --- Do I need to do something? https://github.com/apache/flink/pull/13976 > there is a comment problem in fromValues(AbstractDataType<?> rowType, > Object... values) method of TableEnvironment > --- > > Key: FLINK-20037 > URL: https://issues.apache.org/jira/browse/FLINK-20037 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: yandufeng >Priority: Trivial > Labels: pull-request-available, starter > > there is a comment problem in the fromValues(AbstractDataType<?> rowType, > Object... values) method of TableEnvironment; I think the second column name > should be "name", not "f1". This is my first issue, if there is a problem, please > understand. > * Examples: > * {@code > * tEnv.fromValues( > * DataTypes.ROW( > * DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), > * DataTypes.FIELD("name", DataTypes.STRING()) > * ), > * row(1, "ABC"), > * row(2L, "ABCDE") > * ) > * } > * will produce a Table with a schema as follows: > * {@code > * root > * |-- id: DECIMAL(10, 2) > * |-- f1: STRING > * } -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
flinkbot edited a comment on pull request #13845: URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990 ## CI report: * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282) * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN * d4c46ab021f7d464004ca5f58589b25a5334f147 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9284)
[GitHub] [flink] becketqin commented on pull request #13920: [FLINK-19743] Add metric definitions in FLIP-33 and report some of them by default.
becketqin commented on pull request #13920: URL: https://github.com/apache/flink/pull/13920#issuecomment-723520262 > A thought on performance: There are some metrics that use volatile variables. > > So far we always avoided that. Even "infrequently accessed" fields, like watermarks, are quite frequent in some use cases. > And a single volatile write has a very significant overhead in a high-performance pipeline. It is the number one thing we try to avoid, because it breaks the processor pipeline and puts pressure on the memory bus and processor's cache coherency mechanism. For the metrics, performance (least impact on main data paths) should have clear precedence. > > For metrics, we don't need a hard "happens before" relationship or any "immediate visibility". If the visibility is some 100s of milliseconds later (which would be a lot), it is no issue for the metrics at all, which are periodically reported every few seconds. > > The case where the JIT inlines the variable and eliminates cross-thread visibility entirely is AFAIK not realistic here. And if we ever find that the metric gets not reported, we can still use an occasional volatile write (like we do occasional volatile exception checks, every 1000 records, in the network stack). Thanks for the comments and suggestions. I was actually struggling on this a little bit. The current implementation is based on the following assumptions: * The volatile read is usually as cheap as a normal variable read unless there is write contention. Modern CPUs can achieve a cheap read if there isn't a contention on the shared variable. The CPU cache line usually can cache the shared variable and just read from it. The reads only go to main memory if the value has been updated by another core, in that case the reading CPU's cache line is invalidated. * In the current implementation, the main thread performs a read on a volatile variable of `shouldUpdateCurrentEventTimeLag` for every record. 
The `shouldUpdateCurrentEventTimeLag` flag is set to true by a separate thread every second. If the main thread sees the value become true, it updates the volatile variable `currentEmitEventTimeLag` with `System.currentTimeMillis() - timestamp`. So this is a volatile write, and potentially a system call, that the main thread performs once per second. * The `watermark` update is infrequent. From your comment this assumption seems too optimistic, so we can probably also update `currentWatermark` once per second. Another part that took me some time to think about is whether we should use a number-of-records-based interval vs. a time-based interval for metric reporting. When the throughput is large, the two approaches probably won't make any difference, but in the low-throughput case a time-based interval seems preferable. Imagine we update the metric every 1000 records, but a low-throughput stream only processes one record every 5 seconds: we would need to wait over an hour to see the metric updated, which seems bad. I am curious, do we have some experience that invalidates these assumptions?
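The scheme described above can be sketched as a dependency-free toy. This is a hypothetical simplification, not the actual Flink source implementation: the field names follow the comment, everything else is a stand-in, and visibility is widened for easy experimentation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the pattern discussed above: the hot path only *reads* a volatile
// flag per record; a timer thread re-arms the flag once per interval, and only
// then does the hot path pay for a volatile write plus a clock read.
public class EventTimeLagSketch {
    // Non-private only so the sketch is easy to poke at.
    volatile boolean shouldUpdateCurrentEventTimeLag = false;
    volatile long currentEmitEventTimeLag = 0L;

    // Hot path: one (usually uncontended, hence cheap) volatile read per record.
    void onRecord(long recordTimestamp) {
        if (shouldUpdateCurrentEventTimeLag) {
            shouldUpdateCurrentEventTimeLag = false;
            // At most one volatile write + System.currentTimeMillis() per interval.
            currentEmitEventTimeLag = System.currentTimeMillis() - recordTimestamp;
        }
    }

    // What the separate thread does: re-arm the flag every second.
    ScheduledExecutorService startReportingTimer() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(
                () -> shouldUpdateCurrentEventTimeLag = true, 1, 1, TimeUnit.SECONDS);
        return timer;
    }

    public static void main(String[] args) {
        EventTimeLagSketch sketch = new EventTimeLagSketch();
        sketch.shouldUpdateCurrentEventTimeLag = true;    // simulate the timer firing
        sketch.onRecord(System.currentTimeMillis() - 500L);
        System.out.println(sketch.currentEmitEventTimeLag >= 500L);
    }
}
```

Because the flag flips at a fixed time interval rather than every N records, the metric keeps updating even on a stream that processes one record every few seconds, which is the low-throughput concern raised above.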
[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
flinkbot edited a comment on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837 ## CI report: * 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN * a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285)
[jira] [Commented] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"
[ https://issues.apache.org/jira/browse/FLINK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227894#comment-17227894 ] Dian Fu commented on FLINK-19863: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9281&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529 > SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process > failed due to timeout" > --- > > Key: FLINK-19863 > URL: https://issues.apache.org/jira/browse/FLINK-19863 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=91bf6583-3fb2-592f-e4d4-d79d79c3230a&t=3425d8ba-5f03-540a-c64b-51b8481bf7d6 > {code} > 00:50:02,589 [main] INFO > org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping > Flink cluster. > 00:50:04,106 [main] INFO > org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping > Flink cluster. > 00:50:04,741 [main] INFO > org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up > logs to > /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9. > 00:50:04,788 [main] INFO > org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping > HBase Cluster > 00:50:16,243 [main] ERROR > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase [] - > > Test testHBase[0: > hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) > failed with: > java.io.IOException: Process failed due to timeout. 
> at > org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130) > at > org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108) > at > org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221) > at > org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196) > at > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215) > at > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20006) FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8>
[ https://issues.apache.org/jira/browse/FLINK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227893#comment-17227893 ] Dian Fu commented on FLINK-20006: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=b4cd3436-dbe8-556d-3bca-42f92c3cbf2f > FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only > occurs 8time expected:<4> but was:<8> > --- > > Key: FLINK-20006 > URL: https://issues.apache.org/jira/browse/FLINK-20006 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Assignee: Yun Gao >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9082&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=62110053-334f-5295-a0ab-80dd7e2babbf > {code} > 2020-11-05T13:31:16.7006473Z [ERROR] Tests run: 4, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 5.565 s <<< FAILURE! - in > org.apache.flink.connector.file.sink.FileSinkITCase > 2020-11-05T13:31:16.7007237Z [ERROR] testFileSink[executionMode = STREAMING, > triggerFailover = true](org.apache.flink.connector.file.sink.FileSinkITCase) > Time elapsed: 0.548 s <<< FAILURE! 
> 2020-11-05T13:31:16.7007897Z java.lang.AssertionError: The record 0 should > occur 4 times, but only occurs 8time expected:<4> but was:<8> > 2020-11-05T13:31:16.7008317Z at org.junit.Assert.fail(Assert.java:88) > 2020-11-05T13:31:16.7008644Z at > org.junit.Assert.failNotEquals(Assert.java:834) > 2020-11-05T13:31:16.7008987Z at > org.junit.Assert.assertEquals(Assert.java:645) > 2020-11-05T13:31:16.7009392Z at > org.apache.flink.connector.file.sink.FileSinkITCase.checkResult(FileSinkITCase.java:218) > 2020-11-05T13:31:16.7009889Z at > org.apache.flink.connector.file.sink.FileSinkITCase.testFileSink(FileSinkITCase.java:132) > 2020-11-05T13:31:16.7010316Z at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20006) FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8>
[ https://issues.apache.org/jira/browse/FLINK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227892#comment-17227892 ] Dian Fu commented on FLINK-20006: - Another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279&view=logs&j=5cae8624-c7eb-5c51-92d3-4d2dacedd221&t=420bd9ec-164e-562e-8947-0dacde3cec91 {code} 2020-11-07T22:18:57.6776989Z [ERROR] testFileSink[triggerFailover = false](org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase) Time elapsed: 2.773 s <<< FAILURE! 2020-11-07T22:18:57.6777584Z java.lang.AssertionError: expected:<2500> but was:<0> 2020-11-07T22:18:57.6777929Z at org.junit.Assert.fail(Assert.java:88) 2020-11-07T22:18:57.6778280Z at org.junit.Assert.failNotEquals(Assert.java:834) 2020-11-07T22:18:57.6778647Z at org.junit.Assert.assertEquals(Assert.java:645) 2020-11-07T22:18:57.6778997Z at org.junit.Assert.assertEquals(Assert.java:631) 2020-11-07T22:18:57.6779449Z at org.apache.flink.connector.file.sink.FileSinkITBase.checkResult(FileSinkITBase.java:152) 2020-11-07T22:18:57.6780189Z at org.apache.flink.connector.file.sink.FileSinkITBase.testFileSink(FileSinkITBase.java:104) {code}
[jira] [Reopened] (FLINK-20006) FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8>
[ https://issues.apache.org/jira/browse/FLINK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reopened FLINK-20006: -
[jira] [Updated] (FLINK-20046) StreamTableAggregateTests.test_map_view_iterate is instable
[ https://issues.apache.org/jira/browse/FLINK-20046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-20046: Labels: test-stability (was: ) > StreamTableAggregateTests.test_map_view_iterate is instable > --- > > Key: FLINK-20046 > URL: https://issues.apache.org/jira/browse/FLINK-20046 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Major > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490 > {code} > 2020-11-07T22:50:57.4180758Z ___ > StreamTableAggregateTests.test_map_view_iterate > 2020-11-07T22:50:57.4181301Z > 2020-11-07T22:50:57.4181965Z self = > testMethod=test_map_view_iterate> > 2020-11-07T22:50:57.4182348Z > 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self): > 2020-11-07T22:50:57.4182812Z test_iterate = > udaf(TestIterateAggregateFunction()) > 2020-11-07T22:50:57.4183320Z > self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1)) > 2020-11-07T22:50:57.4183763Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", > "2") > 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle. 
> 2020-11-07T22:50:57.4308028Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2") > 2020-11-07T22:50:57.4308945Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", > "2") > 2020-11-07T22:50:57.4309676Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", > "2") > 2020-11-07T22:50:57.4310701Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4311130Z > "python.map-state.iterate-response-batch-size", "2") > 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements( > 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'), > 2020-11-07T22:50:57.4312004Z (1, 'Hi', 'hi'), > 2020-11-07T22:50:57.4312316Z (2, 'hello', 'hello'), > 2020-11-07T22:50:57.4312639Z (3, 'Hi_', 'hi'), > 2020-11-07T22:50:57.4312975Z (3, 'Hi', 'hi'), > 2020-11-07T22:50:57.4313285Z (4, 'hello', 'hello'), > 2020-11-07T22:50:57.4313609Z (5, 'Hi2_', 'hi'), > 2020-11-07T22:50:57.4313908Z (5, 'Hi2', 'hi'), > 2020-11-07T22:50:57.4314238Z (6, 'hello2', 'hello'), > 2020-11-07T22:50:57.4314558Z (7, 'Hi', 'hi'), > 2020-11-07T22:50:57.4315053Z (8, 'hello', 'hello'), > 2020-11-07T22:50:57.4315396Z (9, 'Hi2', 'hi'), > 2020-11-07T22:50:57.4315773Z (13, 'Hi3', 'hi')], ['a', 'b', 'c']) > 2020-11-07T22:50:57.4316023Z > self.t_env.create_temporary_view("source", t) > 2020-11-07T22:50:57.4316299Z table_with_retract_message = > self.t_env.sql_query( > 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, > LAST_VALUE(c) as c from source group by a") > 2020-11-07T22:50:57.4316919Z result = > table_with_retract_message.group_by(t.c) \ > 2020-11-07T22:50:57.4317197Z > .select(test_iterate(t.b).alias("a"), t.c) \ > 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"), > 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"), > 2020-11-07T22:50:57.4318357Z 
col("a").get(2).alias("c"), > 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"), > 2020-11-07T22:50:57.4318814Z t.c.alias("e")) > 2020-11-07T22:50:57.4319023Z assert_frame_equal( > 2020-11-07T22:50:57.4319208Z > result.to_pandas(), > 2020-11-07T22:50:57.4319408Z pd.DataFrame([ > 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", > 'hello:3,hello2:1', 2, "hello"], > 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", > "Hi:3,Hi2:2,Hi3:1", 3, "hi"]], > 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', > 'e'])) > 2020-11-07T22:50:57.4321198Z > 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: > 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas >
[jira] [Commented] (FLINK-20046) StreamTableAggregateTests.test_map_view_iterate is instable
[ https://issues.apache.org/jira/browse/FLINK-20046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227891#comment-17227891 ] Dian Fu commented on FLINK-20046: - cc [~zhongwei]
[jira] [Created] (FLINK-20046) StreamTableAggregateTests.test_map_view_iterate is instable
Dian Fu created FLINK-20046: --- Summary: StreamTableAggregateTests.test_map_view_iterate is instable Key: FLINK-20046 URL: https://issues.apache.org/jira/browse/FLINK-20046 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.12.0 Reporter: Dian Fu Fix For: 1.12.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490 {code} 2020-11-07T22:50:57.4180758Z ___ StreamTableAggregateTests.test_map_view_iterate 2020-11-07T22:50:57.4181301Z 2020-11-07T22:50:57.4181965Z self = 2020-11-07T22:50:57.4182348Z 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self): 2020-11-07T22:50:57.4182812Z test_iterate = udaf(TestIterateAggregateFunction()) 2020-11-07T22:50:57.4183320Z self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1)) 2020-11-07T22:50:57.4183763Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", "2") 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle. 
2020-11-07T22:50:57.4308028Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2") 2020-11-07T22:50:57.4308945Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", "2") 2020-11-07T22:50:57.4309676Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", "2") 2020-11-07T22:50:57.4310701Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4311130Z "python.map-state.iterate-response-batch-size", "2") 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements( 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'), 2020-11-07T22:50:57.4312004Z (1, 'Hi', 'hi'), 2020-11-07T22:50:57.4312316Z (2, 'hello', 'hello'), 2020-11-07T22:50:57.4312639Z (3, 'Hi_', 'hi'), 2020-11-07T22:50:57.4312975Z (3, 'Hi', 'hi'), 2020-11-07T22:50:57.4313285Z (4, 'hello', 'hello'), 2020-11-07T22:50:57.4313609Z (5, 'Hi2_', 'hi'), 2020-11-07T22:50:57.4313908Z (5, 'Hi2', 'hi'), 2020-11-07T22:50:57.4314238Z (6, 'hello2', 'hello'), 2020-11-07T22:50:57.4314558Z (7, 'Hi', 'hi'), 2020-11-07T22:50:57.4315053Z (8, 'hello', 'hello'), 2020-11-07T22:50:57.4315396Z (9, 'Hi2', 'hi'), 2020-11-07T22:50:57.4315773Z (13, 'Hi3', 'hi')], ['a', 'b', 'c']) 2020-11-07T22:50:57.4316023Z self.t_env.create_temporary_view("source", t) 2020-11-07T22:50:57.4316299Z table_with_retract_message = self.t_env.sql_query( 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, LAST_VALUE(c) as c from source group by a") 2020-11-07T22:50:57.4316919Z result = table_with_retract_message.group_by(t.c) \ 2020-11-07T22:50:57.4317197Z .select(test_iterate(t.b).alias("a"), t.c) \ 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"), 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"), 2020-11-07T22:50:57.4318357Z col("a").get(2).alias("c"), 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"), 
2020-11-07T22:50:57.4318814Z t.c.alias("e")) 2020-11-07T22:50:57.4319023Z assert_frame_equal( 2020-11-07T22:50:57.4319208Z > result.to_pandas(), 2020-11-07T22:50:57.4319408Z pd.DataFrame([ 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", 'hello:3,hello2:1', 2, "hello"], 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", "Hi:3,Hi2:2,Hi3:1", 3, "hi"]], 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', 'e'])) 2020-11-07T22:50:57.4321198Z 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas 2020-11-07T22:50:57.4322299Z .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size) 2020-11-07T22:50:57.4322794Z .tox/py35-cython/lib/python3.5/site-packages/py4j/java_gateway.py:1286: in __call__ 2020-11-07T22:50:57.4323103Z answer, self.gateway_client, self.target_id, self.name) 2020-11-07T22:50:57.4323351Z pyflink/util/exceptions.py:147: in deco 2020-11-07T22:50:57.4323537Z return f(*a, **kw) 2020-11-07T22:50:57.4323783Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
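The failing test above deliberately pins "python.fn-execution.bundle.size", "python.state.cache-size" and the map-state read/write cache sizes to 2 so that cache eviction is triggered inside a single bundle. As a rough illustration of why a capacity of 2 forces eviction as soon as a third key is touched, here is a minimal LRU-cache model. This is not PyFlink's actual state-cache implementation, just a simplified sketch of the eviction behavior the test provokes:

```python
from collections import OrderedDict

class LruStateCache:
    """Simplified model of a bounded state cache (NOT PyFlink's real one).

    With capacity=2, touching a third distinct key evicts the least
    recently used entry -- the situation the test forces by setting
    "python.state.cache-size" to 2.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()

    def put(self, key, value):
        if key in self._entries:
            self._entries.move_to_end(key)
        self._entries[key] = value
        if len(self._entries) > self.capacity:
            # Evict the least recently used entry.
            self._entries.popitem(last=False)

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)
        return self._entries[key]

cache = LruStateCache(capacity=2)
cache.put("hi", 1)
cache.put("hello", 2)
cache.put("hi2", 3)        # capacity exceeded: "hi" is evicted
print(cache.get("hi"))     # None
print(cache.get("hello"))  # 2
```

A bug that only surfaces once this eviction path runs would be flaky or invisible with the default (much larger) cache sizes, which is what makes the tiny-cache configuration useful in the test.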
[jira] [Commented] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"
[ https://issues.apache.org/jira/browse/FLINK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227890#comment-17227890 ] Dian Fu commented on FLINK-19863: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6 > SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process > failed due to timeout" > --- > > Key: FLINK-19863 > URL: https://issues.apache.org/jira/browse/FLINK-19863 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6 > {code} > 00:50:02,589 [main] INFO > org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping > Flink cluster. > 00:50:04,106 [main] INFO > org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping > Flink cluster. > 00:50:04,741 [main] INFO > org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up > logs to > /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9. > 00:50:04,788 [main] INFO > org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping > HBase Cluster > 00:50:16,243 [main] ERROR > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase [] - > > Test testHBase[0: > hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) > failed with: > java.io.IOException: Process failed due to timeout. 
> at > org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130) > at > org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108) > at > org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221) > at > org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196) > at > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215) > at > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-19545) Add e2e test for native Kubernetes HA
[ https://issues.apache.org/jira/browse/FLINK-19545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Wang updated FLINK-19545:
------------------------------
Comment: was deleted

(was: We also need to add some integration tests for the KubernetesHAService, {{KubernetesLeaderElectionService}}, {{KubernetesLeaderRetrievalService}}, {{KubernetesJobGraphStore}}, etc.)

> Add e2e test for native Kubernetes HA
> -------------------------------------
>
> Key: FLINK-19545
> URL: https://issues.apache.org/jira/browse/FLINK-19545
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Yang Wang
> Assignee: Yang Wang
> Priority: Major
> Fix For: 1.12.0
>
> We could use minikube for the E2E tests. Start a Flink session/application
> cluster on K8s, kill one TaskManager pod or JobManager pod and wait for the
> job to recover from the latest checkpoint successfully.
>
> {code}
> kubectl exec -it {pod_name} -- /bin/sh -c "kill 1"
> {code}
[jira] [Commented] (FLINK-19545) Add e2e test for native Kubernetes HA
[ https://issues.apache.org/jira/browse/FLINK-19545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227889#comment-17227889 ]

Yang Wang commented on FLINK-19545:
-----------------------------------

We already have some IT cases for the critical methods. Then we just need to add an E2E test in this issue.

{code}
# Run the ITCases
run_mvn test -Dtest=org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClientITCase
run_mvn test -Dtest=org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElectorITCase
run_mvn test -Dtest=org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionAndRetrievalITCase
run_mvn test -Dtest=org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreITCase
{code}

> Add e2e test for native Kubernetes HA
> -------------------------------------
>
> Key: FLINK-19545
> URL: https://issues.apache.org/jira/browse/FLINK-19545
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Yang Wang
> Assignee: Yang Wang
> Priority: Major
> Fix For: 1.12.0
>
> We could use minikube for the E2E tests. Start a Flink session/application
> cluster on K8s, kill one TaskManager pod or JobManager pod and wait for the
> job to recover from the latest checkpoint successfully.
>
> {code}
> kubectl exec -it {pod_name} -- /bin/sh -c "kill 1"
> {code}
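The planned E2E test boils down to killing a pod and then polling until the job has recovered, the same pattern as Flink's Java-side `CommonTestUtils.waitUntilCondition` seen elsewhere in this digest. A minimal Python sketch of such a poll-until helper is below; the commented usage (the `job_is_running` check against the Flink REST API) is a hypothetical name, not part of any existing script:

```python
import time

def wait_until(condition, timeout_s=60.0, interval_s=1.0):
    """Poll `condition` until it returns True or `timeout_s` elapses.

    Mirrors the shape of a wait-until-condition test utility; raises
    TimeoutError if the condition never holds within the budget.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval_s)
    raise TimeoutError("condition not met within %.1fs" % timeout_s)

# Hypothetical usage in an e2e script, after killing the TaskManager pod
# (job_is_running is an assumed helper querying the Flink REST API):
#   wait_until(lambda: job_is_running("my-job-id"), timeout_s=300)
```

The timeout matters: without it, a job that never recovers would stall the whole E2E run, which is exactly the failure mode reported in FLINK-18262 below.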
[jira] [Commented] (FLINK-19842) PyFlinkStreamUserDefinedTableFunctionTests.test_table_function_with_sql_query is unstable
[ https://issues.apache.org/jira/browse/FLINK-19842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227888#comment-17227888 ] Dian Fu commented on FLINK-19842: - Seems another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3 {code} 2020-11-07T23:17:06.7806808Z self = 2020-11-07T23:17:06.7807102Z 2020-11-07T23:17:06.7807304Z def test_table_function(self): 2020-11-07T23:17:06.7807580Z self._register_table_sink( 2020-11-07T23:17:06.7807959Z ['a', 'b', 'c'], 2020-11-07T23:17:06.7808316Z [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()]) 2020-11-07T23:17:06.7808665Z 2020-11-07T23:17:06.7808944Z multi_emit = udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()]) 2020-11-07T23:17:06.7809335Z multi_num = udf(MultiNum(), result_type=DataTypes.BIGINT()) 2020-11-07T23:17:06.7809595Z 2020-11-07T23:17:06.7810056Z t = self.t_env.from_elements([(1, 1, 3), (2, 1, 6), (3, 2, 9)], ['a', 'b', 'c']) 2020-11-07T23:17:06.7810613Z t = t.join_lateral(multi_emit(t.a, multi_num(t.b)).alias('x', 'y')) 2020-11-07T23:17:06.7811170Z t = t.left_outer_join_lateral(condition_multi_emit(t.x, t.y).alias('m')) \ 2020-11-07T23:17:06.7811479Z .select("x, y, m") 2020-11-07T23:17:06.7812029Z t = t.left_outer_join_lateral(identity(t.m).alias('n')) \ 2020-11-07T23:17:06.7812326Z .select("x, y, n") 2020-11-07T23:17:06.7812560Z actual = self._get_output(t) 2020-11-07T23:17:06.7812820Z self.assert_equals(actual, 2020-11-07T23:17:06.7813237Z["1,0,null", "1,1,null", "2,0,null", "2,1,null", "3,0,0", "3,0,1", 2020-11-07T23:17:06.7813628Z > "3,0,2", "3,1,1", "3,1,2", "3,2,2", "3,3,null"]) {code} > PyFlinkStreamUserDefinedTableFunctionTests.test_table_function_with_sql_query > is unstable > - > > Key: FLINK-19842 > URL: https://issues.apache.org/jira/browse/FLINK-19842 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 
1.12.0 >Reporter: Robert Metzger >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8401=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3 > {code} > === FAILURES > === > _ > PyFlinkStreamUserDefinedTableFunctionTests.test_table_function_with_sql_query > _ > self = > testMethod=test_table_function_with_sql_query> > def test_table_function_with_sql_query(self): > self._register_table_sink( > ['a', 'b', 'c'], > [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()]) > > self.t_env.create_temporary_system_function( > "multi_emit", udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), > DataTypes.BIGINT()])) > > t = self.t_env.from_elements([(1, 1, 3), (2, 1, 6), (3, 2, 9)], ['a', > 'b', 'c']) > self.t_env.register_table("MyTable", t) > t = self.t_env.sql_query( > "SELECT a, x, y FROM MyTable LEFT JOIN LATERAL > TABLE(multi_emit(a, b)) as T(x, y)" > " ON TRUE") > actual = self._get_output(t) > > self.assert_equals(actual, ["1,1,0", "2,2,0", "3,3,0", "3,3,1"]) > pyflink/table/tests/test_udtf.py:61: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > cls = 'pyflink.table.tests.test_udtf.PyFlinkStreamUserDefinedTableFunctionTests'> > actual = JavaObject id=o37759, expected = ['1,1,0', '2,2,0', '3,3,0', '3,3,1'] > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wangyang0918 commented on pull request #13871: [FLINK-19544][k8s] Implement CheckpointRecoveryFactory based on Kubernetes API
wangyang0918 commented on pull request #13871: URL: https://github.com/apache/flink/pull/13871#issuecomment-723518654 @tillrohrmann @xintongsong Thanks all for the elaborative review and help with merging.
[jira] [Comment Edited] (FLINK-18262) PyFlink end-to-end test stalls
[ https://issues.apache.org/jira/browse/FLINK-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227886#comment-17227886 ]

Dian Fu edited comment on FLINK-18262 at 11/8/20, 1:38 AM:
-----------------------------------------------------------

Seems another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9278=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

{code}
2020-11-07T21:48:40.0076169Z Nov 07 21:48:40 We now have 2 NodeManagers up.
2020-11-07T22:15:02.5561863Z ==
2020-11-07T22:15:02.5563141Z === WARNING: This E2E Run took already 80% of the allocated time budget of 250 minutes ===
2020-11-07T22:15:02.5564157Z ==
2020-11-07T22:54:02.5518469Z ==
2020-11-07T22:54:02.5520427Z === WARNING: This E2E Run will time out in the next few minutes. Starting to upload the log output ===
2020-11-07T22:54:02.5521385Z ==
2020-11-07T23:05:09.8660803Z ##[error]The task has timed out.
{code}

was (Author: dian.fu):
Seems another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9278=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

> PyFlink end-to-end test stalls
> ------------------------------
>
> Key: FLINK-18262
> URL: https://issues.apache.org/jira/browse/FLINK-18262
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Tests
> Affects Versions: 1.12.0
> Reporter: Robert Metzger
> Priority: Critical
> Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3299=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> {code}
> 2020-06-11T17:25:37.2919869Z We now have 2 NodeManagers up.
> 2020-06-11T18:22:11.3398295Z ##[error]The operation was canceled.
> 2020-06-11T18:22:11.3411819Z ##[section]Finishing: Run e2e tests
> {code}
[jira] [Reopened] (FLINK-18262) PyFlink end-to-end test stalls
[ https://issues.apache.org/jira/browse/FLINK-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu reopened FLINK-18262:
-----------------------------

> PyFlink end-to-end test stalls
> ------------------------------
>
> Key: FLINK-18262
> URL: https://issues.apache.org/jira/browse/FLINK-18262
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Tests
> Affects Versions: 1.12.0
> Reporter: Robert Metzger
> Priority: Critical
> Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3299=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> {code}
> 2020-06-11T17:25:37.2919869Z We now have 2 NodeManagers up.
> 2020-06-11T18:22:11.3398295Z ##[error]The operation was canceled.
> 2020-06-11T18:22:11.3411819Z ##[section]Finishing: Run e2e tests
> {code}
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519243418

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
## @@ -46,29 +58,98 @@
 private static final long serialVersionUID = 1L;
+ private final JobConfWrapper jobConfWrapper;
+ private final List partitionKeys;
+ private final ContinuousPartitionFetcher fetcher;
+ private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext;
+ private final ObjectPath tablePath;
+
 HiveSource(
 JobConf jobConf,
+ ObjectPath tablePath,
 CatalogTable catalogTable,
 List partitions,
 @Nullable Long limit,
 String hiveVersion,
 boolean useMapRedReader,
- boolean isStreamingSource,
+ @Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
+ ContinuousPartitionFetcher fetcher,
+ HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext,
 RowType producedRowType) {
 super(
 new org.apache.flink.core.fs.Path[1],
 new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
- DEFAULT_SPLIT_ASSIGNER,
+ continuousEnumerationSettings == null ? DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new,

Review comment: In streaming read, we would want the splits to be consumed in order. The `DEFAULT_SPLIT_ASSIGNER` is locality-aware and therefore doesn't meet the requirement. But perhaps I should only do this for partitioned table, because non-partitioned table still reuse super class's enumerator and the splits are not generated in order in the first place.
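The review comment above hinges on the difference between a locality-aware split assigner and a strictly FIFO one. The following is a language-agnostic sketch in Python (Flink's actual assigners are Java classes; these simplified models and their names are illustrative only) showing how locality preference can reorder splits, which is what breaks the in-order consumption a streaming read wants:

```python
from collections import deque

class FifoSplitAssigner:
    """Hands out splits strictly in discovery order -- a simplified
    model of the ordering behavior the streaming Hive source needs."""

    def __init__(self, splits):
        self._pending = deque(splits)  # each split is (name, host)

    def next_split(self, requesting_host):
        # Host is ignored: ordering beats locality for streaming reads.
        return self._pending.popleft() if self._pending else None

class LocalityAwareSplitAssigner:
    """Prefers a split whose host matches the requester, like a
    locality-aware batch assigner; may reorder splits as a result."""

    def __init__(self, splits):
        self._pending = list(splits)

    def next_split(self, requesting_host):
        for i, (name, host) in enumerate(self._pending):
            if host == requesting_host:
                return self._pending.pop(i)
        return self._pending.pop(0) if self._pending else None

splits = [("part-0", "host-a"), ("part-1", "host-b"), ("part-2", "host-a")]
fifo = FifoSplitAssigner(splits)
local = LocalityAwareSplitAssigner(splits)

print([fifo.next_split("host-b") for _ in range(3)])
# FIFO preserves discovery order regardless of the requesting host.
print([local.next_split("host-b") for _ in range(3)])
# The locality-aware assigner serves part-1 first -- out of order.
```

For a partitioned table read in partition order, that reordering would mean consuming a newer partition's splits before an older one's, hence the switch away from the default assigner in the streaming case.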
[jira] [Commented] (FLINK-18262) PyFlink end-to-end test stalls
[ https://issues.apache.org/jira/browse/FLINK-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227886#comment-17227886 ] Dian Fu commented on FLINK-18262: - Seems another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9278=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529 > PyFlink end-to-end test stalls > -- > > Key: FLINK-18262 > URL: https://issues.apache.org/jira/browse/FLINK-18262 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Priority: Critical > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3299=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5 > {code} > 2020-06-11T17:25:37.2919869Z We now have 2 NodeManagers up. > 2020-06-11T18:22:11.3398295Z ##[error]The operation was canceled. > 2020-06-11T18:22:11.3411819Z ##[section]Finishing: Run e2e tests > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
lirui-apache commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519243125 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.event.RequestSplitEvent; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.filesystem.ContinuousPartitionFetcher; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A continuously monitoring {@link SplitEnumerator} for hive source. 
+ */ +public class ContinuousHiveSplitEnumerator> implements SplitEnumerator> { + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class); + + private final SplitEnumeratorContext enumeratorContext; + private final LinkedHashMap readersAwaitingSplit; + private final FileSplitAssigner splitAssigner; + private final long discoveryInterval; + + private final JobConf jobConf; + private final ObjectPath tablePath; + + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + + private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); + // the maximum partition read offset seen so far + private volatile T currentReadOffset; + // the partitions that have been processed for a given read offset + private final Set> seenPartitions; + + public ContinuousHiveSplitEnumerator( + SplitEnumeratorContext enumeratorContext, + T currentReadOffset, + Collection> seenPartitions, + FileSplitAssigner splitAssigner, + long discoveryInterval, + JobConf jobConf, + ObjectPath tablePath, + ContinuousPartitionFetcher fetcher, + HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext) { + this.enumeratorContext = enumeratorContext; + this.currentReadOffset = currentReadOffset; + this.seenPartitions = new HashSet<>(seenPartitions); + this.splitAssigner = splitAssigner; + this.discoveryInterval = discoveryInterval; + this.jobConf = jobConf; + this.tablePath = tablePath; + this.fetcher = fetcher; + this.fetcherContext = fetcherContext; + readersAwaitingSplit = new LinkedHashMap<>(); + } + + @Override + public void start() { + try { + fetcherContext.open(); + enumeratorContext.callAsync( + this::monitorAndGetSplits, + this::handleNewSplits, + discoveryInterval, +
[GitHub] [flink] dianfu commented on pull request #13930: [FLINK-19974][python][e2e] Fix the bug that kafka related services are not teardown properly after tests finished
dianfu commented on pull request #13930: URL: https://github.com/apache/flink/pull/13930#issuecomment-723517930 Actually the tests have passed, not sure why the status is still "PENDING".
[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
flinkbot edited a comment on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837 ## CI report: * 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN * a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285)
[jira] [Updated] (FLINK-20045) ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"
[ https://issues.apache.org/jira/browse/FLINK-20045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu updated FLINK-20045:
----------------------------
Labels: test-stability (was: )

> ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with
> "TimeoutException: Contender was not elected as the leader within 200000ms"
>
> Key: FLINK-20045
> URL: https://issues.apache.org/jira/browse/FLINK-20045
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9251=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0
> {code}
> 2020-11-07T10:34:07.5063203Z [ERROR] testZooKeeperLeaderElectionRetrieval(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) Time elapsed: 202.445 s <<< ERROR!
> 2020-11-07T10:34:07.5064331Z java.util.concurrent.TimeoutException: Contender was not elected as the leader within 200000ms
> 2020-11-07T10:34:07.5064946Z at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:153)
> 2020-11-07T10:34:07.5065762Z at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:139)
> 2020-11-07T10:34:07.5066565Z at org.apache.flink.runtime.leaderelection.TestingLeaderBase.waitForLeader(TestingLeaderBase.java:48)
> 2020-11-07T10:34:07.5067185Z at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval(ZooKeeperLeaderElectionTest.java:144)
> {code}
[jira] [Created] (FLINK-20045) ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"
Dian Fu created FLINK-20045:
----------------------------

Summary: ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"
Key: FLINK-20045
URL: https://issues.apache.org/jira/browse/FLINK-20045
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Dian Fu

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9251=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0

{code}
2020-11-07T10:34:07.5063203Z [ERROR] testZooKeeperLeaderElectionRetrieval(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) Time elapsed: 202.445 s <<< ERROR!
2020-11-07T10:34:07.5064331Z java.util.concurrent.TimeoutException: Contender was not elected as the leader within 200000ms
2020-11-07T10:34:07.5064946Z at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:153)
2020-11-07T10:34:07.5065762Z at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:139)
2020-11-07T10:34:07.5066565Z at org.apache.flink.runtime.leaderelection.TestingLeaderBase.waitForLeader(TestingLeaderBase.java:48)
2020-11-07T10:34:07.5067185Z at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval(ZooKeeperLeaderElectionTest.java:144)
{code}
[jira] [Commented] (FLINK-19699) PrometheusReporterEndToEndITCase crashes with exit code 143
[jira] [Commented] (FLINK-19699) PrometheusReporterEndToEndITCase crashes with exit code 143
[ https://issues.apache.org/jira/browse/FLINK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227884#comment-17227884 ]

Dian Fu commented on FLINK-19699:
---------------------------------

Another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9257=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

> PrometheusReporterEndToEndITCase crashes with exit code 143
> -----------------------------------------------------------
>
>              Key: FLINK-19699
>              URL: https://issues.apache.org/jira/browse/FLINK-19699
>          Project: Flink
>       Issue Type: Bug
>       Components: Runtime / Metrics
> Affects Versions: 1.12.0
>         Reporter: Dian Fu
>         Assignee: Robert Metzger
>         Priority: Critical
>           Labels: pull-request-available, test-stability
>          Fix For: 1.12.0
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7814=logs=68a897ab-3047-5660-245a-cce8f83859f6=16ca2cca-2f63-5cce-12d2-d519b930a729]
> {code}
> 2020-10-18T23:46:04.9667443Z [ERROR] The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-18T23:46:04.9669237Z [ERROR] Command was /bin/sh -c cd /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire 2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp surefire_41970585275084524978tmp
> 2020-10-18T23:46:04.9670440Z [ERROR] Error occurred in starting fork, check output in log
> 2020-10-18T23:46:04.9671283Z [ERROR] Process Exit Code: 143
> 2020-10-18T23:46:04.9671614Z [ERROR] Crashed tests:
> 2020-10-18T23:46:04.9672025Z [ERROR] org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase
> 2020-10-18T23:46:04.9672649Z [ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-18T23:46:04.9674834Z [ERROR] Command was /bin/sh -c cd /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire 2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp surefire_41970585275084524978tmp
> 2020-10-18T23:46:04.9676153Z [ERROR] Error occurred in starting fork, check output in log
> 2020-10-18T23:46:04.9676556Z [ERROR] Process Exit Code: 143
> 2020-10-18T23:46:04.9676882Z [ERROR] Crashed tests:
> 2020-10-18T23:46:04.9677288Z [ERROR] org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase
> 2020-10-18T23:46:04.9677827Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669)
> 2020-10-18T23:46:04.9678408Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282)
> 2020-10-18T23:46:04.9678965Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245)
> 2020-10-18T23:46:04.9679575Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 2020-10-18T23:46:04.9680983Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 2020-10-18T23:46:04.9681749Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 2020-10-18T23:46:04.9682246Z [ERROR] at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 2020-10-18T23:46:04.9682728Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 2020-10-18T23:46:04.9683179Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 2020-10-18T23:46:04.9683609Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> 2020-10-18T23:46:04.9684102Z [ERROR] at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> 2020-10-18T23:46:04.9684639Z [ERROR] at
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519241982 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}. 
+ */ +public class ContinuousHivePendingSplitsCheckpointSerializer implements SimpleVersionedSerializer> { + + private static final int VERSION = 1; + + private final PendingSplitsCheckpointSerializer superSerDe; + + public ContinuousHivePendingSplitsCheckpointSerializer(SimpleVersionedSerializer splitSerDe) { + superSerDe = new PendingSplitsCheckpointSerializer<>(splitSerDe); + } + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException { + Preconditions.checkArgument(checkpoint.getClass() == ContinuousHivePendingSplitsCheckpoint.class, + "Only supports %s", ContinuousHivePendingSplitsCheckpoint.class.getName()); + + ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = (ContinuousHivePendingSplitsCheckpoint) checkpoint; + PendingSplitsCheckpoint superCP = PendingSplitsCheckpoint.fromCollectionSnapshot( + hiveCheckpoint.getSplits(), hiveCheckpoint.getAlreadyProcessedPaths()); + byte[] superBytes = superSerDe.serialize(superCP); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) { + outputStream.writeInt(superBytes.length); + outputStream.write(superBytes); + outputStream.writeObject(hiveCheckpoint.getCurrentReadOffset()); + outputStream.writeInt(hiveCheckpoint.getSeenPartitions().size()); Review comment: It is better to use `ListSer` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
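The serializer under review hand-writes each field (delegate bytes, offset, list size, then elements) onto an `ObjectOutputStream`; the reviewer suggests delegating the list part to an existing serializer. The underlying "length-prefixed frame, then size-prefixed list" layout can be sketched with plain `java.io` streams. This is a minimal illustration, not the actual Flink code: the class name `FramedListSerDe` and its method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative "length-prefixed frame + list" encoding, mirroring the pattern in the
 * serializer under review: write the delegate serializer's bytes behind a length header,
 * then the list as (size, element, element, ...).
 */
public class FramedListSerDe {

    public static byte[] serialize(byte[] wrappedCheckpoint, List<String> seenPartitions) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(wrappedCheckpoint.length); // frame the delegate serializer's bytes
            out.write(wrappedCheckpoint);
            out.writeInt(seenPartitions.size());    // list size first ...
            for (String p : seenPartitions) {
                out.writeUTF(p);                    // ... then each element
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);      // cannot happen for in-memory streams
        }
        return bos.toByteArray();
    }

    public static List<String> deserializePartitions(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            byte[] wrapped = new byte[in.readInt()];
            in.readFully(wrapped);                  // skip over the framed delegate bytes
            int size = in.readInt();
            List<String> partitions = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                partitions.add(in.readUTF());
            }
            return partitions;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The reviewer's point is that Flink already ships a serializer for exactly this list pattern, so the per-element loop need not be hand-rolled in each checkpoint serializer.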
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519241963 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}. 
+ */ +public class ContinuousHivePendingSplitsCheckpointSerializer implements SimpleVersionedSerializer> { + + private static final int VERSION = 1; + + private final PendingSplitsCheckpointSerializer superSerDe; + + public ContinuousHivePendingSplitsCheckpointSerializer(SimpleVersionedSerializer splitSerDe) { + superSerDe = new PendingSplitsCheckpointSerializer<>(splitSerDe); + } + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException { + Preconditions.checkArgument(checkpoint.getClass() == ContinuousHivePendingSplitsCheckpoint.class, + "Only supports %s", ContinuousHivePendingSplitsCheckpoint.class.getName()); + + ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = (ContinuousHivePendingSplitsCheckpoint) checkpoint; + PendingSplitsCheckpoint superCP = PendingSplitsCheckpoint.fromCollectionSnapshot( + hiveCheckpoint.getSplits(), hiveCheckpoint.getAlreadyProcessedPaths()); + byte[] superBytes = superSerDe.serialize(superCP); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) { + outputStream.writeInt(superBytes.length); + outputStream.write(superBytes); + outputStream.writeObject(hiveCheckpoint.getCurrentReadOffset()); Review comment: It is better to pass serializer here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
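The second suggestion — "pass serializer here" — targets `outputStream.writeObject(hiveCheckpoint.getCurrentReadOffset())`: Java serialization of the offset ties the on-disk format to the offset's class, whereas a pluggable versioned serializer keeps the format explicit and evolvable. A minimal sketch of that shape follows; the interface is modeled loosely on Flink's `SimpleVersionedSerializer`, and the names `VersionedSerDe`/`LongOffsetSerDe` are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

/** Minimal versioned-serializer shape (modeled loosely on Flink's SimpleVersionedSerializer). */
interface VersionedSerDe<T> {
    int getVersion();
    byte[] serialize(T value) throws IOException;
    T deserialize(int version, byte[] bytes) throws IOException;
}

/** An offset serializer to pass in, instead of relying on ObjectOutputStream.writeObject. */
public class LongOffsetSerDe implements VersionedSerDe<Long> {

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(Long offset) {
        // Fixed 8-byte big-endian encoding: explicit, compact, and class-independent.
        return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
    }

    @Override
    public Long deserialize(int version, byte[] bytes) {
        if (version != 1) {
            throw new IllegalArgumentException("Unknown version: " + version);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

The checkpoint serializer would then write the version and the serializer's bytes, so the offset encoding can change in a later version without breaking old checkpoints.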
[jira] [Commented] (FLINK-20011) PageRankITCase.testPrintWithRMatGraph hangs
[ https://issues.apache.org/jira/browse/FLINK-20011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227883#comment-17227883 ]

Dian Fu commented on FLINK-20011:
---------------------------------

Another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9257=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5

> PageRankITCase.testPrintWithRMatGraph hangs
> -------------------------------------------
>
>              Key: FLINK-20011
>              URL: https://issues.apache.org/jira/browse/FLINK-20011
>          Project: Flink
>       Issue Type: Bug
>       Components: Library / Graph Processing (Gelly), Runtime / Coordination, Runtime / Network
> Affects Versions: 1.12.0
>         Reporter: Dian Fu
>         Priority: Major
>           Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9121=logs=1fc6e7bf-633c-5081-c32a-9dea24b05730=80a658d1-f7f6-5d93-2758-53ac19fd5b19]
> {code}
> 2020-11-05T22:42:34.4186647Z "main" #1 prio=5 os_prio=0 tid=0x7fa98c00b800 nid=0x32f8 waiting on condition [0x7fa995c12000]
> 2020-11-05T22:42:34.4187168Z    java.lang.Thread.State: WAITING (parking)
> 2020-11-05T22:42:34.4187563Z    at sun.misc.Unsafe.park(Native Method)
> 2020-11-05T22:42:34.4188246Z    - parking to wait for <0x8736d120> (a java.util.concurrent.CompletableFuture$Signaller)
> 2020-11-05T22:42:34.411Z    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2020-11-05T22:42:34.4189351Z    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> 2020-11-05T22:42:34.4189930Z    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> 2020-11-05T22:42:34.4190509Z    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> 2020-11-05T22:42:34.4191059Z    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-05T22:42:34.4191591Z    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:893)
> 2020-11-05T22:42:34.4192208Z    at org.apache.flink.graph.asm.dataset.DataSetAnalyticBase.execute(DataSetAnalyticBase.java:55)
> 2020-11-05T22:42:34.4192787Z    at org.apache.flink.graph.drivers.output.Print.write(Print.java:48)
> 2020-11-05T22:42:34.4193373Z    at org.apache.flink.graph.Runner.execute(Runner.java:454)
> 2020-11-05T22:42:34.4194156Z    at org.apache.flink.graph.Runner.main(Runner.java:507)
> 2020-11-05T22:42:34.4194618Z    at org.apache.flink.graph.drivers.DriverBaseITCase.getSystemOutput(DriverBaseITCase.java:208)
> 2020-11-05T22:42:34.4195192Z    at org.apache.flink.graph.drivers.DriverBaseITCase.expectedCount(DriverBaseITCase.java:100)
> 2020-11-05T22:42:34.4195914Z    at org.apache.flink.graph.drivers.PageRankITCase.testPrintWithRMatGraph(PageRankITCase.java:60)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519241825 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.event.RequestSplitEvent; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.filesystem.ContinuousPartitionFetcher; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A continuously monitoring {@link SplitEnumerator} for hive source. 
+ */ +public class ContinuousHiveSplitEnumerator> implements SplitEnumerator> { + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class); + + private final SplitEnumeratorContext enumeratorContext; + private final LinkedHashMap readersAwaitingSplit; + private final FileSplitAssigner splitAssigner; + private final long discoveryInterval; + + private final JobConf jobConf; + private final ObjectPath tablePath; + + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + + private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); + // the maximum partition read offset seen so far + private volatile T currentReadOffset; + // the partitions that have been processed for a given read offset + private final Set> seenPartitions; + + public ContinuousHiveSplitEnumerator( + SplitEnumeratorContext enumeratorContext, + T currentReadOffset, + Collection> seenPartitions, + FileSplitAssigner splitAssigner, + long discoveryInterval, + JobConf jobConf, + ObjectPath tablePath, + ContinuousPartitionFetcher fetcher, + HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext) { + this.enumeratorContext = enumeratorContext; + this.currentReadOffset = currentReadOffset; + this.seenPartitions = new HashSet<>(seenPartitions); + this.splitAssigner = splitAssigner; + this.discoveryInterval = discoveryInterval; + this.jobConf = jobConf; + this.tablePath = tablePath; + this.fetcher = fetcher; + this.fetcherContext = fetcherContext; + readersAwaitingSplit = new LinkedHashMap<>(); + } + + @Override + public void start() { + try { + fetcherContext.open(); + enumeratorContext.callAsync( + this::monitorAndGetSplits, + this::handleNewSplits, + discoveryInterval, +
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519241603 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.event.RequestSplitEvent; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.filesystem.ContinuousPartitionFetcher; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A continuously monitoring {@link SplitEnumerator} for hive source. 
+ */ +public class ContinuousHiveSplitEnumerator> implements SplitEnumerator> { + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class); + + private final SplitEnumeratorContext enumeratorContext; + private final LinkedHashMap readersAwaitingSplit; + private final FileSplitAssigner splitAssigner; + private final long discoveryInterval; + + private final JobConf jobConf; + private final ObjectPath tablePath; + + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + + private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); Review comment: Do we really need this lock? I think thread safety should be ensured by framework This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
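The lock question above rests on the FLIP-27 threading model: work scheduled via `callAsync` runs on worker threads, but its result handler is re-dispatched onto the single coordinator thread, so enumerator fields can be plain state with no explicit lock. That model can be sketched with a single-threaded executor. The class and method names below are illustrative; this is not the actual `SplitEnumeratorContext` API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/**
 * Sketch of a single-threaded coordinator: async tasks may run anywhere, but every
 * result handler executes on one coordinator thread, serializing all state mutations.
 */
public class CoordinatorSketch implements AutoCloseable {
    private final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
    private final Set<String> seenPartitions = new HashSet<>(); // touched only on coordinatorThread

    /** Run the task asynchronously; deliver its result to the handler on the coordinator thread. */
    public <T> CompletableFuture<Void> callAsync(Callable<T> task, Consumer<T> handler) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).thenAcceptAsync(handler, coordinatorThread); // handlers never run concurrently
    }

    public CompletableFuture<Void> recordPartition(String partition) {
        return callAsync(() -> partition, seenPartitions::add);
    }

    public Set<String> snapshotSeenPartitions() {
        try {
            // Read state on the coordinator thread too, staying inside the single-thread model.
            return coordinatorThread.submit(() -> new HashSet<>(seenPartitions)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        coordinatorThread.shutdown();
    }
}
```

Under this model a `ReentrantReadWriteLock` inside the enumerator is redundant, which is exactly the reviewer's objection.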
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723516469

@flinkbot run azure
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519241466 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java ## @@ -151,23 +139,63 @@ public boolean isBounded() { catalogTable, hiveShim, remainingPartitions); + Configuration configuration = Configuration.fromMap(catalogTable.getOptions()); - @SuppressWarnings("unchecked") - TypeInformation typeInfo = - (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); + Duration monitorInterval = null; + ContinuousPartitionFetcher fetcher = null; + HiveContinuousPartitionFetcherContext fetcherContext = null; + if (isStreamingSource()) { + monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null + ? DEFAULT_SCAN_MONITOR_INTERVAL + : configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL); + fetcher = new HiveContinuousPartitionFetcher(); + + final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, + HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal); + fetcherContext = new HiveContinuousPartitionFetcherContext( + tablePath, + hiveShim, + new JobConfWrapper(jobConf), + catalogTable.getPartitionKeys(), + getProducedTableSchema().getFieldDataTypes(), + getProducedTableSchema().getFieldNames(), + configuration, + defaultPartitionName); + } - HiveTableInputFormat inputFormat = getInputFormat( + HiveSource hiveSource = new HiveSource( + jobConf, + tablePath, + catalogTable, allHivePartitions, - flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)); + limit, + hiveVersion, + flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER), + isStreamingSource() ? 
new ContinuousEnumerationSettings(monitorInterval) : null, + fetcher, + fetcherContext, + (RowType) getProducedDataType().getLogicalType() + ); + DataStreamSource source = execEnv.fromSource( + hiveSource, WatermarkStrategy.noWatermarks(), "HiveSource-" + tablePath.getFullName()); if (isStreamingSource()) { Review comment: Why not move these check to upper? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519241369 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java ## @@ -46,29 +58,98 @@ private static final long serialVersionUID = 1L; + private final JobConfWrapper jobConfWrapper; + private final List partitionKeys; + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + private final ObjectPath tablePath; + HiveSource( JobConf jobConf, + ObjectPath tablePath, CatalogTable catalogTable, List partitions, @Nullable Long limit, String hiveVersion, boolean useMapRedReader, - boolean isStreamingSource, + @Nullable ContinuousEnumerationSettings continuousEnumerationSettings, Review comment: It is better to create a builder for `HiveSource` instead of passing all arguments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
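The builder suggestion addresses the long positional constructor, where several parameters are optional or nullable. The general shape is a builder that takes the required fields up front and exposes fluent setters for the rest. The sketch below is a generic illustration with hypothetical fields, not the actual `HiveSource` signature.

```java
/** Illustrative source configuration with two optional knobs behind a builder. */
public class SourceConfig {
    private final String tablePath;        // required
    private final Long limit;              // optional, null means unlimited
    private final boolean useMapRedReader; // optional, defaults to false

    private SourceConfig(Builder b) {
        this.tablePath = b.tablePath;
        this.limit = b.limit;
        this.useMapRedReader = b.useMapRedReader;
    }

    public String describe() {
        return tablePath
                + (limit == null ? "" : " limit=" + limit)
                + (useMapRedReader ? " mapred" : "");
    }

    public static Builder builder(String tablePath) {
        return new Builder(tablePath);
    }

    public static final class Builder {
        private final String tablePath;
        private Long limit;
        private boolean useMapRedReader;

        private Builder(String tablePath) {
            this.tablePath = tablePath;
        }

        public Builder limit(long l) {
            this.limit = l;
            return this;
        }

        public Builder useMapRedReader(boolean b) {
            this.useMapRedReader = b;
            return this;
        }

        public SourceConfig build() {
            return new SourceConfig(this);
        }
    }
}
```

Callers then name each option at the call site, e.g. `SourceConfig.builder("db.tbl").limit(10).build()`, instead of threading nulls through a positional argument list.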
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519241002 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java ## @@ -46,29 +58,98 @@ private static final long serialVersionUID = 1L; + private final JobConfWrapper jobConfWrapper; + private final List partitionKeys; + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + private final ObjectPath tablePath; + HiveSource( JobConf jobConf, + ObjectPath tablePath, CatalogTable catalogTable, List partitions, @Nullable Long limit, String hiveVersion, boolean useMapRedReader, - boolean isStreamingSource, + @Nullable ContinuousEnumerationSettings continuousEnumerationSettings, + ContinuousPartitionFetcher fetcher, + HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext, RowType producedRowType) { super( new org.apache.flink.core.fs.Path[1], new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)), - DEFAULT_SPLIT_ASSIGNER, + continuousEnumerationSettings == null ? 
DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new, createBulkFormat(new JobConf(jobConf), catalogTable, hiveVersion, producedRowType, useMapRedReader, limit), - null); - Preconditions.checkArgument(!isStreamingSource, "HiveSource currently only supports bounded mode"); + continuousEnumerationSettings); + this.jobConfWrapper = new JobConfWrapper(jobConf); + this.tablePath = tablePath; + this.partitionKeys = catalogTable.getPartitionKeys(); + this.fetcher = fetcher; + this.fetcherContext = fetcherContext; } @Override public SimpleVersionedSerializer getSplitSerializer() { return HiveSourceSplitSerializer.INSTANCE; } + @Override + public SimpleVersionedSerializer> getEnumeratorCheckpointSerializer() { + if (needContinuousSplitEnumerator()) { + return new ContinuousHivePendingSplitsCheckpointSerializer(getSplitSerializer()); + } else { + return super.getEnumeratorCheckpointSerializer(); + } + } + + @Override + public SplitEnumerator> createEnumerator( + SplitEnumeratorContext enumContext) { + if (needContinuousSplitEnumerator()) { + return createContinuousSplitEnumerator( + enumContext, fetcherContext.getConsumeStartOffset(), Collections.emptyList(), Collections.emptyList()); + } else { + return super.createEnumerator(enumContext); + } + } + + @Override + public SplitEnumerator> restoreEnumerator( + SplitEnumeratorContext enumContext, PendingSplitsCheckpoint checkpoint) { + if (needContinuousSplitEnumerator()) { + Preconditions.checkState(checkpoint instanceof ContinuousHivePendingSplitsCheckpoint, + "Illegal type of splits checkpoint %s for streaming read partitioned table", checkpoint.getClass().getName()); + ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = (ContinuousHivePendingSplitsCheckpoint) checkpoint; + return createContinuousSplitEnumerator( + enumContext, hiveCheckpoint.getCurrentReadOffset(), hiveCheckpoint.getSeenPartitions(), hiveCheckpoint.getSplits()); + } else { + return super.restoreEnumerator(enumContext, checkpoint); + } + } + + private 
boolean needContinuousSplitEnumerator() { + return getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED && !partitionKeys.isEmpty(); + } + + private SplitEnumerator> createContinuousSplitEnumerator( + SplitEnumeratorContext enumContext, + Comparable currentReadOffset, + Collection> seenPartitions, + Collection splits) { + return new ContinuousHiveSplitEnumerator( Review comment: Ditto
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519240975 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java ## @@ -46,29 +58,98 @@ private static final long serialVersionUID = 1L; + private final JobConfWrapper jobConfWrapper; + private final List partitionKeys; + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + private final ObjectPath tablePath; + HiveSource( JobConf jobConf, + ObjectPath tablePath, CatalogTable catalogTable, List partitions, @Nullable Long limit, String hiveVersion, boolean useMapRedReader, - boolean isStreamingSource, + @Nullable ContinuousEnumerationSettings continuousEnumerationSettings, + ContinuousPartitionFetcher fetcher, + HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext, RowType producedRowType) { super( new org.apache.flink.core.fs.Path[1], new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)), - DEFAULT_SPLIT_ASSIGNER, + continuousEnumerationSettings == null ? 
DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new, createBulkFormat(new JobConf(jobConf), catalogTable, hiveVersion, producedRowType, useMapRedReader, limit), - null); - Preconditions.checkArgument(!isStreamingSource, "HiveSource currently only supports bounded mode"); + continuousEnumerationSettings); + this.jobConfWrapper = new JobConfWrapper(jobConf); + this.tablePath = tablePath; + this.partitionKeys = catalogTable.getPartitionKeys(); + this.fetcher = fetcher; + this.fetcherContext = fetcherContext; } @Override public SimpleVersionedSerializer getSplitSerializer() { return HiveSourceSplitSerializer.INSTANCE; } + @Override + public SimpleVersionedSerializer> getEnumeratorCheckpointSerializer() { + if (needContinuousSplitEnumerator()) { + return new ContinuousHivePendingSplitsCheckpointSerializer(getSplitSerializer()); + } else { + return super.getEnumeratorCheckpointSerializer(); + } + } + + @Override + public SplitEnumerator> createEnumerator( + SplitEnumeratorContext enumContext) { + if (needContinuousSplitEnumerator()) { + return createContinuousSplitEnumerator( + enumContext, fetcherContext.getConsumeStartOffset(), Collections.emptyList(), Collections.emptyList()); + } else { + return super.createEnumerator(enumContext); + } + } + + @Override + public SplitEnumerator> restoreEnumerator( + SplitEnumeratorContext enumContext, PendingSplitsCheckpoint checkpoint) { + if (needContinuousSplitEnumerator()) { + Preconditions.checkState(checkpoint instanceof ContinuousHivePendingSplitsCheckpoint, + "Illegal type of splits checkpoint %s for streaming read partitioned table", checkpoint.getClass().getName()); + ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = (ContinuousHivePendingSplitsCheckpoint) checkpoint; + return createContinuousSplitEnumerator( + enumContext, hiveCheckpoint.getCurrentReadOffset(), hiveCheckpoint.getSeenPartitions(), hiveCheckpoint.getSplits()); + } else { + return super.restoreEnumerator(enumContext, checkpoint); + } + } + + private 
boolean needContinuousSplitEnumerator() { Review comment: `continuousPartitionedEnumerator`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
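The `HiveSource` change under review picks between a continuous and a bounded split enumerator depending on whether `ContinuousEnumerationSettings` were passed. A minimal sketch of that dispatch pattern, with simplified stand-in types rather than the real FLIP-27 interfaces:

```java
// Hedged sketch of the pattern discussed in this review thread: delegate to a
// continuous enumerator when ContinuousEnumerationSettings are present, and
// fall back to bounded behavior otherwise. Types and names are illustrative
// stand-ins, not the actual Flink signatures.
public class EnumeratorChoiceSketch {

    interface SplitEnumerator {
        String describe();
    }

    // Mirrors the needContinuousSplitEnumerator() check: the nullable settings
    // object doubles as the streaming/bounded switch.
    public static SplitEnumerator createEnumerator(Object continuousEnumerationSettings) {
        if (continuousEnumerationSettings != null) {
            // Streaming read: keep monitoring for newly added partitions.
            return () -> "continuous";
        }
        // Bounded read: enumerate the fixed set of splits once.
        return () -> "bounded";
    }

    public static void main(String[] args) {
        System.out.println(createEnumerator(null).describe());       // prints bounded
        System.out.println(createEnumerator(new Object()).describe()); // prints continuous
    }
}
```

The same null check drives the split-assigner choice in the constructor and the checkpoint-serializer choice in `getEnumeratorCheckpointSerializer()`.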
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519240879 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java ## @@ -46,29 +58,98 @@ private static final long serialVersionUID = 1L; + private final JobConfWrapper jobConfWrapper; + private final List partitionKeys; + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + private final ObjectPath tablePath; + HiveSource( JobConf jobConf, + ObjectPath tablePath, CatalogTable catalogTable, List partitions, @Nullable Long limit, String hiveVersion, boolean useMapRedReader, - boolean isStreamingSource, + @Nullable ContinuousEnumerationSettings continuousEnumerationSettings, + ContinuousPartitionFetcher fetcher, + HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext, RowType producedRowType) { super( new org.apache.flink.core.fs.Path[1], new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)), - DEFAULT_SPLIT_ASSIGNER, + continuousEnumerationSettings == null ? DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new, Review comment: Why not `DEFAULT_SPLIT_ASSIGNER `? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519240794 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.event.RequestSplitEvent; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.filesystem.ContinuousPartitionFetcher; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A continuously monitoring {@link SplitEnumerator} for hive source. 
+ */ +public class ContinuousHiveSplitEnumerator> implements SplitEnumerator> { + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class); + + private final SplitEnumeratorContext enumeratorContext; + private final LinkedHashMap readersAwaitingSplit; + private final FileSplitAssigner splitAssigner; + private final long discoveryInterval; + + private final JobConf jobConf; + private final ObjectPath tablePath; + + private final ContinuousPartitionFetcher fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; + + private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); + // the maximum partition read offset seen so far + private volatile T currentReadOffset; + // the partitions that have been processed for a given read offset + private final Set> seenPartitions; + + public ContinuousHiveSplitEnumerator( + SplitEnumeratorContext enumeratorContext, + T currentReadOffset, + Collection> seenPartitions, + FileSplitAssigner splitAssigner, + long discoveryInterval, + JobConf jobConf, + ObjectPath tablePath, + ContinuousPartitionFetcher fetcher, + HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext) { + this.enumeratorContext = enumeratorContext; + this.currentReadOffset = currentReadOffset; + this.seenPartitions = new HashSet<>(seenPartitions); + this.splitAssigner = splitAssigner; + this.discoveryInterval = discoveryInterval; + this.jobConf = jobConf; + this.tablePath = tablePath; + this.fetcher = fetcher; + this.fetcherContext = fetcherContext; + readersAwaitingSplit = new LinkedHashMap<>(); + } + + @Override + public void start() { + try { + fetcherContext.open(); + enumeratorContext.callAsync( + this::monitorAndGetSplits, + this::handleNewSplits, + discoveryInterval, +
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519240289 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.connector.file.src.PendingSplitsCheckpoint; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * The checkpoint of current state of continuous hive source reading. + */ +public class ContinuousHivePendingSplitsCheckpoint extends PendingSplitsCheckpoint { + + private final Comparable currentReadOffset; + private final Collection> seenPartitions; + + public ContinuousHivePendingSplitsCheckpoint(Collection splits, Review comment: Code style: ``` public ContinuousHivePendingSplitsCheckpoint( Collection splits, Comparable currentReadOffset, Collection> seenPartitions) { ``` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
JingsongLi commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519240181 ## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java ## @@ -75,12 +75,12 @@ private final FileEnumerator.Provider enumeratorFactory; - private final FileSplitAssigner.Provider assignerFactory; + protected final FileSplitAssigner.Provider assignerFactory; Review comment: +1 Some communities enforce this with mandatory checkstyle checks, providing only getters and setters instead of protected fields. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
flinkbot edited a comment on pull request #13845: URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990 ## CI report: * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN * 215a25d83260a048877bdf8462a06473676efb8a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223) * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282) * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN * d4c46ab021f7d464004ca5f58589b25a5334f147 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9284) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
flinkbot edited a comment on pull request #13845: URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990 ## CI report: * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN * 215a25d83260a048877bdf8462a06473676efb8a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223) * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282) * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN * d4c46ab021f7d464004ca5f58589b25a5334f147 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] StephanEwen commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
StephanEwen commented on a change in pull request #13963: URL: https://github.com/apache/flink/pull/13963#discussion_r519231464 ## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java ## @@ -75,12 +75,12 @@ private final FileEnumerator.Provider enumeratorFactory; - private final FileSplitAssigner.Provider assignerFactory; + protected final FileSplitAssigner.Provider assignerFactory; Review comment: I have a slight preference to keep these fields `private` and add a public getter (`public FileSplitAssigner.Provider getAssigner()`). ## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java ## @@ -75,12 +75,12 @@ private final FileEnumerator.Provider enumeratorFactory; - private final FileSplitAssigner.Provider assignerFactory; + protected final FileSplitAssigner.Provider assignerFactory; private final BulkFormat readerFormat; @Nullable - private final ContinuousEnumerationSettings continuousEnumerationSettings; + protected final ContinuousEnumerationSettings continuousEnumerationSettings; Review comment: Similar to the above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
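A minimal illustration of the style suggested in the comment: keep the field `private` and add a public getter, rather than widening the field to `protected`. The class and field below are stand-ins for `AbstractFileSource` and `FileSplitAssigner.Provider`, not Flink code.

```java
// Sketch of "private field + public getter" instead of a protected field.
// The names echo the discussion; the types are simplified stand-ins.
public class AssignerHolderSketch {

    // Stands in for FileSplitAssigner.Provider in AbstractFileSource.
    private final String assignerFactory;

    public AssignerHolderSketch(String assignerFactory) {
        this.assignerFactory = assignerFactory;
    }

    // Subclasses read the value through the getter, so the field stays
    // private and the class keeps control over its representation.
    public String getAssignerFactory() {
        return assignerFactory;
    }

    public static void main(String[] args) {
        System.out.println(new AssignerHolderSketch("simple-assigner").getAssignerFactory());
    }
}
```

The getter preserves encapsulation: the superclass can later change how the value is stored or computed without breaking subclasses that would otherwise touch the field directly.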
[GitHub] [flink] StephanEwen commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
StephanEwen commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-723506656 Thanks a lot, @becketqin for taking these comments into account. The current solution looks good to me. +1 to merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
flinkbot edited a comment on pull request #13845: URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990 ## CI report: * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN * 215a25d83260a048877bdf8462a06473676efb8a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223) * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282) * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519229516 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java ## @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.VirtualChannelSelector; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * {@link RecordDeserializer}-like interface for recovery. To avoid additional virtual method calls on the + * non-recovery hotpath, this interface is not extending RecordDeserializer. 
+ */ +interface Demultiplexer extends AutoCloseable { + RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate deserializationDelegate) throws IOException; + + void setNextBuffer(Buffer buffer) throws IOException; + + void select(VirtualChannelSelector event); + + @Override + void close(); +} + +class NoDataDemultiplexer implements Demultiplexer { + private final InputChannelInfo channelInfo; + + public NoDataDemultiplexer(InputChannelInfo channelInfo) { + this.channelInfo = channelInfo; + } + + @Override + public RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate deserializationDelegate) { + throw getException(); + } + + @Override + public void setNextBuffer(Buffer buffer) { + throw getException(); + } + + @Override + public void select(VirtualChannelSelector event) { + throw getException(); + } + + private IllegalStateException getException() { + return new IllegalStateException(channelInfo + " should not receive any data/events during recovery"); + } + + @Override + public void close() { + } +} + +/** + * Parameter structure to pass all relevant information to the factory methods of @{@link Demultiplexer}. + */ +class DemultiplexParameters { + final IOManager ioManager; + final InflightDataRescalingDescriptor channelMapping; + final Function> gatePartitionerRetriever; + final SerializationDelegate delegate; + final int numberOfChannels; + final int subtaskIndex; + + @SuppressWarnings("unchecked") + DemultiplexParameters( + TypeSerializer inputSerializer, + IOManager ioManager, + InflightDataRescalingDescriptor channelMapping, +
[GitHub] [flink] StephanEwen commented on pull request #13885: [FLINK-19911] Read checkpoint stream with buffer to speedup restore
StephanEwen commented on pull request #13885: URL: https://github.com/apache/flink/pull/13885#issuecomment-723504501 Are the HDFS input streams generally not buffered? Would it make sense to adjust the `HadoopDataInputStream` class to be buffered? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
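The question above is whether buffering should live in the stream wrapper itself. A hedged sketch of the general technique the PR title describes, reading a checkpoint stream through a buffer: the `ByteArrayInputStream` below stands in for the raw remote stream, and the buffer size is illustrative, not a value from the PR.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hedged sketch: wrap an unbuffered stream in a BufferedInputStream so that
// many small reads during restore become few large reads against the backing
// (possibly remote) stream. The real change discussed here would concern
// Flink's HadoopDataInputStream; this is only the underlying idea.
public class BufferedRestoreSketch {

    public static int sumBytes(InputStream raw) throws IOException {
        // 64 KiB is an illustrative buffer size, not one taken from the PR.
        try (InputStream in = new BufferedInputStream(raw, 64 * 1024)) {
            int sum = 0;
            int b;
            // Single-byte reads hit the in-memory buffer, not the raw stream.
            while ((b = in.read()) >= 0) {
                sum += b;
            }
            return sum;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] state = {1, 2, 3, 4};
        System.out.println(sumBytes(new ByteArrayInputStream(state))); // prints 10
    }
}
```

Whether the wrapping belongs in `HadoopDataInputStream` (benefiting all callers) or at the restore call site (avoiding double buffering where the filesystem already buffers) is exactly the trade-off the comment raises.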
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519229050 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RescalingStreamTaskNetworkInput.java ## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.VirtualChannelSelector; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link StreamTaskNetworkInput} implementation that demultiplexes virtual channels. + * + * The demultiplexing works in two dimensions for the following cases. 
+ * + * Subtasks of the current operator have been collapsed in a round-robin fashion. + * The connected output operator has been rescaled (up and down!) and there is an overlap of channels (mostly + * relevant to keyed exchanges). + * + * In both cases, records from multiple old channels are received over one new physical channel, which need to + * demultiplex the record to correctly restore spanning records (similar to how StreamTaskNetworkInput works). + * + * Note that when both cases occur at the same time (downscaling of several operators), there is the cross product of + * channels. So if two subtasks are collapsed and two channels overlap from the output side, there is a total of 4 + * virtual channels. + */ +@Internal +public final class RescalingStreamTaskNetworkInput implements RecoverableStreamTaskInput { Review comment: The main motivation was to avoid a secondary implementation of `RecordDeserializer`, as that would translate to virtual calls in the regular `StreamTaskNetworkInput`. I have not measured the impact but it was a major concern of @pnowojski . Now, it would very well be possible to subclass `StreamTaskNetworkInput` or extract a common super class. However, because `recordDeserializers` is of a different type without common ancestor (for CHA), it's hard to generalize. I had hoped for some input on how to solve it. ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RescalingStreamTaskNetworkInput.java ## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with +
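The javadoc quoted in this review describes a cross product: with two collapsed subtasks and two overlapping channels, recovery must track 2 x 2 = 4 virtual channels, each with its own deserializer state. A small sketch of that bookkeeping, using an illustrative map rather than Flink's actual data structures:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the virtual-channel cross product described above. Each
// (old subtask, old channel) pair gets its own entry so spanning records from
// different old channels are never interleaved. The String/StringBuilder map
// is a stand-in for per-channel deserializer state, not Flink's real layout.
public class VirtualChannelSketch {

    public static Map<String, StringBuilder> buildVirtualChannels(
            int collapsedSubtasks, int overlappingChannels) {
        Map<String, StringBuilder> virtualChannels = new HashMap<>();
        for (int subtask = 0; subtask < collapsedSubtasks; subtask++) {
            for (int channel = 0; channel < overlappingChannels; channel++) {
                // One buffer of partially deserialized data per pair.
                virtualChannels.put(subtask + "/" + channel, new StringBuilder());
            }
        }
        return virtualChannels;
    }

    public static void main(String[] args) {
        // Two collapsed subtasks x two overlapping channels = four virtual channels.
        System.out.println(buildVirtualChannels(2, 2).size()); // prints 4
    }
}
```

This is why the demultiplexer keyed by `VirtualChannelSelector` exists only on the recovery path: once in-flight data is drained, the physical channel carries a single stream again and no per-pair state is needed.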
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519229083 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java ## @@ -88,38 +127,66 @@ private RecoveredInputChannel getChannel(InputChannelInfo info) { private final ResultPartitionWriter[] writers; private final boolean notifyAndBlockOnCompletion; - ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion) { + private final InflightDataRescalingDescriptor channelMapping; + + private final Map> rescaledChannels = new HashMap<>(); + + ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion, InflightDataRescalingDescriptor channelMapping) { this.writers = writers; + this.channelMapping = channelMapping; this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion; } @Override public BufferWithContext> getBuffer(ResultSubpartitionInfo subpartitionInfo) throws IOException, InterruptedException { - BufferBuilder bufferBuilder = getSubpartition(subpartitionInfo).requestBufferBuilderBlocking(); + final List channels = getMappedChannels(subpartitionInfo); + BufferBuilder bufferBuilder = channels.get(0).requestBufferBuilderBlocking(); return new BufferWithContext<>(wrap(bufferBuilder), Tuple2.of(bufferBuilder, bufferBuilder.createBufferConsumer())); } @Override - public void recover(ResultSubpartitionInfo subpartitionInfo, Tuple2 bufferBuilderAndConsumer) throws IOException { + public void recover( + ResultSubpartitionInfo subpartitionInfo, + int oldSubtaskIndex, + Tuple2 bufferBuilderAndConsumer) throws IOException { bufferBuilderAndConsumer.f0.finish(); if (bufferBuilderAndConsumer.f1.isDataAvailable()) { - boolean added = getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1, Integer.MIN_VALUE); - if (!added) { - throw new IOException("Buffer consumer couldn't be added to 
ResultSubpartition"); + final List channels = getMappedChannels(subpartitionInfo); + for (final CheckpointedResultSubpartition channel : channels) { + // channel selector is created from the downstream's point of view: the subtask of downstream = subpartition index of recovered buffer + final VirtualChannelSelector channelSelector = new VirtualChannelSelector(subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex); + channel.add(EventSerializer.toBufferConsumer(channelSelector, false), Integer.MIN_VALUE); + boolean added = channel.add(bufferBuilderAndConsumer.f1.copy(), Integer.MIN_VALUE); + if (!added) { + throw new IOException("Buffer consumer couldn't be added to ResultSubpartition"); + } } - } else { - bufferBuilderAndConsumer.f1.close(); } + bufferBuilderAndConsumer.f1.close(); } - private CheckpointedResultSubpartition getSubpartition(ResultSubpartitionInfo subpartitionInfo) { - ResultPartitionWriter writer = writers[subpartitionInfo.getPartitionIdx()]; - if (writer instanceof CheckpointedResultPartition) { - return ((CheckpointedResultPartition) writer).getCheckpointedSubpartition(subpartitionInfo.getSubPartitionIdx()); - } else { - throw new IllegalStateException( - "Cannot restore state to a non-checkpointable partition type: " + writer); + private CheckpointedResultSubpartition getSubpartition(int partitionIndex, int subPartitionIdx) { + ResultPartitionWriter writer = writers[partitionIndex]; + if (!(writer instanceof CheckpointedResultPartition)) { + throw new IllegalStateException("Cannot restore state to a non-checkpointable partition type: " + writer); } + return ((CheckpointedResultPartition) writer).getCheckpointedSubpartition(subPartitionIdx); + } + + private List getMappedChannels(ResultSubpartitionInfo subpartitionInfo) { + return rescaledChannels.computeIfAbsent(subpartitionInfo, this::calculateMapping); + } + + private static final Logger LOG =
[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
flinkbot edited a comment on pull request #13845: URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990 ## CI report: * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN * 215a25d83260a048877bdf8462a06473676efb8a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223) * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519228672 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java ## @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.VirtualChannelSelector; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * {@link RecordDeserializer}-like interface for recovery. To avoid additional virtual method calls on the + * non-recovery hotpath, this interface is not extending RecordDeserializer. 
+ */ +interface Demultiplexer extends AutoCloseable { Review comment: Good idea. Would it be okay to keep the implementation names as is to avoid super-long names?
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519228359 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java ## @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.VirtualChannelSelector; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * {@link RecordDeserializer}-like interface for recovery. To avoid additional virtual method calls on the + * non-recovery hotpath, this interface is not extending RecordDeserializer. 
+ */ +interface Demultiplexer extends AutoCloseable { + RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate deserializationDelegate) throws IOException; + + void setNextBuffer(Buffer buffer) throws IOException; + + void select(VirtualChannelSelector event); + + @Override + void close(); +} + +class NoDataDemultiplexer implements Demultiplexer { + private final InputChannelInfo channelInfo; + + public NoDataDemultiplexer(InputChannelInfo channelInfo) { + this.channelInfo = channelInfo; + } + + @Override + public RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate deserializationDelegate) { + throw getException(); + } + + @Override + public void setNextBuffer(Buffer buffer) { + throw getException(); + } + + @Override + public void select(VirtualChannelSelector event) { + throw getException(); + } + + private IllegalStateException getException() { + return new IllegalStateException(channelInfo + " should not receive any data/events during recovery"); + } + + @Override + public void close() { + } +} + +/** + * Parameter structure to pass all relevant information to the factory methods of @{@link Demultiplexer}. + */ +class DemultiplexParameters { + final IOManager ioManager; + final InflightDataRescalingDescriptor channelMapping; + final Function> gatePartitionerRetriever; + final SerializationDelegate delegate; + final int numberOfChannels; + final int subtaskIndex; + + @SuppressWarnings("unchecked") + DemultiplexParameters( + TypeSerializer inputSerializer, + IOManager ioManager, + InflightDataRescalingDescriptor channelMapping, +
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519228182 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519227877 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RescalingStreamTaskNetworkInput.java ## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.VirtualChannelSelector; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link StreamTaskNetworkInput} implementation that demultiplexes virtual channels. + * + * The demultiplexing works in two dimensions for the following cases. 
+ * + * Subtasks of the current operator have been collapsed in a round-robin fashion. + * The connected output operator has been rescaled (up and down!) and there is an overlap of channels (mostly + * relevant to keyed exchanges). + * + * In both cases, records from multiple old channels are received over one new physical channel, which needs to + * demultiplex the records to correctly restore spanning records (similar to how StreamTaskNetworkInput works). + * + * Note that when both cases occur at the same time (downscaling of several operators), there is the cross product of + * channels. So if two subtasks are collapsed and two channels overlap from the output side, there is a total of 4 + * virtual channels. + */ +@Internal +public final class RescalingStreamTaskNetworkInput implements RecoverableStreamTaskInput { + + private final CheckpointedInputGate checkpointedInputGate; + + private final DeserializationDelegate deserializationDelegate; + + private final Demultiplexer[] channelDemultiplexers; + + /** Valve that controls how watermarks and stream statuses are forwarded. */ + private final StatusWatermarkValve statusWatermarkValve; + + private final int inputIndex; + + private final Map channelIndexes; Review comment: `StatusWatermarkValve` operates on indexes, so we need this indirection. We should probably refactor that (it's the same in `StreamTaskNetworkInput`).
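The "cross product of channels" described in the javadoc above can be made concrete with a small, self-contained sketch (plain Java with illustrative types, not Flink's API): when two upstream subtasks have been collapsed and two old channels overlap, one physical channel carries four virtual channels, each identified by an (old subtask index, old channel index) pair:

```java
import java.util.ArrayList;
import java.util.List;

public class VirtualChannelCount {
    // A virtual channel is identified by the old subtask and old channel it came from.
    record VirtualChannel(int oldSubtaskIndex, int oldChannelIndex) {}

    // Enumerate the cross product of collapsed subtasks and overlapping channels.
    static List<VirtualChannel> virtualChannels(int[] collapsedSubtasks, int[] overlappingChannels) {
        List<VirtualChannel> result = new ArrayList<>();
        for (int subtask : collapsedSubtasks) {
            for (int channel : overlappingChannels) {
                result.add(new VirtualChannel(subtask, channel));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two collapsed subtasks x two overlapping old channels = 4 virtual channels,
        // matching the "total of 4 virtual channels" example in the javadoc.
        List<VirtualChannel> channels = virtualChannels(new int[]{0, 1}, new int[]{0, 1});
        System.out.println(channels.size());
    }
}
```

Each of these pairs needs its own deserializer state so that records spanning buffer boundaries are reassembled per original channel.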
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519227412 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
flinkbot edited a comment on pull request #13845: URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990 ## CI report: * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN * 215a25d83260a048877bdf8462a06473676efb8a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223) * fe94615bb4be9771757f98e93ff11e8d765231f2 UNKNOWN
[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.
AHeise commented on a change in pull request #13845: URL: https://github.com/apache/flink/pull/13845#discussion_r519226903 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java