[GitHub] [flink] flinkbot edited a comment on pull request #13977: [Mirror] [FLINK-19238] Sanity check for RocksDB arena block size

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13977:
URL: https://github.com/apache/flink/pull/13977#issuecomment-723460604


   
   ## CI report:
   
   * 39cc0db6968c005405244145a452dbada545cba9 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9267)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9295)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893


   
   ## CI report:
   
   * cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
 
   * 873f52f02d69a32b1f78822fa6e0fe850833807f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9291)
 
   * 292f7debbab2b02b317b2578421edb9eed5cdb48 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9294)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] carp84 commented on pull request #13977: [Mirror] [FLINK-19238] Sanity check for RocksDB arena block size

2020-11-07 Thread GitBox


carp84 commented on pull request #13977:
URL: https://github.com/apache/flink/pull/13977#issuecomment-723542250


   The pre-commit build with 39cc0db failed due to FLINK-19882; rerunning to 
get a green build.







[jira] [Commented] (FLINK-19882) E2E: SQLClientHBaseITCase crash

2020-11-07 Thread Yu Li (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227929#comment-17227929 ]

Yu Li commented on FLINK-19882:
---

Another instance:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9267&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529

> E2E: SQLClientHBaseITCase crash
> ---
>
> Key: FLINK-19882
> URL: https://issues.apache.org/jira/browse/FLINK-19882
> Project: Flink
>  Issue Type: Test
>  Components: Connectors / HBase
>Reporter: Jingsong Lee
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> INSTANCE: 
> [https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/8563/logs/141]
> {code:java}
> 2020-10-29T09:43:24.0088180Z [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (end-to-end-tests) 
> on project flink-end-to-end-tests-hbase: There are test failures.
> 2020-10-29T09:43:24.0088792Z [ERROR] 
> 2020-10-29T09:43:24.0089518Z [ERROR] Please refer to 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire-reports
>  for the individual test results.
> 2020-10-29T09:43:24.0090427Z [ERROR] Please refer to dump files (if any 
> exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
> 2020-10-29T09:43:24.0090914Z [ERROR] The forked VM terminated without 
> properly saying goodbye. VM crash or System.exit called?
> 2020-10-29T09:43:24.0093105Z [ERROR] Command was /bin/sh -c cd 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target
>  && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar
>  
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire
>  2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp 
> surefire_67897497331523564186tmp
> 2020-10-29T09:43:24.0094488Z [ERROR] Error occurred in starting fork, check 
> output in log
> 2020-10-29T09:43:24.0094797Z [ERROR] Process Exit Code: 143
> 2020-10-29T09:43:24.0095033Z [ERROR] Crashed tests:
> 2020-10-29T09:43:24.0095321Z [ERROR] 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-10-29T09:43:24.0095828Z [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM 
> terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-29T09:43:24.0097838Z [ERROR] Command was /bin/sh -c cd 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target
>  && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar
>  
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire
>  2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp 
> surefire_67897497331523564186tmp
> 2020-10-29T09:43:24.0098966Z [ERROR] Error occurred in starting fork, check 
> output in log
> 2020-10-29T09:43:24.0099266Z [ERROR] Process Exit Code: 143
> 2020-10-29T09:43:24.0099502Z [ERROR] Crashed tests:
> 2020-10-29T09:43:24.0099789Z [ERROR] 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-10-29T09:43:24.0100331Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669)
> 2020-10-29T09:43:24.0100883Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282)
> 2020-10-29T09:43:24.0101774Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245)
> 2020-10-29T09:43:24.0102360Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 2020-10-29T09:43:24.0103004Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 2020-10-29T09:43:24.0103737Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 2020-10-29T09:43:24.0104301Z [ERROR] at 
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 2020-10-29T09:43:24.0104828Z [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 2020-10-29T09:43:24.0105334Z [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 2020-10-29T09:43:24.0105826Z [ERROR] at 
> 

[GitHub] [flink] carp84 commented on pull request #13977: [Mirror] [FLINK-19238] Sanity check for RocksDB arena block size

2020-11-07 Thread GitBox


carp84 commented on pull request #13977:
URL: https://github.com/apache/flink/pull/13977#issuecomment-723542122


   @flinkbot run azure







[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893


   
   ## CI report:
   
   * cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
 
   * 873f52f02d69a32b1f78822fa6e0fe850833807f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9291)
 
   * 292f7debbab2b02b317b2578421edb9eed5cdb48 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Closed] (FLINK-19990) MultipleInputNodeCreationProcessor#isChainableSource should consider DataStreamScanProvider

2020-11-07 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-19990.

Fix Version/s: 1.12.0
 Assignee: Caizhi Weng
   Resolution: Fixed

master (1.12): 83174261b33f942f6fd7475d7115ccd430f27bab

> MultipleInputNodeCreationProcessor#isChainableSource should consider 
> DataStreamScanProvider
> ---
>
> Key: FLINK-19990
> URL: https://issues.apache.org/jira/browse/FLINK-19990
> Project: Flink
>  Issue Type: Bug
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> {{MultipleInputNodeCreationProcessor#isChainableSource}} currently only 
> considers {{SourceProvider}}. However, a {{DataStreamScanProvider}} that 
> produces a {{DataStream}} backed by a {{SourceTransformation}} is also a 
> chainable source, and we need to take it into consideration.
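
For illustration, a minimal sketch of the extended check (assuming access to the
runtime provider and, for {{DataStreamScanProvider}}, the {{DataStream}} it
produces; this is a sketch only, not the actual planner code):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;

public final class ChainableSourceCheck {

    // A SourceProvider is always backed by a FLIP-27 source, hence chainable.
    // A DataStreamScanProvider is chainable when the DataStream it produces
    // starts with a SourceTransformation.
    public static boolean isChainableSource(
            ScanTableSource.ScanRuntimeProvider provider, StreamExecutionEnvironment env) {
        if (provider instanceof SourceProvider) {
            return true;
        }
        if (provider instanceof DataStreamScanProvider) {
            DataStream<?> dataStream = ((DataStreamScanProvider) provider).produceDataStream(env);
            return dataStream.getTransformation() instanceof SourceTransformation;
        }
        return false;
    }
}
{code}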



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] JingsongLi merged pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider

2020-11-07 Thread GitBox


JingsongLi merged pull request #13942:
URL: https://github.com/apache/flink/pull/13942


   







[jira] [Commented] (FLINK-20045) ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"

2020-11-07 Thread Yang Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-20045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227925#comment-17227925 ]

Yang Wang commented on FLINK-20045:
---

I am having a look at this failed test.

> ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with 
> "TimeoutException: Contender was not elected as the leader within 20ms"
> 
>
> Key: FLINK-20045
> URL: https://issues.apache.org/jira/browse/FLINK-20045
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9251&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=05b74a19-4ee4-5036-c46f-ada307df6cf0
> {code}
> 2020-11-07T10:34:07.5063203Z [ERROR] 
> testZooKeeperLeaderElectionRetrieval(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest)
>   Time elapsed: 202.445 s  <<< ERROR!
> 2020-11-07T10:34:07.5064331Z java.util.concurrent.TimeoutException: Contender 
> was not elected as the leader within 200000ms
> 2020-11-07T10:34:07.5064946Z  at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:153)
> 2020-11-07T10:34:07.5065762Z  at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:139)
> 2020-11-07T10:34:07.5066565Z  at 
> org.apache.flink.runtime.leaderelection.TestingLeaderBase.waitForLeader(TestingLeaderBase.java:48)
> 2020-11-07T10:34:07.5067185Z  at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval(ZooKeeperLeaderElectionTest.java:144)
> {code}
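
For context, the {{CommonTestUtils.waitUntilCondition}} frames in the trace come
from a simple poll-until-deadline helper; a rough sketch of the pattern (not the
exact Flink implementation):

{code:java}
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class WaitUtil {

    // Poll the condition until it holds or the timeout elapses; on timeout,
    // fail with the kind of TimeoutException seen in the stack trace above.
    public static void waitUntilCondition(
            BooleanSupplier condition, long timeoutMillis, long retryIntervalMillis)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() < deadlineNanos) {
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(retryIntervalMillis);
        }
        throw new TimeoutException(
                "Condition was not fulfilled within " + timeoutMillis + "ms");
    }
}
{code}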





[GitHub] [flink] flinkbot edited a comment on pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13942:
URL: https://github.com/apache/flink/pull/13942#issuecomment-722358038


   
   ## CI report:
   
   * c2172fbea08608c4d073643da6c3d23cf895fa27 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9287)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Created] (FLINK-20047) DefaultLeaderRetrievalService should only notify the LeaderRetrievalListener when leader truly changed

2020-11-07 Thread Yang Wang (Jira)
Yang Wang created FLINK-20047:
-

 Summary: DefaultLeaderRetrievalService should only notify the 
LeaderRetrievalListener when leader truly changed
 Key: FLINK-20047
 URL: https://issues.apache.org/jira/browse/FLINK-20047
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Yang Wang
 Fix For: 1.12.0


Currently, in {{DefaultLeaderRetrievalService}} we notify the 
{{LeaderRetrievalListener}} even though the leader has not truly changed. In 
the Kubernetes HA service, we could get lots of leader information notification 
events for the same leader, because the ConfigMap is updated periodically. We 
should filter out these useless events and not notify the 
{{LeaderRetrievalListener}} for them.
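
A minimal sketch of the proposed de-duplication, assuming the existing listener
interface {{LeaderRetrievalListener#notifyLeaderAddress(String, UUID)}} (the
wrapper class and field names are illustrative, not the actual service code):

{code:java}
import java.util.Objects;
import java.util.UUID;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;

// Illustrative sketch: remember the last seen leader and only forward a
// notification when the address or session ID actually changed, so periodic
// ConfigMap updates carrying the same leader are dropped.
class DeduplicatingLeaderNotifier {

    private final LeaderRetrievalListener listener;
    private String lastLeaderAddress;
    private UUID lastLeaderSessionId;

    DeduplicatingLeaderNotifier(LeaderRetrievalListener listener) {
        this.listener = listener;
    }

    void onLeaderInformation(String leaderAddress, UUID leaderSessionId) {
        if (Objects.equals(leaderAddress, lastLeaderAddress)
                && Objects.equals(leaderSessionId, lastLeaderSessionId)) {
            return; // same leader as before, skip the redundant event
        }
        lastLeaderAddress = leaderAddress;
        lastLeaderSessionId = leaderSessionId;
        listener.notifyLeaderAddress(leaderAddress, leaderSessionId);
    }
}
{code}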





[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery

2020-11-07 Thread GitBox


curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519263971



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
        } else {
            graph.setStateBackend(stateBackend);
            graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+           graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
            graph.setScheduleMode(ScheduleMode.EAGER);
+
+           if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+               checkApproximateLocalRecoveryCompatibility();
+               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+           } else {
+               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+           }
        }
    }
 
+   private void checkApproximateLocalRecoveryCompatibility() {
+       checkState(
+           !checkpointConfig.isUnalignedCheckpointsEnabled(),
+           "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");

Review comment:
   I will also add a test for this.
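
   For reference, a hypothetical shape such a test could take (a sketch only:
the `enableApproximateLocalRecovery` setter and the JUnit `assertThrows` usage
are assumptions about the final API, not the PR's actual test):

   ```java
import static org.junit.Assert.assertThrows;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.junit.Test;

public class ApproximateLocalRecoveryCompatibilityTest {

    @Test
    public void approximateLocalRecoveryRejectsUnalignedCheckpoints() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
        // hypothetical setter wired up by this PR; the exact name may differ
        env.getCheckpointConfig().enableApproximateLocalRecovery(true);

        env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());

        // building the StreamGraph runs configureStreamGraph(), which should
        // hit the checkState(...) above and fail with an IllegalStateException
        assertThrows(IllegalStateException.class, env::getStreamGraph);
    }
}
   ```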









[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery

2020-11-07 Thread GitBox


curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519263840



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
        } else {
            graph.setStateBackend(stateBackend);
            graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+           graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
            graph.setScheduleMode(ScheduleMode.EAGER);
+
+           if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+               checkApproximateLocalRecoveryCompatibility();
+               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+           } else {
+               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+           }
        }
    }
 
+   private void checkApproximateLocalRecoveryCompatibility() {
+       checkState(
+           !checkpointConfig.isUnalignedCheckpointsEnabled(),
+           "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");

Review comment:
   I've tried out different ways to do the check. It is slightly different 
from the unaligned checkpoint check, because the PipelinedRegionScheduler is 
configured at the JobMaster level, while approximate local recovery is 
configured at the job level.
   
   The check should happen in the same place where either 
PIPELINED_REGION_SCHEDULING or LEGACY_SCHEDULING is decided, i.e. where the 
type of SchedulingStrategyFactory is decided.
   
   1. In JobGraph, the most reasonable place to put the 
"isApproximateLocalRecoveryEnabled" config seems to be 
`JobCheckpointingSettings#CheckpointCoordinatorConfiguration`, similar to the 
unaligned checkpoint config. However, `CheckpointCoordinatorConfiguration`, as 
its name says, is for the CheckpointCoordinator and will be serialized to it. 
In fact the CheckpointCoordinator does not need 
`isApproximateLocalRecoveryEnabled` for anything, and putting it there breaks a 
lot of tests, so at this point it is probably not the best place to put it.
   
   2. So I put `isApproximateLocalRecoveryEnabled` in a similar place as 
`scheduleMode` in JobGraph; it will be removed together with `scheduleMode` 
later. This flag is only used to make sure approximate local recovery is not 
used together with JobManagerOptions.SCHEDULING_STRATEGY set to region.
   
   3. If JobManagerOptions.SCHEDULING_STRATEGY is set to legacy, the EAGER 
strategy is enforced in `StreamGraphGenerator#configureStreamGraph`:
   
   ```
   } else {
       graph.setStateBackend(stateBackend);
       graph.setScheduleMode(ScheduleMode.EAGER);

       if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
           checkApproximateLocalRecoveryCompatibility();
           graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
       } else {
           graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
       }
   }
   ```
   
   
   










[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837


   
   ## CI report:
   
   * 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN
   * a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285)
 
   * 8de0f73bc4abfc12f7996a26b7753431fcf885d3 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9292)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893


   
   ## CI report:
   
   * cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
 
   * 873f52f02d69a32b1f78822fa6e0fe850833807f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9291)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery

2020-11-07 Thread GitBox


curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519257405



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
        } else {
            graph.setStateBackend(stateBackend);
            graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+           graph.setAllVerticesInSameSlotSharingGroupByDefault(true);

Review comment:
   It seems to have come from a code merge (for the sorted merge shuffle), but 
for some reason it was removed in a later version.
   
   It may have been counted as my change since I resolved the merge conflict 
manually.
   
   Thanks for catching this. I will remove it!









[GitHub] [flink] curcur commented on a change in pull request #13880: [FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery

2020-11-07 Thread GitBox


curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519257405



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
        } else {
            graph.setStateBackend(stateBackend);
            graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+           graph.setAllVerticesInSameSlotSharingGroupByDefault(true);

Review comment:
   It seems to have come from a code merge (for the sorted merge shuffle), but 
for some reason it was removed in a later version.
   
   Thanks for catching this. I will remove it.









[GitHub] [flink] flinkbot edited a comment on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723044893


   
   ## CI report:
   
   * cfd0be434e9ef7b045dc0b418d68b1ad0665d186 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9252)
 
   * 873f52f02d69a32b1f78822fa6e0fe850833807f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] lirui-apache commented on pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on pull request #13963:
URL: https://github.com/apache/flink/pull/13963#issuecomment-723530163


   Thanks @StephanEwen and @JingsongLi for the review. I have updated the PR 
and addressed your comments. Please have another look, thanks!







[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519256075



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator<T extends Comparable<T>>
+       implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+   private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
+   private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+   private final FileSplitAssigner splitAssigner;
+   private final long discoveryInterval;
+
+   private final JobConf jobConf;
+   private final ObjectPath tablePath;
+
+   private final ContinuousPartitionFetcher<Partition, T> fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+   private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+   // the maximum partition read offset seen so far
+   private volatile T currentReadOffset;
+   // the partitions that have been processed for a given read offset
+   private final Set<List<String>> seenPartitions;
+
+   public ContinuousHiveSplitEnumerator(
+           SplitEnumeratorContext<HiveSourceSplit> enumeratorContext,
+           T currentReadOffset,
+           Collection<List<String>> seenPartitions,
+           FileSplitAssigner splitAssigner,
+           long discoveryInterval,
+           JobConf jobConf,
+           ObjectPath tablePath,
+           ContinuousPartitionFetcher<Partition, T> fetcher,
+           HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
+       this.enumeratorContext = enumeratorContext;
+       this.currentReadOffset = currentReadOffset;
+       this.seenPartitions = new HashSet<>(seenPartitions);
+       this.splitAssigner = splitAssigner;
+       this.discoveryInterval = discoveryInterval;
+       this.jobConf = jobConf;
+       this.tablePath = tablePath;
+       this.fetcher = fetcher;
+       this.fetcherContext = fetcherContext;
+       readersAwaitingSplit = new LinkedHashMap<>();
+   }
+
+   @Override
+   public void start() {
+       try {
+           fetcherContext.open();
+           enumeratorContext.callAsync(
+               this::monitorAndGetSplits,
+               this::handleNewSplits,
+               discoveryInterval,
+

[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519256019



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
##
@@ -46,29 +58,98 @@
 
    private static final long serialVersionUID = 1L;
 
+   private final JobConfWrapper jobConfWrapper;
+   private final List<String> partitionKeys;
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext;
+   private final ObjectPath tablePath;
+
    HiveSource(
            JobConf jobConf,
+           ObjectPath tablePath,
            CatalogTable catalogTable,
            List<HiveTablePartition> partitions,
            @Nullable Long limit,
            String hiveVersion,
            boolean useMapRedReader,
-           boolean isStreamingSource,
+           @Nullable ContinuousEnumerationSettings continuousEnumerationSettings,

Review comment:
   I added the builder, but the builder constructor still takes quite a few 
parameters. That's because the BulkFormat needs to be created in the 
constructor. And since the BulkFormat is determined during construction, it 
makes no sense to allow modifying `tablePath` or `partitions` later through the 
builder; a rough sketch of that shape is below.
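
   For illustration only, with hypothetical names (this is a sketch, not the
PR's actual builder): everything the BulkFormat creation depends on is fixed in
the constructor, and only independent settings get setters.

   ```java
import java.util.List;

import org.apache.flink.table.catalog.ObjectPath;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical sketch: the constructor pins down the inputs the BulkFormat is
// derived from, so they cannot be modified afterwards; genuinely optional
// settings remain settable.
public class HiveSourceBuilder {

    private final JobConf jobConf;                      // BulkFormat depends on this
    private final ObjectPath tablePath;                 // ...and this
    private final List<HiveTablePartition> partitions;  // ...and this

    private Long limit;               // optional, independent of the BulkFormat
    private boolean useMapRedReader;  // optional

    public HiveSourceBuilder(JobConf jobConf, ObjectPath tablePath, List<HiveTablePartition> partitions) {
        this.jobConf = jobConf;
        this.tablePath = tablePath;
        this.partitions = partitions;
    }

    public HiveSourceBuilder setLimit(Long limit) {
        this.limit = limit;
        return this;
    }

    public HiveSourceBuilder setUseMapRedReader(boolean useMapRedReader) {
        this.useMapRedReader = useMapRedReader;
        return this;
    }

    // build() would create the BulkFormat from jobConf/tablePath/partitions
    // and pass everything to the HiveSource constructor.
}
   ```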









[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519255792



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
##
@@ -46,29 +58,98 @@
 
    private static final long serialVersionUID = 1L;
 
+   private final JobConfWrapper jobConfWrapper;
+   private final List<String> partitionKeys;
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext;
+   private final ObjectPath tablePath;
+
    HiveSource(
            JobConf jobConf,
+           ObjectPath tablePath,
            CatalogTable catalogTable,
            List<HiveTablePartition> partitions,
            @Nullable Long limit,
            String hiveVersion,
            boolean useMapRedReader,
-           boolean isStreamingSource,
+           @Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
+           ContinuousPartitionFetcher fetcher,
+           HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext,
            RowType producedRowType) {
        super(
            new org.apache.flink.core.fs.Path[1],
            new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
-           DEFAULT_SPLIT_ASSIGNER,
+           continuousEnumerationSettings == null ? DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new,

Review comment:
   I think ideally we need to implement an assigner that assigns splits in 
order but is also locality-aware. I guess we can leave that for the future; a 
rough sketch of the idea follows.
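
   A rough sketch of such an assigner (assuming the `FileSplitAssigner`
contract of `getNext(String hostname)` / `addSplits(...)` / `remainingSplits()`;
this is purely an illustration of the idea, not a proposed implementation):

   ```java
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Optional;

// Sketch: keep splits in discovery order, but let a requesting host take the
// earliest split that is local to it; fall back to strict order otherwise.
class OrderedLocalityAwareSplitAssigner implements FileSplitAssigner {

    private final LinkedList<FileSourceSplit> splits = new LinkedList<>();

    OrderedLocalityAwareSplitAssigner(Collection<FileSourceSplit> initialSplits) {
        splits.addAll(initialSplits);
    }

    @Override
    public Optional<FileSourceSplit> getNext(@Nullable String hostname) {
        if (hostname != null) {
            // prefer the earliest split stored on the requesting host
            Iterator<FileSourceSplit> it = splits.iterator();
            while (it.hasNext()) {
                FileSourceSplit split = it.next();
                if (Arrays.asList(split.hostnames()).contains(hostname)) {
                    it.remove();
                    return Optional.of(split);
                }
            }
        }
        // otherwise hand out splits in strict discovery order
        return Optional.ofNullable(splits.pollFirst());
    }

    @Override
    public void addSplits(Collection<FileSourceSplit> newSplits) {
        splits.addAll(newSplits);
    }

    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        return new ArrayList<>(splits);
    }
}
   ```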









[jira] [Commented] (FLINK-19699) PrometheusReporterEndToEndITCase crashes with exit code 143

2020-11-07 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227911#comment-17227911 ]

Zhu Zhu commented on FLINK-19699:
-

Another instance:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9256&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529

> PrometheusReporterEndToEndITCase crashes with exit code 143
> ---
>
> Key: FLINK-19699
> URL: https://issues.apache.org/jira/browse/FLINK-19699
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7814&view=logs&j=68a897ab-3047-5660-245a-cce8f83859f6&t=16ca2cca-2f63-5cce-12d2-d519b930a729]
> {code}
> 2020-10-18T23:46:04.9667443Z [ERROR] The forked VM terminated without 
> properly saying goodbye. VM crash or System.exit called?
> 2020-10-18T23:46:04.9669237Z [ERROR] Command was /bin/sh -c cd 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target
>  && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar
>  
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire
>  2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp 
> surefire_41970585275084524978tmp
> 2020-10-18T23:46:04.9670440Z [ERROR] Error occurred in starting fork, check 
> output in log
> 2020-10-18T23:46:04.9671283Z [ERROR] Process Exit Code: 143
> 2020-10-18T23:46:04.9671614Z [ERROR] Crashed tests:
> 2020-10-18T23:46:04.9672025Z [ERROR] 
> org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase
> 2020-10-18T23:46:04.9672649Z [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM 
> terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-18T23:46:04.9674834Z [ERROR] Command was /bin/sh -c cd 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target
>  && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar
>  
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire
>  2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp 
> surefire_41970585275084524978tmp
> 2020-10-18T23:46:04.9676153Z [ERROR] Error occurred in starting fork, check 
> output in log
> 2020-10-18T23:46:04.9676556Z [ERROR] Process Exit Code: 143
> 2020-10-18T23:46:04.9676882Z [ERROR] Crashed tests:
> 2020-10-18T23:46:04.9677288Z [ERROR] 
> org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase
> 2020-10-18T23:46:04.9677827Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669)
> 2020-10-18T23:46:04.9678408Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282)
> 2020-10-18T23:46:04.9678965Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245)
> 2020-10-18T23:46:04.9679575Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 2020-10-18T23:46:04.9680983Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 2020-10-18T23:46:04.9681749Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 2020-10-18T23:46:04.9682246Z [ERROR] at 
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 2020-10-18T23:46:04.9682728Z [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 2020-10-18T23:46:04.9683179Z [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 2020-10-18T23:46:04.9683609Z [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> 2020-10-18T23:46:04.9684102Z [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> 2020-10-18T23:46:04.9684639Z [ERROR] at 
> 

[jira] [Commented] (FLINK-19882) E2E: SQLClientHBaseITCase crash

2020-11-07 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227910#comment-17227910 ]

Zhu Zhu commented on FLINK-19882:
-

Another instance:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9268&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529

> E2E: SQLClientHBaseITCase crash
> ---
>
> Key: FLINK-19882
> URL: https://issues.apache.org/jira/browse/FLINK-19882
> Project: Flink
>  Issue Type: Test
>  Components: Connectors / HBase
>Reporter: Jingsong Lee
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> INSTANCE: 
> [https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/8563/logs/141]
> {code:java}
> 2020-10-29T09:43:24.0088180Z [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (end-to-end-tests) 
> on project flink-end-to-end-tests-hbase: There are test failures.
> 2020-10-29T09:43:24.0088792Z [ERROR] 
> 2020-10-29T09:43:24.0089518Z [ERROR] Please refer to 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire-reports
>  for the individual test results.
> 2020-10-29T09:43:24.0090427Z [ERROR] Please refer to dump files (if any 
> exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
> 2020-10-29T09:43:24.0090914Z [ERROR] The forked VM terminated without 
> properly saying goodbye. VM crash or System.exit called?
> 2020-10-29T09:43:24.0093105Z [ERROR] Command was /bin/sh -c cd 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target
>  && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar
>  
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire
>  2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp 
> surefire_67897497331523564186tmp
> 2020-10-29T09:43:24.0094488Z [ERROR] Error occurred in starting fork, check 
> output in log
> 2020-10-29T09:43:24.0094797Z [ERROR] Process Exit Code: 143
> 2020-10-29T09:43:24.0095033Z [ERROR] Crashed tests:
> 2020-10-29T09:43:24.0095321Z [ERROR] 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-10-29T09:43:24.0095828Z [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM 
> terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-29T09:43:24.0097838Z [ERROR] Command was /bin/sh -c cd 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target
>  && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire/surefirebooter6795869883612750001.jar
>  
> /home/vsts/work/1/s/flink-end-to-end-tests/flink-end-to-end-tests-hbase/target/surefire
>  2020-10-29T09-34-47_778-jvmRun2 surefire2269050977160717631tmp 
> surefire_67897497331523564186tmp
> 2020-10-29T09:43:24.0098966Z [ERROR] Error occurred in starting fork, check 
> output in log
> 2020-10-29T09:43:24.0099266Z [ERROR] Process Exit Code: 143
> 2020-10-29T09:43:24.0099502Z [ERROR] Crashed tests:
> 2020-10-29T09:43:24.0099789Z [ERROR] 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-10-29T09:43:24.0100331Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669)
> 2020-10-29T09:43:24.0100883Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282)
> 2020-10-29T09:43:24.0101774Z [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245)
> 2020-10-29T09:43:24.0102360Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 2020-10-29T09:43:24.0103004Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 2020-10-29T09:43:24.0103737Z [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 2020-10-29T09:43:24.0104301Z [ERROR] at 
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 2020-10-29T09:43:24.0104828Z [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 2020-10-29T09:43:24.0105334Z [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 2020-10-29T09:43:24.0105826Z [ERROR] at 
> 

[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837


   
   ## CI report:
   
   * 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN
   * a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285)
 
   * 8de0f73bc4abfc12f7996a26b7753431fcf885d3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193


   
   ## CI report:
   
   * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN
   * 3eb68994c05caed91cef945800d7cb586d617663 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9286)
 
   * 1d4db76b21faf3bb2835be6e529f1d2478237272 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9289)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Closed] (FLINK-20037) Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...)

2020-11-07 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-20037.
---
Fix Version/s: 1.12.0
   Resolution: Fixed

Fixed in 1.12.0: eeb69fc9694489f200b94f70edcfc52ba0343718

> Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...)
> ---
>
> Key: FLINK-20037
> URL: https://issues.apache.org/jira/browse/FLINK-20037
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: yandufeng
>Assignee: yandufeng
>Priority: Trivial
>  Labels: pull-request-available, starter
> Fix For: 1.12.0
>
>
> There is a comment problem in the fromValues(AbstractDataType<?> rowType, 
> Object... values) method of TableEnvironment: I think the second column name 
> should be "name", not "f1". This is my first issue, so please bear with me if 
> there is a problem.
> * Examples:
> * {@code
> *  tEnv.fromValues(
> *      DataTypes.ROW(
> *          DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
> *          DataTypes.FIELD("name", DataTypes.STRING())
> *      ),
> *      row(1, "ABC"),
> *      row(2L, "ABCDE")
> *  )
> * }
> * will produce a Table with a schema as follows:
> * {@code
> * root
> * |-- id: DECIMAL(10, 2)
> * |-- f1: STRING
> * }





[GitHub] [flink] wuchong merged pull request #13976: [FLINK-20037][Table SQL / API]update comment problem in fromValues(AbstractDataType rowType, Obj…

2020-11-07 Thread GitBox


wuchong merged pull request #13976:
URL: https://github.com/apache/flink/pull/13976


   







[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193


   
   ## CI report:
   
   * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN
   * 3eb68994c05caed91cef945800d7cb586d617663 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9286)
 
   * 1d4db76b21faf3bb2835be6e529f1d2478237272 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Updated] (FLINK-20037) Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...)

2020-11-07 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-20037:

Summary: Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, 
Object...)  (was: there is a comment problem in fromValues(AbstractDataType 
rowType, Object... values)  method of TableEnvironment)

> Fix the Javadoc of TableEnvironment#fromValues(AbstractDataType, Object...)
> ---
>
> Key: FLINK-20037
> URL: https://issues.apache.org/jira/browse/FLINK-20037
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: yandufeng
>Assignee: yandufeng
>Priority: Trivial
>  Labels: pull-request-available, starter
>
> There is a comment problem in the fromValues(AbstractDataType<?> rowType, 
> Object... values) method of TableEnvironment: I think the second column name 
> should be "name", not "f1". This is my first issue, so please bear with me if 
> there is a problem.
> * Examples:
> * {@code
> *  tEnv.fromValues(
> *      DataTypes.ROW(
> *          DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
> *          DataTypes.FIELD("name", DataTypes.STRING())
> *      ),
> *      row(1, "ABC"),
> *      row(2L, "ABCDE")
> *  )
> * }
> * will produce a Table with a schema as follows:
> * {@code
> * root
> * |-- id: DECIMAL(10, 2)
> * |-- f1: STRING
> * }





[jira] [Assigned] (FLINK-20037) there is a comment problem in fromValues(AbstractDataType rowType, Object... values) method of TableEnvironment

2020-11-07 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-20037:
---

Assignee: yandufeng

> there is a comment problem in fromValues(AbstractDataType rowType, 
> Object... values)  method of TableEnvironment
> ---
>
> Key: FLINK-20037
> URL: https://issues.apache.org/jira/browse/FLINK-20037
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: yandufeng
>Assignee: yandufeng
>Priority: Trivial
>  Labels: pull-request-available, starter
>
> There is a comment problem in the fromValues(AbstractDataType<?> rowType, 
> Object... values) method of TableEnvironment: I think the second column name 
> should be "name", not "f1". This is my first issue, so please bear with me if 
> there is a problem.
> * Examples:
> * {@code
> *  tEnv.fromValues(
> *      DataTypes.ROW(
> *          DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
> *          DataTypes.FIELD("name", DataTypes.STRING())
> *      ),
> *      row(1, "ABC"),
> *      row(2L, "ABCDE")
> *  )
> * }
> * will produce a Table with a schema as follows:
> * {@code
> * root
> * |-- id: DECIMAL(10, 2)
> * |-- f1: STRING
> * }





[jira] [Closed] (FLINK-20041) Support Watermark push down for kafka connector

2020-11-07 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-20041.
---
Fix Version/s: 1.12.0
 Assignee: Shengkai Fang
   Resolution: Fixed

Implemented in master (1.12.0): 24d612bd4337f1443f379fa3bac2b6050b3d32c8

> Support Watermark push down for kafka connector
> ---
>
> Key: FLINK-20041
> URL: https://issues.apache.org/jira/browse/FLINK-20041
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Table SQL / API
>Affects Versions: 1.12.0
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Support watermark push down for kafka connector.





[GitHub] [flink] wuchong merged pull request #13975: [FLINK-20041][connector kafka] Support watermark push down for kafka …

2020-11-07 Thread GitBox


wuchong merged pull request #13975:
URL: https://github.com/apache/flink/pull/13975


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13942:
URL: https://github.com/apache/flink/pull/13942#issuecomment-722358038


   
   ## CI report:
   
   * ea8e4df517dcb1779d4959fc54917cbcac76c07f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9271)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9195)
 
   * c2172fbea08608c4d073643da6c3d23cf895fa27 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9287)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519249864



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}.
+ */
+public class ContinuousHivePendingSplitsCheckpointSerializer
+        implements SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+    private static final int VERSION = 1;
+
+    private final PendingSplitsCheckpointSerializer<HiveSourceSplit> superSerDe;
+
+    public ContinuousHivePendingSplitsCheckpointSerializer(
+            SimpleVersionedSerializer<HiveSourceSplit> splitSerDe) {
+        superSerDe = new PendingSplitsCheckpointSerializer<>(splitSerDe);
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PendingSplitsCheckpoint<HiveSourceSplit> checkpoint) throws IOException {
+        Preconditions.checkArgument(
+            checkpoint.getClass() == ContinuousHivePendingSplitsCheckpoint.class,
+            "Only supports %s", ContinuousHivePendingSplitsCheckpoint.class.getName());
+
+        ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = (ContinuousHivePendingSplitsCheckpoint) checkpoint;
+        PendingSplitsCheckpoint<HiveSourceSplit> superCP = PendingSplitsCheckpoint.fromCollectionSnapshot(
+            hiveCheckpoint.getSplits(), hiveCheckpoint.getAlreadyProcessedPaths());
+        byte[] superBytes = superSerDe.serialize(superCP);
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
+            outputStream.writeInt(superBytes.length);
+            outputStream.write(superBytes);
+            outputStream.writeObject(hiveCheckpoint.getCurrentReadOffset());

Review comment:
   Do you mean we need a SerDe interface for the read offset?
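
   If so, a minimal sketch for a plain Long offset might look like the
   following. `LongOffsetSerializer` is a hypothetical name used only for
   illustration; `SimpleVersionedSerializer` is Flink's existing interface.

   ```java
   // Hypothetical sketch: a versioned SerDe for a Long read offset, as an
   // alternative to Java serialization. Not part of Flink's codebase.
   import org.apache.flink.core.io.SimpleVersionedSerializer;

   import java.io.IOException;
   import java.nio.ByteBuffer;

   public class LongOffsetSerializer implements SimpleVersionedSerializer<Long> {

       private static final int VERSION = 1;

       @Override
       public int getVersion() {
           return VERSION;
       }

       @Override
       public byte[] serialize(Long offset) throws IOException {
           // Fixed-width encoding keeps the on-wire format stable.
           return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
       }

       @Override
       public Long deserialize(int version, byte[] serialized) throws IOException {
           if (version != VERSION) {
               throw new IOException("Unknown version: " + version);
           }
           return ByteBuffer.wrap(serialized).getLong();
       }
   }
   ```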





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13942: [FLINK-19990][table-planner-blink] MultipleInputNodeCreationProcessor#isChainableSource now considers DataStreamScanProvider

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13942:
URL: https://github.com/apache/flink/pull/13942#issuecomment-722358038


   
   ## CI report:
   
   * ea8e4df517dcb1779d4959fc54917cbcac76c07f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9271)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9195)
 
   * c2172fbea08608c4d073643da6c3d23cf895fa27 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13845:
URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990


   
   ## CI report:
   
   * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN
   * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN
   * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN
   * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN
   * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN
   * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN
   * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN
   * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN
   * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN
   * d4c46ab021f7d464004ca5f58589b25a5334f147 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9284)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193


   
   ## CI report:
   
   * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN
   * 3eb68994c05caed91cef945800d7cb586d617663 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9286)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193


   
   ## CI report:
   
   * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN
   * c57d805bea826af82626be82c5487e7259d3eaf9 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9262)
 
   * 3eb68994c05caed91cef945800d7cb586d617663 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519247124



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator<T extends Comparable<T>>
+        implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
+    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+    private final FileSplitAssigner splitAssigner;
+    private final long discoveryInterval;
+
+    private final JobConf jobConf;
+    private final ObjectPath tablePath;
+
+    private final ContinuousPartitionFetcher<Partition, T> fetcher;
+    private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+    // the maximum partition read offset seen so far
+    private volatile T currentReadOffset;
+    // the partitions that have been processed for a given read offset
+    private final Set<List<String>> seenPartitions;
+
+    public ContinuousHiveSplitEnumerator(
+            SplitEnumeratorContext<HiveSourceSplit> enumeratorContext,
+            T currentReadOffset,
+            Collection<List<String>> seenPartitions,
+            FileSplitAssigner splitAssigner,
+            long discoveryInterval,
+            JobConf jobConf,
+            ObjectPath tablePath,
+            ContinuousPartitionFetcher<Partition, T> fetcher,
+            HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
+        this.enumeratorContext = enumeratorContext;
+        this.currentReadOffset = currentReadOffset;
+        this.seenPartitions = new HashSet<>(seenPartitions);
+        this.splitAssigner = splitAssigner;
+        this.discoveryInterval = discoveryInterval;
+        this.jobConf = jobConf;
+        this.tablePath = tablePath;
+        this.fetcher = fetcher;
+        this.fetcherContext = fetcherContext;
+        readersAwaitingSplit = new LinkedHashMap<>();
+    }
+
+    @Override
+    public void start() {
+        try {
+            fetcherContext.open();
+            enumeratorContext.callAsync(
+                this::monitorAndGetSplits,
+                this::handleNewSplits,
+                discoveryInterval,
+

[GitHub] [flink] RocMarshal commented on pull request #13750: [FLINK-19394][docs-zh] Translate the 'Monitoring Checkpointing' page of 'Debugging & Monitoring' into Chinese

2020-11-07 Thread GitBox


RocMarshal commented on pull request #13750:
URL: https://github.com/apache/flink/pull/13750#issuecomment-723521553


   @dianfu 
   Could you help merge this PR if there is nothing inappropriate?
   
   Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519246308



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator<T extends Comparable<T>>
+        implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
+    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+    private final FileSplitAssigner splitAssigner;
+    private final long discoveryInterval;
+
+    private final JobConf jobConf;
+    private final ObjectPath tablePath;
+
+    private final ContinuousPartitionFetcher<Partition, T> fetcher;
+    private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();

Review comment:
   We do, because `monitorAndGetSplits` is scheduled asynchronously via 
   `SplitEnumeratorContext::callAsync`, so there can be concurrent calls of, 
   say, `monitorAndGetSplits` and `snapshotState`.
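
   A minimal sketch of that guarding, with an assumed, simplified state layout
   (the discovery result is applied in the async worker thread while the
   snapshot is taken in the coordinator thread):

   ```java
   // Simplified sketch of read-write locking between async split discovery and
   // checkpointing. The state layout here is assumed, not the actual enumerator's.
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.locks.ReadWriteLock;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   class EnumeratorState<T> {

       private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
       private T currentReadOffset;
       private final Set<List<String>> seenPartitions = new HashSet<>();

       // Runs in the worker thread scheduled via SplitEnumeratorContext::callAsync.
       void onPartitionsDiscovered(T newOffset, Set<List<String>> newPartitions) {
           stateLock.writeLock().lock();
           try {
               currentReadOffset = newOffset;
               seenPartitions.addAll(newPartitions);
           } finally {
               stateLock.writeLock().unlock();
           }
       }

       // Runs in the coordinator thread during snapshotState(); must observe a
       // consistent pair of (offset, partitions).
       SnapshotView<T> snapshot() {
           stateLock.readLock().lock();
           try {
               return new SnapshotView<>(currentReadOffset, new HashSet<>(seenPartitions));
           } finally {
               stateLock.readLock().unlock();
           }
       }

       static final class SnapshotView<T> {
           final T offset;
           final Set<List<String>> partitions;

           SnapshotView(T offset, Set<List<String>> partitions) {
               this.offset = offset;
               this.partitions = partitions;
           }
       }
   }
   ```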





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-20037) there is a comment problem in fromValues(AbstractDataType<?> rowType, Object... values) method of TableEnvironment

2020-11-07 Thread yandufeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227896#comment-17227896
 ] 

yandufeng commented on FLINK-20037:
---

Do I need to do something?
https://github.com/apache/flink/pull/13976

> there is a comment problem in fromValues(AbstractDataType<?> rowType, 
> Object... values)  method of TableEnvironment
> ---
>
> Key: FLINK-20037
> URL: https://issues.apache.org/jira/browse/FLINK-20037
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: yandufeng
>Priority: Trivial
>  Labels: pull-request-available, starter
>
> There is a comment problem in the fromValues(AbstractDataType<?> rowType, 
> Object... values) method of TableEnvironment: I think the second column name 
> should be "name", not "f1". This is my first issue, so please bear with me if 
> there is a problem.
> * Examples:
> * {@code
> * tEnv.fromValues(
> * DataTypes.ROW(
> * DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
> * DataTypes.FIELD("name", DataTypes.STRING())
> * ),
> * row(1, "ABC"),
> * row(2L, "ABCDE")
> * )
> * }
> * will produce a Table with a schema as follows:
> * {@code
> * root
> * |-- id: DECIMAL(10, 2)
> * |-- f1: STRING
> * }



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13845:
URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990


   
   ## CI report:
   
   * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN
   * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN
   * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN
   * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN
   * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN
   * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN
   * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN
   * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN
   * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282)
 
   * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN
   * d4c46ab021f7d464004ca5f58589b25a5334f147 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9284)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] becketqin commented on pull request #13920: [FLINK-19743] Add metric definitions in FLIP-33 and report some of them by default.

2020-11-07 Thread GitBox


becketqin commented on pull request #13920:
URL: https://github.com/apache/flink/pull/13920#issuecomment-723520262


   > A thought on performance: There are some metrics that use volatile 
variables.
   > 
   > So far we always avoided that. Even "infrequently accessed" fields, like 
watermarks, are quite frequent in some use cases.
   > And a single volatile write has a very significant overhead in a 
high-performance pipeline. It is the number one thing we try to avoid, because 
it breaks the processor pipeline and puts pressure on the memory bus and 
processor's cache coherency mechanism. For the metrics, performance (least 
impact on main data paths) should have clear precedence.
   > 
   > For metrics, we don't need a hard "happens before" relationship or any 
"immediate visibility". If the visibility is some 100s of milliseconds later 
(which would be a lot), it is no issue for the metrics at all, which are 
periodically reported every few seconds.
   > 
   > The case where the JIT inlines the variable and eliminates cross-thread 
visibility entirely is AFAIK not realistic here. And if we ever find that the 
metric gets not reported, we can still use an occasional volatile write (like 
we do occasional volatile exception checks, every 1000 records, in the network 
stack).
   
   Thanks for the comments and suggestions. I was actually struggling with this 
   a little bit. The current implementation is based on the following assumptions:
   * A volatile read is usually as cheap as a normal variable read unless there 
   is write contention. Modern CPUs can serve the read from the cache line that 
   holds the shared variable; reads only go to main memory after the value has 
   been updated by another core, which invalidates the reading CPU's cache line.
   * In the current implementation, the main thread reads the volatile variable 
   `shouldUpdateCurrentEventTimeLag` for every record. 
   `shouldUpdateCurrentEventTimeLag` is set to true by a separate thread every 
   second. If the main thread sees the value become true, it updates the 
   volatile variable `currentEmitEventTimeLag` with 
   `System.currentTimeMillis() - timestamp`. So the main thread performs one 
   volatile write, and potentially one system call, per second.
   * The `watermark` update is infrequent. From your comment this assumption 
   seems too optimistic, so we can probably also update `currentWatermark` once 
   per second.
   
   Another part that took me some time to think about is whether we should use 
   a record-count-based interval or a time-based interval for metric reporting. 
   When throughput is high, the two approaches probably make no difference. But 
   at low throughput, a time-based interval seems preferable: imagine we update 
   the metric every 1000 records on a stream that only processes one record 
   every 5 seconds; we would need to wait over an hour to see the metric change, 
   which seems bad.
   
   I am curious whether we have experience that invalidates these assumptions?
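
   To make the second assumption concrete, here is a minimal sketch of that
   update pattern. Only the two variable names come from the discussion above;
   the class and the scheduling details are assumptions.

   ```java
   // Sketch (not Flink code): a scheduler thread flips a volatile flag once per
   // second; the main thread checks the flag per record and performs at most one
   // volatile write (plus one currentTimeMillis call) per second.
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   class EventTimeLagMetric {

       private volatile boolean shouldUpdateCurrentEventTimeLag;
       private volatile long currentEmitEventTimeLag;

       private final ScheduledExecutorService scheduler =
               Executors.newSingleThreadScheduledExecutor();

       EventTimeLagMetric() {
           scheduler.scheduleAtFixedRate(
                   () -> shouldUpdateCurrentEventTimeLag = true, 1, 1, TimeUnit.SECONDS);
       }

       // Called by the main thread for every emitted record.
       void onRecordEmitted(long eventTimestamp) {
           // Cheap volatile read on the hot path.
           if (shouldUpdateCurrentEventTimeLag) {
               shouldUpdateCurrentEventTimeLag = false;
               // At most one volatile write and one system call per second.
               currentEmitEventTimeLag = System.currentTimeMillis() - eventTimestamp;
           }
       }

       long getCurrentEmitEventTimeLag() {
           return currentEmitEventTimeLag;
       }
   }
   ```

   In the steady state the per-record cost is a single, typically uncontended,
   volatile read.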



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837


   
   ## CI report:
   
   * 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN
   * a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227894#comment-17227894
 ] 

Dian Fu commented on FLINK-19863:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9281=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

> SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process 
> failed due to timeout"
> ---
>
> Key: FLINK-19863
> URL: https://issues.apache.org/jira/browse/FLINK-19863
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6
> {code}
> 00:50:02,589 [main] INFO  
> org.apache.flink.tests.util.flink.FlinkDistribution  [] - Stopping 
> Flink cluster.
> 00:50:04,106 [main] INFO  
> org.apache.flink.tests.util.flink.FlinkDistribution  [] - Stopping 
> Flink cluster.
> 00:50:04,741 [main] INFO  
> org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up 
> logs to 
> /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9.
> 00:50:04,788 [main] INFO  
> org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping 
> HBase Cluster
> 00:50:16,243 [main] ERROR 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase   [] - 
> 
> Test testHBase[0: 
> hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) 
> failed with:
> java.io.IOException: Process failed due to timeout.
>   at 
> org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130)
>   at 
> org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108)
>   at 
> org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221)
>   at 
> org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196)
>   at 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215)
>   at 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20006) FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8>

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227893#comment-17227893
 ] 

Dian Fu commented on FLINK-20006:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=298e20ef-7951-5965-0e79-ea664ddc435e=b4cd3436-dbe8-556d-3bca-42f92c3cbf2f

> FileSinkITCase.testFileSink: The record 0 should occur 4 times,  but only 
> occurs 8time expected:<4> but was:<8>
> ---
>
> Key: FLINK-20006
> URL: https://issues.apache.org/jira/browse/FLINK-20006
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9082=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-11-05T13:31:16.7006473Z [ERROR] Tests run: 4, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 5.565 s <<< FAILURE! - in 
> org.apache.flink.connector.file.sink.FileSinkITCase
> 2020-11-05T13:31:16.7007237Z [ERROR] testFileSink[executionMode = STREAMING, 
> triggerFailover = true](org.apache.flink.connector.file.sink.FileSinkITCase)  
> Time elapsed: 0.548 s  <<< FAILURE!
> 2020-11-05T13:31:16.7007897Z java.lang.AssertionError: The record 0 should 
> occur 4 times,  but only occurs 8time expected:<4> but was:<8>
> 2020-11-05T13:31:16.7008317Z  at org.junit.Assert.fail(Assert.java:88)
> 2020-11-05T13:31:16.7008644Z  at 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-11-05T13:31:16.7008987Z  at 
> org.junit.Assert.assertEquals(Assert.java:645)
> 2020-11-05T13:31:16.7009392Z  at 
> org.apache.flink.connector.file.sink.FileSinkITCase.checkResult(FileSinkITCase.java:218)
> 2020-11-05T13:31:16.7009889Z  at 
> org.apache.flink.connector.file.sink.FileSinkITCase.testFileSink(FileSinkITCase.java:132)
> 2020-11-05T13:31:16.7010316Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20006) FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8>

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227892#comment-17227892
 ] 

Dian Fu commented on FLINK-20006:
-

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=420bd9ec-164e-562e-8947-0dacde3cec91

{code}
2020-11-07T22:18:57.6776989Z [ERROR] testFileSink[triggerFailover = 
false](org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase)  
Time elapsed: 2.773 s  <<< FAILURE!
2020-11-07T22:18:57.6777584Z java.lang.AssertionError: expected:<2500> but 
was:<0>
2020-11-07T22:18:57.6777929Z    at org.junit.Assert.fail(Assert.java:88)
2020-11-07T22:18:57.6778280Z    at org.junit.Assert.failNotEquals(Assert.java:834)
2020-11-07T22:18:57.6778647Z    at org.junit.Assert.assertEquals(Assert.java:645)
2020-11-07T22:18:57.6778997Z    at org.junit.Assert.assertEquals(Assert.java:631)
2020-11-07T22:18:57.6779449Z    at 
org.apache.flink.connector.file.sink.FileSinkITBase.checkResult(FileSinkITBase.java:152)
2020-11-07T22:18:57.6780189Z    at 
org.apache.flink.connector.file.sink.FileSinkITBase.testFileSink(FileSinkITBase.java:104)
{code}

> FileSinkITCase.testFileSink: The record 0 should occur 4 times,  but only 
> occurs 8time expected:<4> but was:<8>
> ---
>
> Key: FLINK-20006
> URL: https://issues.apache.org/jira/browse/FLINK-20006
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9082=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-11-05T13:31:16.7006473Z [ERROR] Tests run: 4, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 5.565 s <<< FAILURE! - in 
> org.apache.flink.connector.file.sink.FileSinkITCase
> 2020-11-05T13:31:16.7007237Z [ERROR] testFileSink[executionMode = STREAMING, 
> triggerFailover = true](org.apache.flink.connector.file.sink.FileSinkITCase)  
> Time elapsed: 0.548 s  <<< FAILURE!
> 2020-11-05T13:31:16.7007897Z java.lang.AssertionError: The record 0 should 
> occur 4 times,  but only occurs 8time expected:<4> but was:<8>
> 2020-11-05T13:31:16.7008317Z  at org.junit.Assert.fail(Assert.java:88)
> 2020-11-05T13:31:16.7008644Z  at 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-11-05T13:31:16.7008987Z  at 
> org.junit.Assert.assertEquals(Assert.java:645)
> 2020-11-05T13:31:16.7009392Z  at 
> org.apache.flink.connector.file.sink.FileSinkITCase.checkResult(FileSinkITCase.java:218)
> 2020-11-05T13:31:16.7009889Z  at 
> org.apache.flink.connector.file.sink.FileSinkITCase.testFileSink(FileSinkITCase.java:132)
> 2020-11-05T13:31:16.7010316Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-20006) FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8>

2020-11-07 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reopened FLINK-20006:
-

> FileSinkITCase.testFileSink: The record 0 should occur 4 times,  but only 
> occurs 8time expected:<4> but was:<8>
> ---
>
> Key: FLINK-20006
> URL: https://issues.apache.org/jira/browse/FLINK-20006
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9082=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-11-05T13:31:16.7006473Z [ERROR] Tests run: 4, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 5.565 s <<< FAILURE! - in 
> org.apache.flink.connector.file.sink.FileSinkITCase
> 2020-11-05T13:31:16.7007237Z [ERROR] testFileSink[executionMode = STREAMING, 
> triggerFailover = true](org.apache.flink.connector.file.sink.FileSinkITCase)  
> Time elapsed: 0.548 s  <<< FAILURE!
> 2020-11-05T13:31:16.7007897Z java.lang.AssertionError: The record 0 should 
> occur 4 times,  but only occurs 8time expected:<4> but was:<8>
> 2020-11-05T13:31:16.7008317Z  at org.junit.Assert.fail(Assert.java:88)
> 2020-11-05T13:31:16.7008644Z  at 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-11-05T13:31:16.7008987Z  at 
> org.junit.Assert.assertEquals(Assert.java:645)
> 2020-11-05T13:31:16.7009392Z  at 
> org.apache.flink.connector.file.sink.FileSinkITCase.checkResult(FileSinkITCase.java:218)
> 2020-11-05T13:31:16.7009889Z  at 
> org.apache.flink.connector.file.sink.FileSinkITCase.testFileSink(FileSinkITCase.java:132)
> 2020-11-05T13:31:16.7010316Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20046) StreamTableAggregateTests.test_map_view_iterate is instable

2020-11-07 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-20046:

Labels: test-stability  (was: )

> StreamTableAggregateTests.test_map_view_iterate is instable
> ---
>
> Key: FLINK-20046
> URL: https://issues.apache.org/jira/browse/FLINK-20046
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490
> {code}
> 2020-11-07T22:50:57.4180758Z ___ 
> StreamTableAggregateTests.test_map_view_iterate 
> 2020-11-07T22:50:57.4181301Z 
> 2020-11-07T22:50:57.4181965Z self = 
> <pyflink.table.tests.test_aggregate.StreamTableAggregateTests 
> testMethod=test_map_view_iterate>
> 2020-11-07T22:50:57.4182348Z 
> 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self):
> 2020-11-07T22:50:57.4182812Z test_iterate = 
> udaf(TestIterateAggregateFunction())
> 2020-11-07T22:50:57.4183320Z 
> self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1))
> 2020-11-07T22:50:57.4183763Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", 
> "2")
> 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle.
> 2020-11-07T22:50:57.4308028Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2")
> 2020-11-07T22:50:57.4308945Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", 
> "2")
> 2020-11-07T22:50:57.4309676Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", 
> "2")
> 2020-11-07T22:50:57.4310701Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4311130Z 
> "python.map-state.iterate-response-batch-size", "2")
> 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements(
> 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'),
> 2020-11-07T22:50:57.4312004Z  (1, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4312316Z  (2, 'hello', 'hello'),
> 2020-11-07T22:50:57.4312639Z  (3, 'Hi_', 'hi'),
> 2020-11-07T22:50:57.4312975Z  (3, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4313285Z  (4, 'hello', 'hello'),
> 2020-11-07T22:50:57.4313609Z  (5, 'Hi2_', 'hi'),
> 2020-11-07T22:50:57.4313908Z  (5, 'Hi2', 'hi'),
> 2020-11-07T22:50:57.4314238Z  (6, 'hello2', 'hello'),
> 2020-11-07T22:50:57.4314558Z  (7, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4315053Z  (8, 'hello', 'hello'),
> 2020-11-07T22:50:57.4315396Z  (9, 'Hi2', 'hi'),
> 2020-11-07T22:50:57.4315773Z  (13, 'Hi3', 'hi')], ['a', 'b', 'c'])
> 2020-11-07T22:50:57.4316023Z 
> self.t_env.create_temporary_view("source", t)
> 2020-11-07T22:50:57.4316299Z table_with_retract_message = 
> self.t_env.sql_query(
> 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, 
> LAST_VALUE(c) as c from source group by a")
> 2020-11-07T22:50:57.4316919Z result = 
> table_with_retract_message.group_by(t.c) \
> 2020-11-07T22:50:57.4317197Z 
> .select(test_iterate(t.b).alias("a"), t.c) \
> 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"),
> 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"),
> 2020-11-07T22:50:57.4318357Z col("a").get(2).alias("c"),
> 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"),
> 2020-11-07T22:50:57.4318814Z t.c.alias("e"))
> 2020-11-07T22:50:57.4319023Z assert_frame_equal(
> 2020-11-07T22:50:57.4319208Z >   result.to_pandas(),
> 2020-11-07T22:50:57.4319408Z pd.DataFrame([
> 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", 
> 'hello:3,hello2:1', 2, "hello"],
> 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", 
> "Hi:3,Hi2:2,Hi3:1", 3, "hi"]],
> 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', 
> 'e']))
> 2020-11-07T22:50:57.4321198Z 
> 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: 
> 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas
> 

[jira] [Commented] (FLINK-20046) StreamTableAggregateTests.test_map_view_iterate is instable

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227891#comment-17227891
 ] 

Dian Fu commented on FLINK-20046:
-

cc [~zhongwei]

> StreamTableAggregateTests.test_map_view_iterate is instable
> ---
>
> Key: FLINK-20046
> URL: https://issues.apache.org/jira/browse/FLINK-20046
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490
> {code}
> 2020-11-07T22:50:57.4180758Z ___ 
> StreamTableAggregateTests.test_map_view_iterate 
> 2020-11-07T22:50:57.4181301Z 
> 2020-11-07T22:50:57.4181965Z self = 
> <pyflink.table.tests.test_aggregate.StreamTableAggregateTests 
> testMethod=test_map_view_iterate>
> 2020-11-07T22:50:57.4182348Z 
> 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self):
> 2020-11-07T22:50:57.4182812Z test_iterate = 
> udaf(TestIterateAggregateFunction())
> 2020-11-07T22:50:57.4183320Z 
> self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1))
> 2020-11-07T22:50:57.4183763Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", 
> "2")
> 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle.
> 2020-11-07T22:50:57.4308028Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2")
> 2020-11-07T22:50:57.4308945Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", 
> "2")
> 2020-11-07T22:50:57.4309676Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", 
> "2")
> 2020-11-07T22:50:57.4310701Z 
> self.t_env.get_config().get_configuration().set_string(
> 2020-11-07T22:50:57.4311130Z 
> "python.map-state.iterate-response-batch-size", "2")
> 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements(
> 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'),
> 2020-11-07T22:50:57.4312004Z  (1, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4312316Z  (2, 'hello', 'hello'),
> 2020-11-07T22:50:57.4312639Z  (3, 'Hi_', 'hi'),
> 2020-11-07T22:50:57.4312975Z  (3, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4313285Z  (4, 'hello', 'hello'),
> 2020-11-07T22:50:57.4313609Z  (5, 'Hi2_', 'hi'),
> 2020-11-07T22:50:57.4313908Z  (5, 'Hi2', 'hi'),
> 2020-11-07T22:50:57.4314238Z  (6, 'hello2', 'hello'),
> 2020-11-07T22:50:57.4314558Z  (7, 'Hi', 'hi'),
> 2020-11-07T22:50:57.4315053Z  (8, 'hello', 'hello'),
> 2020-11-07T22:50:57.4315396Z  (9, 'Hi2', 'hi'),
> 2020-11-07T22:50:57.4315773Z  (13, 'Hi3', 'hi')], ['a', 'b', 'c'])
> 2020-11-07T22:50:57.4316023Z 
> self.t_env.create_temporary_view("source", t)
> 2020-11-07T22:50:57.4316299Z table_with_retract_message = 
> self.t_env.sql_query(
> 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, 
> LAST_VALUE(c) as c from source group by a")
> 2020-11-07T22:50:57.4316919Z result = 
> table_with_retract_message.group_by(t.c) \
> 2020-11-07T22:50:57.4317197Z 
> .select(test_iterate(t.b).alias("a"), t.c) \
> 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"),
> 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"),
> 2020-11-07T22:50:57.4318357Z col("a").get(2).alias("c"),
> 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"),
> 2020-11-07T22:50:57.4318814Z t.c.alias("e"))
> 2020-11-07T22:50:57.4319023Z assert_frame_equal(
> 2020-11-07T22:50:57.4319208Z >   result.to_pandas(),
> 2020-11-07T22:50:57.4319408Z pd.DataFrame([
> 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", 
> 'hello:3,hello2:1', 2, "hello"],
> 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", 
> "Hi:3,Hi2:2,Hi3:1", 3, "hi"]],
> 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', 
> 'e']))
> 2020-11-07T22:50:57.4321198Z 
> 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: 
> 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas
> 

[jira] [Created] (FLINK-20046) StreamTableAggregateTests.test_map_view_iterate is instable

2020-11-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-20046:
---

 Summary: StreamTableAggregateTests.test_map_view_iterate is 
instable
 Key: FLINK-20046
 URL: https://issues.apache.org/jira/browse/FLINK-20046
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.0
Reporter: Dian Fu
 Fix For: 1.12.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490

{code}
2020-11-07T22:50:57.4180758Z ___ 
StreamTableAggregateTests.test_map_view_iterate 
2020-11-07T22:50:57.4181301Z 
2020-11-07T22:50:57.4181965Z self = 
<pyflink.table.tests.test_aggregate.StreamTableAggregateTests 
testMethod=test_map_view_iterate>
2020-11-07T22:50:57.4182348Z 
2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self):
2020-11-07T22:50:57.4182812Z test_iterate = 
udaf(TestIterateAggregateFunction())
2020-11-07T22:50:57.4183320Z 
self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1))
2020-11-07T22:50:57.4183763Z 
self.t_env.get_config().get_configuration().set_string(
2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", "2")
2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle.
2020-11-07T22:50:57.4308028Z 
self.t_env.get_config().get_configuration().set_string(
2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2")
2020-11-07T22:50:57.4308945Z 
self.t_env.get_config().get_configuration().set_string(
2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", 
"2")
2020-11-07T22:50:57.4309676Z 
self.t_env.get_config().get_configuration().set_string(
2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", 
"2")
2020-11-07T22:50:57.4310701Z 
self.t_env.get_config().get_configuration().set_string(
2020-11-07T22:50:57.4311130Z 
"python.map-state.iterate-response-batch-size", "2")
2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements(
2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'),
2020-11-07T22:50:57.4312004Z  (1, 'Hi', 'hi'),
2020-11-07T22:50:57.4312316Z  (2, 'hello', 'hello'),
2020-11-07T22:50:57.4312639Z  (3, 'Hi_', 'hi'),
2020-11-07T22:50:57.4312975Z  (3, 'Hi', 'hi'),
2020-11-07T22:50:57.4313285Z  (4, 'hello', 'hello'),
2020-11-07T22:50:57.4313609Z  (5, 'Hi2_', 'hi'),
2020-11-07T22:50:57.4313908Z  (5, 'Hi2', 'hi'),
2020-11-07T22:50:57.4314238Z  (6, 'hello2', 'hello'),
2020-11-07T22:50:57.4314558Z  (7, 'Hi', 'hi'),
2020-11-07T22:50:57.4315053Z  (8, 'hello', 'hello'),
2020-11-07T22:50:57.4315396Z  (9, 'Hi2', 'hi'),
2020-11-07T22:50:57.4315773Z  (13, 'Hi3', 'hi')], ['a', 'b', 'c'])
2020-11-07T22:50:57.4316023Z self.t_env.create_temporary_view("source", 
t)
2020-11-07T22:50:57.4316299Z table_with_retract_message = 
self.t_env.sql_query(
2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, 
LAST_VALUE(c) as c from source group by a")
2020-11-07T22:50:57.4316919Z result = 
table_with_retract_message.group_by(t.c) \
2020-11-07T22:50:57.4317197Z .select(test_iterate(t.b).alias("a"), 
t.c) \
2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"),
2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"),
2020-11-07T22:50:57.4318357Z col("a").get(2).alias("c"),
2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"),
2020-11-07T22:50:57.4318814Z t.c.alias("e"))
2020-11-07T22:50:57.4319023Z assert_frame_equal(
2020-11-07T22:50:57.4319208Z >   result.to_pandas(),
2020-11-07T22:50:57.4319408Z pd.DataFrame([
2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", 
'hello:3,hello2:1', 2, "hello"],
2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", 
"Hi:3,Hi2:2,Hi3:1", 3, "hi"]],
2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', 'e']))
2020-11-07T22:50:57.4321198Z 
2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: 
2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas
2020-11-07T22:50:57.4322299Z .collectAsPandasDataFrame(self._j_table, 
max_arrow_batch_size)
2020-11-07T22:50:57.4322794Z 
.tox/py35-cython/lib/python3.5/site-packages/py4j/java_gateway.py:1286: in 
__call__
2020-11-07T22:50:57.4323103Z answer, self.gateway_client, self.target_id, 
self.name)
2020-11-07T22:50:57.4323351Z pyflink/util/exceptions.py:147: in deco
2020-11-07T22:50:57.4323537Z return f(*a, **kw)
2020-11-07T22:50:57.4323783Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

[jira] [Commented] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227890#comment-17227890
 ] 

Dian Fu commented on FLINK-19863:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6

> SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process 
> failed due to timeout"
> ---
>
> Key: FLINK-19863
> URL: https://issues.apache.org/jira/browse/FLINK-19863
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6
> {code}
> 00:50:02,589 [main] INFO  
> org.apache.flink.tests.util.flink.FlinkDistribution  [] - Stopping 
> Flink cluster.
> 00:50:04,106 [main] INFO  
> org.apache.flink.tests.util.flink.FlinkDistribution  [] - Stopping 
> Flink cluster.
> 00:50:04,741 [main] INFO  
> org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up 
> logs to 
> /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9.
> 00:50:04,788 [main] INFO  
> org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping 
> HBase Cluster
> 00:50:16,243 [main] ERROR 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase   [] - 
> 
> Test testHBase[0: 
> hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) 
> failed with:
> java.io.IOException: Process failed due to timeout.
>   at 
> org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130)
>   at 
> org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108)
>   at 
> org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221)
>   at 
> org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196)
>   at 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215)
>   at 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-19545) Add e2e test for native Kubernetes HA

2020-11-07 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-19545:
--
Comment: was deleted

(was: We also need to add some integration tests for the KubernetesHAService, 
{{KubernetesLeaderElectionService}}, {{KubernetesLeaderRetrievalService}}, 
{{KubernetesJobGraphStore}}, etc.)

> Add e2e test for native Kubernetes HA
> -
>
> Key: FLINK-19545
> URL: https://issues.apache.org/jira/browse/FLINK-19545
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
> Fix For: 1.12.0
>
>
> We could use minikube for the E2E tests: start a Flink session/application 
> cluster on K8s, kill one TaskManager or JobManager pod, and wait for the job 
> to recover from the latest checkpoint successfully.
> {code}
> kubectl exec -it {pod_name} -- /bin/sh -c "kill 1"
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19545) Add e2e test for native Kubernetes HA

2020-11-07 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227889#comment-17227889
 ] 

Yang Wang commented on FLINK-19545:
---

We already have some IT cases for the critical methods, so we just need to 
add an E2E test in this issue.
{code:java}
# Run the ITCases
run_mvn test -Dtest=org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClientITCase
run_mvn test -Dtest=org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElectorITCase
run_mvn test -Dtest=org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionAndRetrievalITCase
run_mvn test -Dtest=org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreITCase
{code}

> Add e2e test for native Kubernetes HA
> -
>
> Key: FLINK-19545
> URL: https://issues.apache.org/jira/browse/FLINK-19545
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
> Fix For: 1.12.0
>
>
> We could use minikube for the E2E tests: start a Flink session/application 
> cluster on K8s, kill one TaskManager or JobManager pod, and wait for the job 
> to recover from the latest checkpoint successfully.
> {code}
> kubectl exec -it {pod_name} -- /bin/sh -c "kill 1"
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19842) PyFlinkStreamUserDefinedTableFunctionTests.test_table_function_with_sql_query is unstable

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227888#comment-17227888
 ] 

Dian Fu commented on FLINK-19842:
-

Seems another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3

{code}
2020-11-07T23:17:06.7806808Z self = 
<pyflink.table.tests.test_udtf.PyFlinkStreamUserDefinedTableFunctionTests 
testMethod=test_table_function>
2020-11-07T23:17:06.7807102Z 
2020-11-07T23:17:06.7807304Z def test_table_function(self):
2020-11-07T23:17:06.7807580Z self._register_table_sink(
2020-11-07T23:17:06.7807959Z ['a', 'b', 'c'],
2020-11-07T23:17:06.7808316Z [DataTypes.BIGINT(), 
DataTypes.BIGINT(), DataTypes.BIGINT()])
2020-11-07T23:17:06.7808665Z 
2020-11-07T23:17:06.7808944Z multi_emit = udtf(MultiEmit(), 
result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
2020-11-07T23:17:06.7809335Z multi_num = udf(MultiNum(), 
result_type=DataTypes.BIGINT())
2020-11-07T23:17:06.7809595Z 
2020-11-07T23:17:06.7810056Z t = self.t_env.from_elements([(1, 1, 3), 
(2, 1, 6), (3, 2, 9)], ['a', 'b', 'c'])
2020-11-07T23:17:06.7810613Z t = t.join_lateral(multi_emit(t.a, 
multi_num(t.b)).alias('x', 'y'))
2020-11-07T23:17:06.7811170Z t = 
t.left_outer_join_lateral(condition_multi_emit(t.x, t.y).alias('m')) \
2020-11-07T23:17:06.7811479Z .select("x, y, m")
2020-11-07T23:17:06.7812029Z t = 
t.left_outer_join_lateral(identity(t.m).alias('n')) \
2020-11-07T23:17:06.7812326Z .select("x, y, n")
2020-11-07T23:17:06.7812560Z actual = self._get_output(t)
2020-11-07T23:17:06.7812820Z self.assert_equals(actual,
2020-11-07T23:17:06.7813237Z["1,0,null", 
"1,1,null", "2,0,null", "2,1,null", "3,0,0", "3,0,1",
2020-11-07T23:17:06.7813628Z >   "3,0,2", "3,1,1", 
"3,1,2", "3,2,2", "3,3,null"])
{code}

> PyFlinkStreamUserDefinedTableFunctionTests.test_table_function_with_sql_query 
> is unstable
> -
>
> Key: FLINK-19842
> URL: https://issues.apache.org/jira/browse/FLINK-19842
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8401=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3
> {code}
> === FAILURES ===
> _ PyFlinkStreamUserDefinedTableFunctionTests.test_table_function_with_sql_query _
> self = <pyflink.table.tests.test_udtf.PyFlinkStreamUserDefinedTableFunctionTests testMethod=test_table_function_with_sql_query>
> def test_table_function_with_sql_query(self):
> self._register_table_sink(
> ['a', 'b', 'c'],
> [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
> 
> self.t_env.create_temporary_system_function(
> "multi_emit", udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()]))
> 
> t = self.t_env.from_elements([(1, 1, 3), (2, 1, 6), (3, 2, 9)], ['a', 'b', 'c'])
> self.t_env.register_table("MyTable", t)
> t = self.t_env.sql_query(
> "SELECT a, x, y FROM MyTable LEFT JOIN LATERAL TABLE(multi_emit(a, b)) as T(x, y)"
> " ON TRUE")
> actual = self._get_output(t)
> >   self.assert_equals(actual, ["1,1,0", "2,2,0", "3,3,0", "3,3,1"])
> pyflink/table/tests/test_udtf.py:61: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> cls = <class 'pyflink.table.tests.test_udtf.PyFlinkStreamUserDefinedTableFunctionTests'>
> actual = JavaObject id=o37759, expected = ['1,1,0', '2,2,0', '3,3,0', '3,3,1']
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangyang0918 commented on pull request #13871: [FLINK-19544][k8s] Implement CheckpointRecoveryFactory based on Kubernetes API

2020-11-07 Thread GitBox


wangyang0918 commented on pull request #13871:
URL: https://github.com/apache/flink/pull/13871#issuecomment-723518654


   @tillrohrmann @xintongsong Thanks all for the detailed review and the help 
with merging.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-18262) PyFlink end-to-end test stalls

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227886#comment-17227886
 ] 

Dian Fu edited comment on FLINK-18262 at 11/8/20, 1:38 AM:
---

Seems another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9278=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

{code}
2020-11-07T21:48:40.0076169Z Nov 07 21:48:40 We now have 2 NodeManagers up.
2020-11-07T22:15:02.5561863Z ==============================================================================
2020-11-07T22:15:02.5563141Z === WARNING: This E2E Run took already 80% of the allocated time budget of 250 minutes ===
2020-11-07T22:15:02.5564157Z ==============================================================================
2020-11-07T22:54:02.5518469Z ==============================================================================
2020-11-07T22:54:02.5520427Z === WARNING: This E2E Run will time out in the next few minutes. Starting to upload the log output ===
2020-11-07T22:54:02.5521385Z ==============================================================================
2020-11-07T23:05:09.8660803Z ##[error]The task has timed out.
{code}


was (Author: dian.fu):
Seems another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9278=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

> PyFlink end-to-end test stalls
> --
>
> Key: FLINK-18262
> URL: https://issues.apache.org/jira/browse/FLINK-18262
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3299=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> {code}
> 2020-06-11T17:25:37.2919869Z We now have 2 NodeManagers up.
> 2020-06-11T18:22:11.3398295Z ##[error]The operation was canceled.
> 2020-06-11T18:22:11.3411819Z ##[section]Finishing: Run e2e tests
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-18262) PyFlink end-to-end test stalls

2020-11-07 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reopened FLINK-18262:
-

> PyFlink end-to-end test stalls
> --
>
> Key: FLINK-18262
> URL: https://issues.apache.org/jira/browse/FLINK-18262
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3299=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> {code}
> 2020-06-11T17:25:37.2919869Z We now have 2 NodeManagers up.
> 2020-06-11T18:22:11.3398295Z ##[error]The operation was canceled.
> 2020-06-11T18:22:11.3411819Z ##[section]Finishing: Run e2e tests
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519243418



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
##
@@ -46,29 +58,98 @@
 
private static final long serialVersionUID = 1L;
 
+   private final JobConfWrapper jobConfWrapper;
+   private final List partitionKeys;
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+   private final ObjectPath tablePath;
+
HiveSource(
JobConf jobConf,
+   ObjectPath tablePath,
CatalogTable catalogTable,
List partitions,
@Nullable Long limit,
String hiveVersion,
boolean useMapRedReader,
-   boolean isStreamingSource,
+   @Nullable ContinuousEnumerationSettings 
continuousEnumerationSettings,
+   ContinuousPartitionFetcher fetcher,
+   
HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext,
RowType producedRowType) {
super(
new org.apache.flink.core.fs.Path[1],
new 
HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
-   DEFAULT_SPLIT_ASSIGNER,
+   continuousEnumerationSettings == null ? 
DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new,

Review comment:
   In streaming reads, we want the splits to be consumed in order. The 
`DEFAULT_SPLIT_ASSIGNER` is locality-aware and therefore doesn't meet that 
requirement. But perhaps I should only do this for partitioned tables, because 
non-partitioned tables still reuse the super class's enumerator and their 
splits are not generated in order in the first place.
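
   For reference, a minimal sketch of that assigner choice (class names as in
the FLIP-27 file source; treating a null `ContinuousEnumerationSettings` as
bounded mode is an assumption of this sketch, not something the PR states):

    import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
    import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
    import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
    import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;

    class AssignerChoice {
        // Bounded reads can keep the locality-aware default; streaming reads
        // should hand out splits in the order they were discovered.
        static FileSplitAssigner.Provider pick(ContinuousEnumerationSettings settings) {
            return settings == null
                    ? LocalityAwareSplitAssigner::new
                    : SimpleSplitAssigner::new;
        }
    }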





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-18262) PyFlink end-to-end test stalls

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227886#comment-17227886
 ] 

Dian Fu commented on FLINK-18262:
-

Seems another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9278=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

> PyFlink end-to-end test stalls
> --
>
> Key: FLINK-18262
> URL: https://issues.apache.org/jira/browse/FLINK-18262
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3299=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> {code}
> 2020-06-11T17:25:37.2919869Z We now have 2 NodeManagers up.
> 2020-06-11T18:22:11.3398295Z ##[error]The operation was canceled.
> 2020-06-11T18:22:11.3411819Z ##[section]Finishing: Run e2e tests
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lirui-apache commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


lirui-apache commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519243125



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator> implements 
SplitEnumerator> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+   private final SplitEnumeratorContext enumeratorContext;
+   private final LinkedHashMap readersAwaitingSplit;
+   private final FileSplitAssigner splitAssigner;
+   private final long discoveryInterval;
+
+   private final JobConf jobConf;
+   private final ObjectPath tablePath;
+
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+
+   private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+   // the maximum partition read offset seen so far
+   private volatile T currentReadOffset;
+   // the partitions that have been processed for a given read offset
+   private final Set> seenPartitions;
+
+   public ContinuousHiveSplitEnumerator(
+   SplitEnumeratorContext 
enumeratorContext,
+   T currentReadOffset,
+   Collection> seenPartitions,
+   FileSplitAssigner splitAssigner,
+   long discoveryInterval,
+   JobConf jobConf,
+   ObjectPath tablePath,
+   ContinuousPartitionFetcher fetcher,
+   
HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext) {
+   this.enumeratorContext = enumeratorContext;
+   this.currentReadOffset = currentReadOffset;
+   this.seenPartitions = new HashSet<>(seenPartitions);
+   this.splitAssigner = splitAssigner;
+   this.discoveryInterval = discoveryInterval;
+   this.jobConf = jobConf;
+   this.tablePath = tablePath;
+   this.fetcher = fetcher;
+   this.fetcherContext = fetcherContext;
+   readersAwaitingSplit = new LinkedHashMap<>();
+   }
+
+   @Override
+   public void start() {
+   try {
+   fetcherContext.open();
+   enumeratorContext.callAsync(
+   this::monitorAndGetSplits,
+   this::handleNewSplits,
+   discoveryInterval,
+

[GitHub] [flink] dianfu commented on pull request #13930: [FLINK-19974][python][e2e] Fix the bug that kafka related services are not teardown properly after tests finished

2020-11-07 Thread GitBox


dianfu commented on pull request #13930:
URL: https://github.com/apache/flink/pull/13930#issuecomment-723517930


   Actually the tests have passed; I'm not sure why the status is still "PENDING".



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-706309837


   
   ## CI report:
   
   * 95a2052bdb6586122683e9531f157230ec75cd37 UNKNOWN
   * a9439ddaafd89e49968c1f17f0f02f21293b8343 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9270)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9285)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-20045) ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"

2020-11-07 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-20045:

Labels: test-stability  (was: )

> ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with 
> "TimeoutException: Contender was not elected as the leader within 20ms"
> 
>
> Key: FLINK-20045
> URL: https://issues.apache.org/jira/browse/FLINK-20045
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9251=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0
> {code}
> 2020-11-07T10:34:07.5063203Z [ERROR] testZooKeeperLeaderElectionRetrieval(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest)  Time elapsed: 202.445 s  <<< ERROR!
> 2020-11-07T10:34:07.5064331Z java.util.concurrent.TimeoutException: Contender was not elected as the leader within 200000ms
> 2020-11-07T10:34:07.5064946Z  at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:153)
> 2020-11-07T10:34:07.5065762Z  at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:139)
> 2020-11-07T10:34:07.5066565Z  at org.apache.flink.runtime.leaderelection.TestingLeaderBase.waitForLeader(TestingLeaderBase.java:48)
> 2020-11-07T10:34:07.5067185Z  at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval(ZooKeeperLeaderElectionTest.java:144)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20045) ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"

2020-11-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-20045:
---

 Summary: ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval failed with "TimeoutException: Contender was not elected as the leader within 200000ms"
 Key: FLINK-20045
 URL: https://issues.apache.org/jira/browse/FLINK-20045
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Dian Fu


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9251=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0

{code}
2020-11-07T10:34:07.5063203Z [ERROR] testZooKeeperLeaderElectionRetrieval(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest)  Time elapsed: 202.445 s  <<< ERROR!
2020-11-07T10:34:07.5064331Z java.util.concurrent.TimeoutException: Contender was not elected as the leader within 200000ms
2020-11-07T10:34:07.5064946Z  at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:153)
2020-11-07T10:34:07.5065762Z  at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:139)
2020-11-07T10:34:07.5066565Z  at org.apache.flink.runtime.leaderelection.TestingLeaderBase.waitForLeader(TestingLeaderBase.java:48)
2020-11-07T10:34:07.5067185Z  at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval(ZooKeeperLeaderElectionTest.java:144)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19699) PrometheusReporterEndToEndITCase crashes with exit code 143

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227884#comment-17227884
 ] 

Dian Fu commented on FLINK-19699:
-

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9257=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

> PrometheusReporterEndToEndITCase crashes with exit code 143
> ---
>
> Key: FLINK-19699
> URL: https://issues.apache.org/jira/browse/FLINK-19699
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7814=logs=68a897ab-3047-5660-245a-cce8f83859f6=16ca2cca-2f63-5cce-12d2-d519b930a729]
> {code}
> 2020-10-18T23:46:04.9667443Z [ERROR] The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-18T23:46:04.9669237Z [ERROR] Command was /bin/sh -c cd /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire 2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp surefire_41970585275084524978tmp
> 2020-10-18T23:46:04.9670440Z [ERROR] Error occurred in starting fork, check output in log
> 2020-10-18T23:46:04.9671283Z [ERROR] Process Exit Code: 143
> 2020-10-18T23:46:04.9671614Z [ERROR] Crashed tests:
> 2020-10-18T23:46:04.9672025Z [ERROR] org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase
> 2020-10-18T23:46:04.9672649Z [ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
> 2020-10-18T23:46:04.9674834Z [ERROR] Command was /bin/sh -c cd /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target && /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire/surefirebooter6797466627443523305.jar /home/vsts/work/1/s/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/surefire 2020-10-18T23-44-09_467-jvmRun2 surefire930806459376622178tmp surefire_41970585275084524978tmp
> 2020-10-18T23:46:04.9676153Z [ERROR] Error occurred in starting fork, check output in log
> 2020-10-18T23:46:04.9676556Z [ERROR] Process Exit Code: 143
> 2020-10-18T23:46:04.9676882Z [ERROR] Crashed tests:
> 2020-10-18T23:46:04.9677288Z [ERROR] org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase
> 2020-10-18T23:46:04.9677827Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669)
> 2020-10-18T23:46:04.9678408Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:282)
> 2020-10-18T23:46:04.9678965Z [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:245)
> 2020-10-18T23:46:04.9679575Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 2020-10-18T23:46:04.9680983Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 2020-10-18T23:46:04.9681749Z [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 2020-10-18T23:46:04.9682246Z [ERROR] at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 2020-10-18T23:46:04.9682728Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 2020-10-18T23:46:04.9683179Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 2020-10-18T23:46:04.9683609Z [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> 2020-10-18T23:46:04.9684102Z [ERROR] at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> 2020-10-18T23:46:04.9684639Z [ERROR] at 
> 

[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519241982



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}.
+ */
+public class ContinuousHivePendingSplitsCheckpointSerializer implements 
SimpleVersionedSerializer> {
+
+   private static final int VERSION = 1;
+
+   private final PendingSplitsCheckpointSerializer 
superSerDe;
+
+   public 
ContinuousHivePendingSplitsCheckpointSerializer(SimpleVersionedSerializer
 splitSerDe) {
+   superSerDe = new 
PendingSplitsCheckpointSerializer<>(splitSerDe);
+   }
+
+   @Override
+   public int getVersion() {
+   return VERSION;
+   }
+
+   @Override
+   public byte[] serialize(PendingSplitsCheckpoint 
checkpoint) throws IOException {
+   Preconditions.checkArgument(checkpoint.getClass() == 
ContinuousHivePendingSplitsCheckpoint.class,
+   "Only supports %s", 
ContinuousHivePendingSplitsCheckpoint.class.getName());
+
+   ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = 
(ContinuousHivePendingSplitsCheckpoint) checkpoint;
+   PendingSplitsCheckpoint superCP = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
+   hiveCheckpoint.getSplits(), 
hiveCheckpoint.getAlreadyProcessedPaths());
+   byte[] superBytes = superSerDe.serialize(superCP);
+   ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+   try (ObjectOutputStream outputStream = new 
ObjectOutputStream(byteArrayOutputStream)) {
+   outputStream.writeInt(superBytes.length);
+   outputStream.write(superBytes);
+   
outputStream.writeObject(hiveCheckpoint.getCurrentReadOffset());
+   
outputStream.writeInt(hiveCheckpoint.getSeenPartitions().size());

Review comment:
   It is better to use `ListSer`
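
   For illustration, assuming `ListSer` here is short for Flink's
`ListSerializer` (an assumption on my part), a minimal sketch of writing one
partition spec with it:

    import org.apache.flink.api.common.typeutils.base.ListSerializer;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;

    class SeenPartitionsWriter {
        // Writes a List<String> (one partition spec) with ListSerializer
        // instead of a hand-rolled size-plus-elements loop.
        static byte[] write(List<String> partition) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            new ListSerializer<>(StringSerializer.INSTANCE)
                    .serialize(partition, new DataOutputViewStreamWrapper(baos));
            return baos.toByteArray();
        }
    }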





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519241963



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}.
+ */
+public class ContinuousHivePendingSplitsCheckpointSerializer implements 
SimpleVersionedSerializer> {
+
+   private static final int VERSION = 1;
+
+   private final PendingSplitsCheckpointSerializer 
superSerDe;
+
+   public 
ContinuousHivePendingSplitsCheckpointSerializer(SimpleVersionedSerializer
 splitSerDe) {
+   superSerDe = new 
PendingSplitsCheckpointSerializer<>(splitSerDe);
+   }
+
+   @Override
+   public int getVersion() {
+   return VERSION;
+   }
+
+   @Override
+   public byte[] serialize(PendingSplitsCheckpoint 
checkpoint) throws IOException {
+   Preconditions.checkArgument(checkpoint.getClass() == 
ContinuousHivePendingSplitsCheckpoint.class,
+   "Only supports %s", 
ContinuousHivePendingSplitsCheckpoint.class.getName());
+
+   ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = 
(ContinuousHivePendingSplitsCheckpoint) checkpoint;
+   PendingSplitsCheckpoint superCP = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
+   hiveCheckpoint.getSplits(), 
hiveCheckpoint.getAlreadyProcessedPaths());
+   byte[] superBytes = superSerDe.serialize(superCP);
+   ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+   try (ObjectOutputStream outputStream = new 
ObjectOutputStream(byteArrayOutputStream)) {
+   outputStream.writeInt(superBytes.length);
+   outputStream.write(superBytes);
+   
outputStream.writeObject(hiveCheckpoint.getCurrentReadOffset());

Review comment:
   It is better to pass a serializer here
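
   For illustration, a minimal sketch of what that could look like (using a
`SimpleVersionedSerializer` is my suggestion; the PR currently writes the
offset with plain Java serialization):

    import org.apache.flink.core.io.SimpleVersionedSerializer;

    import java.io.DataOutputStream;
    import java.io.IOException;

    class OffsetWriter {
        // Version- and length-prefixes the read offset with an explicit
        // serializer instead of ObjectOutputStream#writeObject.
        static <T> void writeOffset(
                DataOutputStream out,
                SimpleVersionedSerializer<T> serializer,
                T offset) throws IOException {
            byte[] bytes = serializer.serialize(offset);
            out.writeInt(serializer.getVersion());
            out.writeInt(bytes.length);
            out.write(bytes);
        }
    }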





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-20011) PageRankITCase.testPrintWithRMatGraph hangs

2020-11-07 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227883#comment-17227883
 ] 

Dian Fu commented on FLINK-20011:
-

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9257=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5

> PageRankITCase.testPrintWithRMatGraph hangs
> ---
>
> Key: FLINK-20011
> URL: https://issues.apache.org/jira/browse/FLINK-20011
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Graph Processing (Gelly), Runtime / 
> Coordination, Runtime / Network
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9121=logs=1fc6e7bf-633c-5081-c32a-9dea24b05730=80a658d1-f7f6-5d93-2758-53ac19fd5b19]
> {code}
> 2020-11-05T22:42:34.4186647Z "main" #1 prio=5 os_prio=0 tid=0x7fa98c00b800 nid=0x32f8 waiting on condition [0x7fa995c12000]
> 2020-11-05T22:42:34.4187168Z java.lang.Thread.State: WAITING (parking)
> 2020-11-05T22:42:34.4187563Z at sun.misc.Unsafe.park(Native Method)
> 2020-11-05T22:42:34.4188246Z - parking to wait for <0x8736d120> (a java.util.concurrent.CompletableFuture$Signaller)
> 2020-11-05T22:42:34.411Z at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2020-11-05T22:42:34.4189351Z at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> 2020-11-05T22:42:34.4189930Z at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> 2020-11-05T22:42:34.4190509Z at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> 2020-11-05T22:42:34.4191059Z at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-05T22:42:34.4191591Z at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:893)
> 2020-11-05T22:42:34.4192208Z at org.apache.flink.graph.asm.dataset.DataSetAnalyticBase.execute(DataSetAnalyticBase.java:55)
> 2020-11-05T22:42:34.4192787Z at org.apache.flink.graph.drivers.output.Print.write(Print.java:48)
> 2020-11-05T22:42:34.4193373Z at org.apache.flink.graph.Runner.execute(Runner.java:454)
> 2020-11-05T22:42:34.4194156Z at org.apache.flink.graph.Runner.main(Runner.java:507)
> 2020-11-05T22:42:34.4194618Z at org.apache.flink.graph.drivers.DriverBaseITCase.getSystemOutput(DriverBaseITCase.java:208)
> 2020-11-05T22:42:34.4195192Z at org.apache.flink.graph.drivers.DriverBaseITCase.expectedCount(DriverBaseITCase.java:100)
> 2020-11-05T22:42:34.4195914Z at org.apache.flink.graph.drivers.PageRankITCase.testPrintWithRMatGraph(PageRankITCase.java:60)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519241825



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator> implements 
SplitEnumerator> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+   private final SplitEnumeratorContext enumeratorContext;
+   private final LinkedHashMap readersAwaitingSplit;
+   private final FileSplitAssigner splitAssigner;
+   private final long discoveryInterval;
+
+   private final JobConf jobConf;
+   private final ObjectPath tablePath;
+
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+
+   private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+   // the maximum partition read offset seen so far
+   private volatile T currentReadOffset;
+   // the partitions that have been processed for a given read offset
+   private final Set> seenPartitions;
+
+   public ContinuousHiveSplitEnumerator(
+   SplitEnumeratorContext 
enumeratorContext,
+   T currentReadOffset,
+   Collection> seenPartitions,
+   FileSplitAssigner splitAssigner,
+   long discoveryInterval,
+   JobConf jobConf,
+   ObjectPath tablePath,
+   ContinuousPartitionFetcher fetcher,
+   
HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext) {
+   this.enumeratorContext = enumeratorContext;
+   this.currentReadOffset = currentReadOffset;
+   this.seenPartitions = new HashSet<>(seenPartitions);
+   this.splitAssigner = splitAssigner;
+   this.discoveryInterval = discoveryInterval;
+   this.jobConf = jobConf;
+   this.tablePath = tablePath;
+   this.fetcher = fetcher;
+   this.fetcherContext = fetcherContext;
+   readersAwaitingSplit = new LinkedHashMap<>();
+   }
+
+   @Override
+   public void start() {
+   try {
+   fetcherContext.open();
+   enumeratorContext.callAsync(
+   this::monitorAndGetSplits,
+   this::handleNewSplits,
+   discoveryInterval,
+  

[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519241603



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator> implements 
SplitEnumerator> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+   private final SplitEnumeratorContext enumeratorContext;
+   private final LinkedHashMap readersAwaitingSplit;
+   private final FileSplitAssigner splitAssigner;
+   private final long discoveryInterval;
+
+   private final JobConf jobConf;
+   private final ObjectPath tablePath;
+
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+
+   private final ReadWriteLock stateLock = new ReentrantReadWriteLock();

Review comment:
   Do we really need this lock? I think thread safety should be ensured by 
the framework.
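
   For context, a rough sketch of the threading contract this relies on
(quoting the `start()` call from the diff above; the comments are mine and
the truncated last argument is assumed to be `discoveryInterval`):

    // The callable runs on a worker thread, but the handler (like every
    // other SplitEnumerator method) is invoked in the single-threaded
    // source coordinator context, so under that contract the enumerator
    // state is never accessed concurrently.
    enumeratorContext.callAsync(
            this::monitorAndGetSplits,
            this::handleNewSplits,
            discoveryInterval,
            discoveryInterval);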





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723516469


   @flinkbot run azure



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519241466



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
##
@@ -151,23 +139,63 @@ public boolean isBounded() {
catalogTable,
hiveShim,
remainingPartitions);
+   Configuration configuration = 
Configuration.fromMap(catalogTable.getOptions());
 
-   @SuppressWarnings("unchecked")
-   TypeInformation typeInfo =
-   (TypeInformation) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+   Duration monitorInterval = null;
+   ContinuousPartitionFetcher fetcher = null;
+   HiveContinuousPartitionFetcherContext fetcherContext = null;
+   if (isStreamingSource()) {
+   monitorInterval = 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+   ? DEFAULT_SCAN_MONITOR_INTERVAL
+   : 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+   fetcher = new HiveContinuousPartitionFetcher();
+
+   final String defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+   
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+   fetcherContext = new 
HiveContinuousPartitionFetcherContext(
+   tablePath,
+   hiveShim,
+   new JobConfWrapper(jobConf),
+   catalogTable.getPartitionKeys(),
+   
getProducedTableSchema().getFieldDataTypes(),
+   
getProducedTableSchema().getFieldNames(),
+   configuration,
+   defaultPartitionName);
+   }
 
-   HiveTableInputFormat inputFormat = getInputFormat(
+   HiveSource hiveSource = new HiveSource(
+   jobConf,
+   tablePath,
+   catalogTable,
allHivePartitions,
-   
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+   limit,
+   hiveVersion,
+   
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
+   isStreamingSource() ? new 
ContinuousEnumerationSettings(monitorInterval) : null,
+   fetcher,
+   fetcherContext,
+   (RowType) getProducedDataType().getLogicalType()
+   );
+   DataStreamSource source = execEnv.fromSource(
+   hiveSource, WatermarkStrategy.noWatermarks(), 
"HiveSource-" + tablePath.getFullName());
 
if (isStreamingSource()) {

Review comment:
   Why not move these checks further up?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519241369



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
##
@@ -46,29 +58,98 @@
 
private static final long serialVersionUID = 1L;
 
+   private final JobConfWrapper jobConfWrapper;
+   private final List partitionKeys;
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+   private final ObjectPath tablePath;
+
HiveSource(
JobConf jobConf,
+   ObjectPath tablePath,
CatalogTable catalogTable,
List partitions,
@Nullable Long limit,
String hiveVersion,
boolean useMapRedReader,
-   boolean isStreamingSource,
+   @Nullable ContinuousEnumerationSettings 
continuousEnumerationSettings,

Review comment:
   It is better to create a builder for `HiveSource` instead of passing all 
arguments.
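
   For illustration, a minimal sketch of such a builder (field set
abbreviated and names hypothetical; it only shows the pattern, not the PR's
API):

    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.ObjectPath;

    import org.apache.hadoop.mapred.JobConf;

    public class HiveSourceBuilder {
        private JobConf jobConf;
        private ObjectPath tablePath;
        private CatalogTable catalogTable;
        private Long limit; // nullable, meaning "no limit"

        public HiveSourceBuilder setJobConf(JobConf jobConf) {
            this.jobConf = jobConf;
            return this;
        }

        public HiveSourceBuilder setTablePath(ObjectPath tablePath) {
            this.tablePath = tablePath;
            return this;
        }

        public HiveSourceBuilder setCatalogTable(CatalogTable catalogTable) {
            this.catalogTable = catalogTable;
            return this;
        }

        public HiveSourceBuilder setLimit(Long limit) {
            this.limit = limit;
            return this;
        }

        public HiveSource build() {
            // A real build() would validate required fields, apply defaults,
            // and invoke the package-private HiveSource constructor with the
            // full argument list.
            throw new UnsupportedOperationException("sketch only");
        }
    }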





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519241002



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
##
@@ -46,29 +58,98 @@
 
private static final long serialVersionUID = 1L;
 
+   private final JobConfWrapper jobConfWrapper;
+   private final List partitionKeys;
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+   private final ObjectPath tablePath;
+
HiveSource(
JobConf jobConf,
+   ObjectPath tablePath,
CatalogTable catalogTable,
List partitions,
@Nullable Long limit,
String hiveVersion,
boolean useMapRedReader,
-   boolean isStreamingSource,
+   @Nullable ContinuousEnumerationSettings 
continuousEnumerationSettings,
+   ContinuousPartitionFetcher fetcher,
+   
HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext,
RowType producedRowType) {
super(
new org.apache.flink.core.fs.Path[1],
new 
HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
-   DEFAULT_SPLIT_ASSIGNER,
+   continuousEnumerationSettings == null ? 
DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new,
createBulkFormat(new JobConf(jobConf), 
catalogTable, hiveVersion, producedRowType, useMapRedReader, limit),
-   null);
-   Preconditions.checkArgument(!isStreamingSource, "HiveSource 
currently only supports bounded mode");
+   continuousEnumerationSettings);
+   this.jobConfWrapper = new JobConfWrapper(jobConf);
+   this.tablePath = tablePath;
+   this.partitionKeys = catalogTable.getPartitionKeys();
+   this.fetcher = fetcher;
+   this.fetcherContext = fetcherContext;
}
 
@Override
public SimpleVersionedSerializer getSplitSerializer() {
return HiveSourceSplitSerializer.INSTANCE;
}
 
+   @Override
+   public 
SimpleVersionedSerializer> 
getEnumeratorCheckpointSerializer() {
+   if (needContinuousSplitEnumerator()) {
+   return new 
ContinuousHivePendingSplitsCheckpointSerializer(getSplitSerializer());
+   } else {
+   return super.getEnumeratorCheckpointSerializer();
+   }
+   }
+
+   @Override
+   public SplitEnumerator> createEnumerator(
+   SplitEnumeratorContext enumContext) {
+   if (needContinuousSplitEnumerator()) {
+   return createContinuousSplitEnumerator(
+   enumContext, 
fetcherContext.getConsumeStartOffset(), Collections.emptyList(), 
Collections.emptyList());
+   } else {
+   return super.createEnumerator(enumContext);
+   }
+   }
+
+   @Override
+   public SplitEnumerator> restoreEnumerator(
+   SplitEnumeratorContext enumContext, 
PendingSplitsCheckpoint checkpoint) {
+   if (needContinuousSplitEnumerator()) {
+   Preconditions.checkState(checkpoint instanceof 
ContinuousHivePendingSplitsCheckpoint,
+   "Illegal type of splits checkpoint %s 
for streaming read partitioned table", checkpoint.getClass().getName());
+   ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = 
(ContinuousHivePendingSplitsCheckpoint) checkpoint;
+   return createContinuousSplitEnumerator(
+   enumContext, 
hiveCheckpoint.getCurrentReadOffset(), hiveCheckpoint.getSeenPartitions(), 
hiveCheckpoint.getSplits());
+   } else {
+   return super.restoreEnumerator(enumContext, checkpoint);
+   }
+   }
+
+   private boolean needContinuousSplitEnumerator() {
+   return getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED && 
!partitionKeys.isEmpty();
+   }
+
+   private SplitEnumerator> createContinuousSplitEnumerator(
+   SplitEnumeratorContext enumContext,
+   Comparable currentReadOffset,
+   Collection> seenPartitions,
+   Collection splits) {
+   return new ContinuousHiveSplitEnumerator(

Review comment:
   Ditto





[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519240975



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
##
@@ -46,29 +58,98 @@
 
private static final long serialVersionUID = 1L;
 
+   private final JobConfWrapper jobConfWrapper;
+   private final List partitionKeys;
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+   private final ObjectPath tablePath;
+
HiveSource(
JobConf jobConf,
+   ObjectPath tablePath,
CatalogTable catalogTable,
List partitions,
@Nullable Long limit,
String hiveVersion,
boolean useMapRedReader,
-   boolean isStreamingSource,
+   @Nullable ContinuousEnumerationSettings 
continuousEnumerationSettings,
+   ContinuousPartitionFetcher fetcher,
+   
HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext,
RowType producedRowType) {
super(
new org.apache.flink.core.fs.Path[1],
new 
HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
-   DEFAULT_SPLIT_ASSIGNER,
+   continuousEnumerationSettings == null ? 
DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new,
createBulkFormat(new JobConf(jobConf), 
catalogTable, hiveVersion, producedRowType, useMapRedReader, limit),
-   null);
-   Preconditions.checkArgument(!isStreamingSource, "HiveSource 
currently only supports bounded mode");
+   continuousEnumerationSettings);
+   this.jobConfWrapper = new JobConfWrapper(jobConf);
+   this.tablePath = tablePath;
+   this.partitionKeys = catalogTable.getPartitionKeys();
+   this.fetcher = fetcher;
+   this.fetcherContext = fetcherContext;
}
 
@Override
public SimpleVersionedSerializer getSplitSerializer() {
return HiveSourceSplitSerializer.INSTANCE;
}
 
+   @Override
+   public 
SimpleVersionedSerializer> 
getEnumeratorCheckpointSerializer() {
+   if (needContinuousSplitEnumerator()) {
+   return new 
ContinuousHivePendingSplitsCheckpointSerializer(getSplitSerializer());
+   } else {
+   return super.getEnumeratorCheckpointSerializer();
+   }
+   }
+
+   @Override
+   public SplitEnumerator> createEnumerator(
+   SplitEnumeratorContext enumContext) {
+   if (needContinuousSplitEnumerator()) {
+   return createContinuousSplitEnumerator(
+   enumContext, 
fetcherContext.getConsumeStartOffset(), Collections.emptyList(), 
Collections.emptyList());
+   } else {
+   return super.createEnumerator(enumContext);
+   }
+   }
+
+   @Override
+   public SplitEnumerator> restoreEnumerator(
+   SplitEnumeratorContext enumContext, 
PendingSplitsCheckpoint checkpoint) {
+   if (needContinuousSplitEnumerator()) {
+   Preconditions.checkState(checkpoint instanceof 
ContinuousHivePendingSplitsCheckpoint,
+   "Illegal type of splits checkpoint %s 
for streaming read partitioned table", checkpoint.getClass().getName());
+   ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = 
(ContinuousHivePendingSplitsCheckpoint) checkpoint;
+   return createContinuousSplitEnumerator(
+   enumContext, 
hiveCheckpoint.getCurrentReadOffset(), hiveCheckpoint.getSeenPartitions(), 
hiveCheckpoint.getSplits());
+   } else {
+   return super.restoreEnumerator(enumContext, checkpoint);
+   }
+   }
+
+   private boolean needContinuousSplitEnumerator() {

Review comment:
   `continuousPartitionedEnumerator`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519240879



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
##
@@ -46,29 +58,98 @@
 
private static final long serialVersionUID = 1L;
 
+   private final JobConfWrapper jobConfWrapper;
+   private final List partitionKeys;
+   private final ContinuousPartitionFetcher fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext 
fetcherContext;
+   private final ObjectPath tablePath;
+
HiveSource(
JobConf jobConf,
+   ObjectPath tablePath,
CatalogTable catalogTable,
List<HiveTablePartition> partitions,
@Nullable Long limit,
String hiveVersion,
boolean useMapRedReader,
-   boolean isStreamingSource,
+   @Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
+   ContinuousPartitionFetcher fetcher,
+   HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext,
RowType producedRowType) {
super(
new org.apache.flink.core.fs.Path[1],
new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
-   DEFAULT_SPLIT_ASSIGNER,
+   continuousEnumerationSettings == null ? DEFAULT_SPLIT_ASSIGNER : SimpleSplitAssigner::new,

Review comment:
   Why not `DEFAULT_SPLIT_ASSIGNER`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519240794



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator<T extends Comparable<T>> implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+   private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
+   private final LinkedHashMap readersAwaitingSplit;
+   private final FileSplitAssigner splitAssigner;
+   private final long discoveryInterval;
+
+   private final JobConf jobConf;
+   private final ObjectPath tablePath;
+
+   private final ContinuousPartitionFetcher<Partition, T> fetcher;
+   private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+   private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+   // the maximum partition read offset seen so far
+   private volatile T currentReadOffset;
+   // the partitions that have been processed for a given read offset
+   private final Set<List<String>> seenPartitions;
+
+   public ContinuousHiveSplitEnumerator(
+           SplitEnumeratorContext<HiveSourceSplit> enumeratorContext,
+           T currentReadOffset,
+           Collection<List<String>> seenPartitions,
+           FileSplitAssigner splitAssigner,
+           long discoveryInterval,
+           JobConf jobConf,
+           ObjectPath tablePath,
+           ContinuousPartitionFetcher<Partition, T> fetcher,
+           HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
+   this.enumeratorContext = enumeratorContext;
+   this.currentReadOffset = currentReadOffset;
+   this.seenPartitions = new HashSet<>(seenPartitions);
+   this.splitAssigner = splitAssigner;
+   this.discoveryInterval = discoveryInterval;
+   this.jobConf = jobConf;
+   this.tablePath = tablePath;
+   this.fetcher = fetcher;
+   this.fetcherContext = fetcherContext;
+   readersAwaitingSplit = new LinkedHashMap<>();
+   }
+
+   @Override
+   public void start() {
+   try {
+   fetcherContext.open();
+   enumeratorContext.callAsync(
+   this::monitorAndGetSplits,
+   this::handleNewSplits,
+   discoveryInterval,
+  

[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519240289



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The checkpoint of current state of continuous hive source reading.
+ */
+public class ContinuousHivePendingSplitsCheckpoint extends PendingSplitsCheckpoint<HiveSourceSplit> {
+
+   private final Comparable<?> currentReadOffset;
+   private final Collection<List<String>> seenPartitions;
+
+   public ContinuousHivePendingSplitsCheckpoint(Collection<HiveSourceSplit> splits,

Review comment:
   Code style:
   ```
   public ContinuousHivePendingSplitsCheckpoint(
        Collection<HiveSourceSplit> splits,
        Comparable<?> currentReadOffset,
        Collection<List<String>> seenPartitions) {
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


JingsongLi commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519240181



##
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
##
@@ -75,12 +75,12 @@
 
private final FileEnumerator.Provider enumeratorFactory;
 
-   private final FileSplitAssigner.Provider assignerFactory;
+   protected final FileSplitAssigner.Provider assignerFactory;

Review comment:
   +1. Some communities even enable mandatory checkstyle checks for this, providing only getters and setters instead of protected fields.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13845:
URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990


   
   ## CI report:
   
   * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN
   * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN
   * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN
   * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN
   * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN
   * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN
   * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN
   * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN
   * 215a25d83260a048877bdf8462a06473676efb8a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223)
 
   * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282)
 
   * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN
   * d4c46ab021f7d464004ca5f58589b25a5334f147 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9284)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13845:
URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990


   
   ## CI report:
   
   * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN
   * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN
   * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN
   * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN
   * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN
   * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN
   * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN
   * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN
   * 215a25d83260a048877bdf8462a06473676efb8a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223)
 
   * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282)
 
   * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN
   * d4c46ab021f7d464004ca5f58589b25a5334f147 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] StephanEwen commented on a change in pull request #13963: [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming

2020-11-07 Thread GitBox


StephanEwen commented on a change in pull request #13963:
URL: https://github.com/apache/flink/pull/13963#discussion_r519231464



##
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
##
@@ -75,12 +75,12 @@
 
private final FileEnumerator.Provider enumeratorFactory;
 
-   private final FileSplitAssigner.Provider assignerFactory;
+   protected final FileSplitAssigner.Provider assignerFactory;

Review comment:
   I have a slight preference to keep these fields `private` and add a public getter (`public FileSplitAssigner.Provider getAssigner()`).
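   For illustration, a minimal sketch of that suggestion (hypothetical code, not part of the PR; the getter names and the trimmed class skeleton are made up here):
   ```
   public abstract class AbstractFileSource<T> {

       // kept private instead of protected
       private final FileSplitAssigner.Provider assignerFactory;

       @Nullable
       private final ContinuousEnumerationSettings continuousEnumerationSettings;

       protected AbstractFileSource(
               FileSplitAssigner.Provider assignerFactory,
               @Nullable ContinuousEnumerationSettings continuousEnumerationSettings) {
           this.assignerFactory = assignerFactory;
           this.continuousEnumerationSettings = continuousEnumerationSettings;
       }

       // subclasses such as HiveSource read the values through getters
       public FileSplitAssigner.Provider getAssignerFactory() {
           return assignerFactory;
       }

       @Nullable
       public ContinuousEnumerationSettings getContinuousEnumerationSettings() {
           return continuousEnumerationSettings;
       }
   }
   ```
   This keeps the fields encapsulated for the superclass while still giving subclasses read access, and the getters can later add validation without touching callers.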

##
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
##
@@ -75,12 +75,12 @@
 
private final FileEnumerator.Provider enumeratorFactory;
 
-   private final FileSplitAssigner.Provider assignerFactory;
+   protected final FileSplitAssigner.Provider assignerFactory;
 
private final BulkFormat readerFormat;
 
@Nullable
-   private final ContinuousEnumerationSettings continuousEnumerationSettings;
+   protected final ContinuousEnumerationSettings continuousEnumerationSettings;

Review comment:
   Similar as above.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] StephanEwen commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


StephanEwen commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723506656


   Thanks a lot, @becketqin, for taking these comments into account.
   The current solution looks good to me. +1 to merge this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13845:
URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990


   
   ## CI report:
   
   * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN
   * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN
   * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN
   * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN
   * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN
   * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN
   * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN
   * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN
   * 215a25d83260a048877bdf8462a06473676efb8a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223)
 
   * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282)
 
   * 23323a7b983fac19fdd620b0cc82eace73cc587f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519229516



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link RecordDeserializer}-like interface for recovery. To avoid additional 
virtual method calls on the
+ * non-recovery hotpath, this interface is not extending RecordDeserializer.
+ */
+interface Demultiplexer extends AutoCloseable {
+   RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) 
throws IOException;
+
+   void setNextBuffer(Buffer buffer) throws IOException;
+
+   void select(VirtualChannelSelector event);
+
+   @Override
+   void close();
+}
+
+class NoDataDemultiplexer implements Demultiplexer {
+   private final InputChannelInfo channelInfo;
+
+   public NoDataDemultiplexer(InputChannelInfo channelInfo) {
+   this.channelInfo = channelInfo;
+   }
+
+   @Override
+   public RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) {
+   throw getException();
+   }
+
+   @Override
+   public void setNextBuffer(Buffer buffer) {
+   throw getException();
+   }
+
+   @Override
+   public void select(VirtualChannelSelector event) {
+   throw getException();
+   }
+
+   private IllegalStateException getException() {
+   return new IllegalStateException(channelInfo + " should not 
receive any data/events during recovery");
+   }
+
+   @Override
+   public void close() {
+   }
+}
+
+/**
+ * Parameter structure to pass all relevant information to the factory methods 
of @{@link Demultiplexer}.
+ */
+class DemultiplexParameters {
+   final IOManager ioManager;
+   final InflightDataRescalingDescriptor channelMapping;
+   final Function> gatePartitionerRetriever;
+   final SerializationDelegate delegate;
+   final int numberOfChannels;
+   final int subtaskIndex;
+
+   @SuppressWarnings("unchecked")
+   DemultiplexParameters(
+   TypeSerializer inputSerializer,
+   IOManager ioManager,
+   InflightDataRescalingDescriptor channelMapping,
+   

[GitHub] [flink] StephanEwen commented on pull request #13885: [FLINK-19911] Read checkpoint stream with buffer to speedup restore

2020-11-07 Thread GitBox


StephanEwen commented on pull request #13885:
URL: https://github.com/apache/flink/pull/13885#issuecomment-723504501


   Are the HDFS input streams generally not buffered? Would it make sense to 
adjust the `HadoopDataInputStream` class to be buffered? 
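   For context, the kind of buffering in question is the plain `java.io` wrapper sketched below. This is a hedged illustration of the idea, not the change in this PR; the class name and the 64 KiB size are made up:
   ```
   import java.io.BufferedInputStream;
   import java.io.InputStream;

   public final class BufferedRestoreStreams {

       private static final int BUFFER_SIZE = 64 * 1024; // illustrative size

       /**
        * Wraps a raw checkpoint input stream so that many small read() calls
        * during restore are served from an in-memory buffer, and only large
        * block reads hit the underlying (remote) file system.
        */
       public static InputStream buffered(InputStream rawStream) {
           return new BufferedInputStream(rawStream, BUFFER_SIZE);
       }

       private BufferedRestoreStreams() {}
   }
   ```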



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519229050



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RescalingStreamTaskNetworkInput.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StreamTaskNetworkInput} implementation that demultiplexes virtual 
channels.
+ *
+ * The demultiplexing works in two dimensions for the following cases.
+ * 
+ *  Subtasks of the current operator have been collapsed in a 
round-robin fashion.
+ *  The connected output operator has been rescaled (up and down!) and 
there is an overlap of channels (mostly
+ * relevant to keyed exchanges).
+ * 
+ * In both cases, records from multiple old channels are received over one new 
physical channel, which need to
+ * demultiplex the record to correctly restore spanning records (similar to 
how StreamTaskNetworkInput works).
+ *
+ * Note that when both cases occur at the same time (downscaling of several 
operators), there is the cross product of
+ * channels. So if two subtasks are collapsed and two channels overlap from 
the output side, there is a total of 4
+ * virtual channels.
+ */
+@Internal
+public final class RescalingStreamTaskNetworkInput<T> implements RecoverableStreamTaskInput<T> {

Review comment:
   The main motivation was to avoid a secondary implementation of 
`RecordDeserializer`, as that would translate to virtual calls in the regular 
`StreamTaskNetworkInput`. I have not measured the impact, but it was a major 
concern of @pnowojski.
   Now, it would very well be possible to subclass `StreamTaskNetworkInput` or 
extract a common super class. However, because `recordDeserializers` is of a 
different type without a common ancestor (for CHA), it's hard to generalize.
   I had hoped for some input on how to solve it.

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RescalingStreamTaskNetworkInput.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ 

[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519229083



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
##
@@ -88,38 +127,66 @@ private RecoveredInputChannel getChannel(InputChannelInfo 
info) {
private final ResultPartitionWriter[] writers;
private final boolean notifyAndBlockOnCompletion;
 
-   ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] 
writers, boolean notifyAndBlockOnCompletion) {
+   private final InflightDataRescalingDescriptor channelMapping;
+
+   private final Map> rescaledChannels = new HashMap<>();
+
+   ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] 
writers, boolean notifyAndBlockOnCompletion, InflightDataRescalingDescriptor 
channelMapping) {
this.writers = writers;
+   this.channelMapping = channelMapping;
this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
}
 
@Override
public BufferWithContext> 
getBuffer(ResultSubpartitionInfo subpartitionInfo) throws IOException, 
InterruptedException {
-   BufferBuilder bufferBuilder = 
getSubpartition(subpartitionInfo).requestBufferBuilderBlocking();
+   final List channels = 
getMappedChannels(subpartitionInfo);
+   BufferBuilder bufferBuilder = 
channels.get(0).requestBufferBuilderBlocking();
return new BufferWithContext<>(wrap(bufferBuilder), 
Tuple2.of(bufferBuilder, bufferBuilder.createBufferConsumer()));
}
 
@Override
-   public void recover(ResultSubpartitionInfo subpartitionInfo, 
Tuple2 bufferBuilderAndConsumer) throws 
IOException {
+   public void recover(
+   ResultSubpartitionInfo subpartitionInfo,
+   int oldSubtaskIndex,
+   Tuple2 
bufferBuilderAndConsumer) throws IOException {
bufferBuilderAndConsumer.f0.finish();
if (bufferBuilderAndConsumer.f1.isDataAvailable()) {
-   boolean added = 
getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1, 
Integer.MIN_VALUE);
-   if (!added) {
-   throw new IOException("Buffer consumer couldn't 
be added to ResultSubpartition");
+   final List channels = 
getMappedChannels(subpartitionInfo);
+   for (final CheckpointedResultSubpartition channel : 
channels) {
+   // channel selector is created from the 
downstream's point of view: the subtask of downstream = subpartition index of 
recovered buffer
+   final VirtualChannelSelector channelSelector = 
new VirtualChannelSelector(subpartitionInfo.getSubPartitionIdx(), 
oldSubtaskIndex);
+   
channel.add(EventSerializer.toBufferConsumer(channelSelector, false), 
Integer.MIN_VALUE);
+   boolean added = 
channel.add(bufferBuilderAndConsumer.f1.copy(), Integer.MIN_VALUE);
+   if (!added) {
+   throw new IOException("Buffer consumer 
couldn't be added to ResultSubpartition");
+   }
}
-   } else {
-   bufferBuilderAndConsumer.f1.close();
}
+   bufferBuilderAndConsumer.f1.close();
}
 
-   private CheckpointedResultSubpartition 
getSubpartition(ResultSubpartitionInfo subpartitionInfo) {
-   ResultPartitionWriter writer = 
writers[subpartitionInfo.getPartitionIdx()];
-   if (writer instanceof CheckpointedResultPartition) {
-   return ((CheckpointedResultPartition) 
writer).getCheckpointedSubpartition(subpartitionInfo.getSubPartitionIdx());
-   } else {
-   throw new IllegalStateException(
-   "Cannot restore state to a non-checkpointable 
partition type: " + writer);
+   private CheckpointedResultSubpartition getSubpartition(int 
partitionIndex, int subPartitionIdx) {
+   ResultPartitionWriter writer = writers[partitionIndex];
+   if (!(writer instanceof CheckpointedResultPartition)) {
+   throw new IllegalStateException("Cannot restore state 
to a non-checkpointable partition type: " + writer);
}
+   return ((CheckpointedResultPartition) 
writer).getCheckpointedSubpartition(subPartitionIdx);
+   }
+
+   private List 
getMappedChannels(ResultSubpartitionInfo subpartitionInfo) {
+   return rescaledChannels.computeIfAbsent(subpartitionInfo, 
this::calculateMapping);
+   }
+
+   private static final Logger LOG = 

[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13845:
URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990


   
   ## CI report:
   
   * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN
   * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN
   * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN
   * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN
   * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN
   * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN
   * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN
   * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN
   * 215a25d83260a048877bdf8462a06473676efb8a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223)
 
   * fe94615bb4be9771757f98e93ff11e8d765231f2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9282)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519228672



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link RecordDeserializer}-like interface for recovery. To avoid additional 
virtual method calls on the
+ * non-recovery hotpath, this interface is not extending RecordDeserializer.
+ */
+interface Demultiplexer extends AutoCloseable {

Review comment:
   Good idea. Would it be okay to keep the implementation names as is to 
avoid super-long names?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519228359



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link RecordDeserializer}-like interface for recovery. To avoid additional 
virtual method calls on the
+ * non-recovery hotpath, this interface is not extending RecordDeserializer.
+ */
+interface Demultiplexer extends AutoCloseable {
+   RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) 
throws IOException;
+
+   void setNextBuffer(Buffer buffer) throws IOException;
+
+   void select(VirtualChannelSelector event);
+
+   @Override
+   void close();
+}
+
+class NoDataDemultiplexer implements Demultiplexer {
+   private final InputChannelInfo channelInfo;
+
+   public NoDataDemultiplexer(InputChannelInfo channelInfo) {
+   this.channelInfo = channelInfo;
+   }
+
+   @Override
+   public RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) {
+   throw getException();
+   }
+
+   @Override
+   public void setNextBuffer(Buffer buffer) {
+   throw getException();
+   }
+
+   @Override
+   public void select(VirtualChannelSelector event) {
+   throw getException();
+   }
+
+   private IllegalStateException getException() {
+   return new IllegalStateException(channelInfo + " should not 
receive any data/events during recovery");
+   }
+
+   @Override
+   public void close() {
+   }
+}
+
+/**
+ * Parameter structure to pass all relevant information to the factory methods 
of @{@link Demultiplexer}.
+ */
+class DemultiplexParameters {
+   final IOManager ioManager;
+   final InflightDataRescalingDescriptor channelMapping;
+   final Function> gatePartitionerRetriever;
+   final SerializationDelegate delegate;
+   final int numberOfChannels;
+   final int subtaskIndex;
+
+   @SuppressWarnings("unchecked")
+   DemultiplexParameters(
+   TypeSerializer inputSerializer,
+   IOManager ioManager,
+   InflightDataRescalingDescriptor channelMapping,
+   

[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519228182



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link RecordDeserializer}-like interface for recovery. To avoid additional 
virtual method calls on the
+ * non-recovery hotpath, this interface is not extending RecordDeserializer.
+ */
+interface Demultiplexer extends AutoCloseable {
+   RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) 
throws IOException;
+
+   void setNextBuffer(Buffer buffer) throws IOException;
+
+   void select(VirtualChannelSelector event);
+
+   @Override
+   void close();
+}
+
+class NoDataDemultiplexer implements Demultiplexer {
+   private final InputChannelInfo channelInfo;
+
+   public NoDataDemultiplexer(InputChannelInfo channelInfo) {
+   this.channelInfo = channelInfo;
+   }
+
+   @Override
+   public RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) {
+   throw getException();
+   }
+
+   @Override
+   public void setNextBuffer(Buffer buffer) {
+   throw getException();
+   }
+
+   @Override
+   public void select(VirtualChannelSelector event) {
+   throw getException();
+   }
+
+   private IllegalStateException getException() {
+   return new IllegalStateException(channelInfo + " should not 
receive any data/events during recovery");
+   }
+
+   @Override
+   public void close() {
+   }
+}
+
+/**
+ * Parameter structure to pass all relevant information to the factory methods 
of @{@link Demultiplexer}.
+ */
+class DemultiplexParameters {
+   final IOManager ioManager;
+   final InflightDataRescalingDescriptor channelMapping;
+   final Function> gatePartitionerRetriever;
+   final SerializationDelegate delegate;
+   final int numberOfChannels;
+   final int subtaskIndex;
+
+   @SuppressWarnings("unchecked")
+   DemultiplexParameters(
+   TypeSerializer inputSerializer,
+   IOManager ioManager,
+   InflightDataRescalingDescriptor channelMapping,
+   

[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519227877



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RescalingStreamTaskNetworkInput.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StreamTaskNetworkInput} implementation that demultiplexes virtual 
channels.
+ *
+ * The demultiplexing works in two dimensions for the following cases.
+ * 
+ *  Subtasks of the current operator have been collapsed in a 
round-robin fashion.
+ *  The connected output operator has been rescaled (up and down!) and 
there is an overlap of channels (mostly
+ * relevant to keyed exchanges).
+ * 
+ * In both cases, records from multiple old channels are received over one new 
physical channel, which need to
+ * demultiplex the record to correctly restore spanning records (similar to 
how StreamTaskNetworkInput works).
+ *
+ * Note that when both cases occur at the same time (downscaling of several 
operators), there is the cross product of
+ * channels. So if two subtasks are collapsed and two channels overlap from 
the output side, there is a total of 4
+ * virtual channels.
+ */
+@Internal
+public final class RescalingStreamTaskNetworkInput<T> implements RecoverableStreamTaskInput<T> {
+
+   private final CheckpointedInputGate checkpointedInputGate;
+
+   private final DeserializationDelegate<StreamElement> deserializationDelegate;
+
+   private final Demultiplexer[] channelDemultiplexers;
+
+   /** Valve that controls how watermarks and stream statuses are forwarded. */
+   private final StatusWatermarkValve statusWatermarkValve;
+
+   private final int inputIndex;
+
+   private final Map<InputChannelInfo, Integer> channelIndexes;

Review comment:
   `StatusWatermarkValve` operators on indexes, so we need this 
indirection. We should probably refactor that (it's the same in 
`StreamTaskNetworkInput`).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519227412



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link RecordDeserializer}-like interface for recovery. To avoid additional 
virtual method calls on the
+ * non-recovery hotpath, this interface is not extending RecordDeserializer.
+ */
+interface Demultiplexer extends AutoCloseable {
+   RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) 
throws IOException;
+
+   void setNextBuffer(Buffer buffer) throws IOException;
+
+   void select(VirtualChannelSelector event);
+
+   @Override
+   void close();
+}
+
+class NoDataDemultiplexer implements Demultiplexer {
+   private final InputChannelInfo channelInfo;
+
+   public NoDataDemultiplexer(InputChannelInfo channelInfo) {
+   this.channelInfo = channelInfo;
+   }
+
+   @Override
+   public RecordDeserializer.DeserializationResult 
getNextRecord(DeserializationDelegate deserializationDelegate) {
+   throw getException();
+   }
+
+   @Override
+   public void setNextBuffer(Buffer buffer) {
+   throw getException();
+   }
+
+   @Override
+   public void select(VirtualChannelSelector event) {
+   throw getException();
+   }
+
+   private IllegalStateException getException() {
+   return new IllegalStateException(channelInfo + " should not 
receive any data/events during recovery");
+   }
+
+   @Override
+   public void close() {
+   }
+}
+
+/**
+ * Parameter structure to pass all relevant information to the factory methods 
of @{@link Demultiplexer}.
+ */
+class DemultiplexParameters {
+   final IOManager ioManager;
+   final InflightDataRescalingDescriptor channelMapping;
+   final Function> gatePartitionerRetriever;
+   final SerializationDelegate delegate;
+   final int numberOfChannels;
+   final int subtaskIndex;
+
+   @SuppressWarnings("unchecked")
+   DemultiplexParameters(
+   TypeSerializer inputSerializer,
+   IOManager ioManager,
+   InflightDataRescalingDescriptor channelMapping,
+   

[GitHub] [flink] flinkbot edited a comment on pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


flinkbot edited a comment on pull request #13845:
URL: https://github.com/apache/flink/pull/13845#issuecomment-718853990


   
   ## CI report:
   
   * f63fa7264bda0bafc3e97ac79b920eb34cae3e95 UNKNOWN
   * 3d418c42a5ffe60a99d17a22a6aaba9955e347f9 UNKNOWN
   * 6cc4a796b26b66b761f50730f7534c36afad5afa UNKNOWN
   * dff9f25ac4086acf4b2dbe650a0ed80dd0385ddb UNKNOWN
   * 6ff570d417423ac84ad5d906900758fbce2b8f43 UNKNOWN
   * 894e952378dd3ae2c7e92f65b90689ec6c989c8b UNKNOWN
   * f3aff0c1259abd05d4c18d404874e687431a157c UNKNOWN
   * e07c51a3c4cf17ad447312889227e33e4b13d4f3 UNKNOWN
   * 215a25d83260a048877bdf8462a06473676efb8a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9223)
 
   * fe94615bb4be9771757f98e93ff11e8d765231f2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13845: [FLINK-19801] Adding virtual channels for rescaling unaligned checkpoints.

2020-11-07 Thread GitBox


AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519226903



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Demultiplexer.java
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaledChannelsMapping;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.VirtualChannelSelector;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link RecordDeserializer}-like interface for recovery. To avoid additional virtual method calls on the
+ * non-recovery hot path, this interface does not extend RecordDeserializer.
+ */
+interface Demultiplexer extends AutoCloseable {
+   RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate deserializationDelegate) throws IOException;
+
+   void setNextBuffer(Buffer buffer) throws IOException;
+
+   void select(VirtualChannelSelector event);
+
+   @Override
+   void close();
+}
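
// Editor's sketch (assumptions flagged, not part of this diff): a data-bearing
// Demultiplexer for rescaled channels would typically keep one deserializer per
// original ("virtual") channel and let select(...) choose where subsequent
// buffers are routed. The field names and the VirtualChannelSelector accessor
// below are assumptions for illustration, not API from this PR:
//
// private final Map<InputChannelInfo, RecordDeserializer<DeserializationDelegate<StreamElement>>> channels = ...;
// private RecordDeserializer<DeserializationDelegate<StreamElement>> current;
//
// public void select(VirtualChannelSelector selector) {
//     current = channels.get(selector.getChannelInfo());  // assumed accessor
// }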
+
+class NoDataDemultiplexer implements Demultiplexer {
+   private final InputChannelInfo channelInfo;
+
+   public NoDataDemultiplexer(InputChannelInfo channelInfo) {
+   this.channelInfo = channelInfo;
+   }
+
+   @Override
+   public RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate deserializationDelegate) {
+   throw getException();
+   }
+
+   @Override
+   public void setNextBuffer(Buffer buffer) {
+   throw getException();
+   }
+
+   @Override
+   public void select(VirtualChannelSelector event) {
+   throw getException();
+   }
+
+   private IllegalStateException getException() {
+   return new IllegalStateException(channelInfo + " should not receive any data/events during recovery");
+   }
+
+   @Override
+   public void close() {
+   }
+}
+
+/**
+ * Parameter structure to pass all relevant information to the factory methods of {@link Demultiplexer}.
+ */
+class DemultiplexParameters {
+   final IOManager ioManager;
+   final InflightDataRescalingDescriptor channelMapping;
+   final Function> gatePartitionerRetriever;
+   final SerializationDelegate delegate;
+   final int numberOfChannels;
+   final int subtaskIndex;
+
+   @SuppressWarnings("unchecked")
+   DemultiplexParameters(
+   TypeSerializer inputSerializer,
+   IOManager ioManager,
+   InflightDataRescalingDescriptor channelMapping,
+   
