[GitHub] [flink] KarmaGYZ commented on a change in pull request #14897: [FLINK-21221][runtime] Deduplication for multiple ResourceCounters

2021-02-18 Thread GitBox


KarmaGYZ commented on a change in pull request #14897:
URL: https://github.com/apache/flink/pull/14897#discussion_r578870795



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##
@@ -600,61 +603,69 @@ private void allocateSlot(
 FutureUtils.assertNoException(slotAllocationResponseProcessingFuture);
 }
 
-private void tryFulfillRequirementsWithPendingSlots(
+private ResourceCounter tryFulfillRequirementsWithPendingSlots(
 JobID jobId,
-Map<ResourceProfile, Integer> missingResources,
+Collection<Map.Entry<ResourceProfile, Integer>> missingResources,
 ResourceCounter pendingSlots) {
-for (Map.Entry<ResourceProfile, Integer> missingResource : missingResources.entrySet()) {
+for (Map.Entry<ResourceProfile, Integer> missingResource : missingResources) {
 ResourceProfile profile = missingResource.getKey();
 for (int i = 0; i < missingResource.getValue(); i++) {
-if (!tryFulfillWithPendingSlots(profile, pendingSlots)) {
-boolean couldAllocateWorkerAndReserveSlot =
+final Tuple2<Boolean, ResourceCounter> matchingResult =
+tryFulfillWithPendingSlots(profile, pendingSlots);
+pendingSlots = matchingResult.f1;
+if (!matchingResult.f0) {
+final Tuple2<Boolean, ResourceCounter> allocationResult =
 tryAllocateWorkerAndReserveSlot(profile, pendingSlots);
+pendingSlots = allocationResult.f1;
+boolean couldAllocateWorkerAndReserveSlot = allocationResult.f0;
 if (!couldAllocateWorkerAndReserveSlot && sendNotEnoughResourceNotifications) {
 LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
 resourceActions.notifyNotEnoughResourcesAvailable(
 jobId, resourceTracker.getAcquiredResources(jobId));
-return;
+return pendingSlots;
 }
 }
 }
 }
+return pendingSlots;
 }
 
-private boolean tryFulfillWithPendingSlots(
+private Tuple2<Boolean, ResourceCounter> tryFulfillWithPendingSlots(

Review comment:
   That's a good point.
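
The diff above changes the helper methods so that they return the updated pending-slot counter instead of mutating a shared one. A minimal sketch of that pattern follows, assuming a hypothetical immutable `ImmutableCounter` keyed by a profile name and a hypothetical `MatchResult` pair standing in for `Tuple2`; none of these names are Flink classes, and this is not the PR's implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable counter keyed by profile name; not Flink's ResourceCounter. */
final class ImmutableCounter {
    private final Map<String, Integer> counts;

    private ImmutableCounter(Map<String, Integer> counts) {
        this.counts = Collections.unmodifiableMap(counts);
    }

    static ImmutableCounter empty() {
        return new ImmutableCounter(new HashMap<>());
    }

    int get(String profile) {
        return counts.getOrDefault(profile, 0);
    }

    /** Returns a new counter with the delta applied; the receiver is unchanged. */
    ImmutableCounter add(String profile, int delta) {
        int updated = get(profile) + delta;
        if (updated < 0) {
            throw new IllegalArgumentException("count would become negative");
        }
        Map<String, Integer> copy = new HashMap<>(counts);
        if (updated == 0) {
            copy.remove(profile);
        } else {
            copy.put(profile, updated);
        }
        return new ImmutableCounter(copy);
    }
}

/** Hypothetical result pair: whether a slot matched, plus the counter to use from now on. */
final class MatchResult {
    final boolean matched;
    final ImmutableCounter remaining;

    MatchResult(boolean matched, ImmutableCounter remaining) {
        this.matched = matched;
        this.remaining = remaining;
    }
}

final class PendingSlotMatcher {
    /** Consumes one pending slot of the profile if available. */
    static MatchResult tryFulfill(String profile, ImmutableCounter pending) {
        if (pending.get(profile) > 0) {
            return new MatchResult(true, pending.add(profile, -1));
        }
        return new MatchResult(false, pending);
    }

    /** Threads the counter through every attempt and returns the final value. */
    static ImmutableCounter fulfillAll(String profile, int required, ImmutableCounter pending) {
        for (int i = 0; i < required; i++) {
            MatchResult result = tryFulfill(profile, pending);
            pending = result.remaining;
            if (!result.matched) {
                return pending;
            }
        }
        return pending;
    }
}
```

Because every call returns a fresh counter, a caller that forgets to reassign `pending` keeps working with stale data, which is why the loop reassigns it on each iteration.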





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14868: [FLINK-21326][runtime] Optimize building topology when initializing ExecutionGraph

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14868:
URL: https://github.com/apache/flink/pull/14868#issuecomment-773192044


   
   ## CI report:
   
   * 85af2bb6354ce46e90d607ee0f784a91266b80b1 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12922)
 
   * 997d4dddfc4d6ae2f45d80102655d02f09168d1b Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13480)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Commented] (FLINK-21319) hadoop-mapreduce jars are not loaded into classpath when submiting flink on yarn jobs.

2021-02-18 Thread Tang Yan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286807#comment-17286807
 ] 

Tang Yan commented on FLINK-21319:
--

Thanks for your reply, [~trohrmann]. Could you check my code in the description? 

> hadoop-mapreduce jars are not loaded into classpath when submiting flink on 
> yarn jobs.
> --
>
> Key: FLINK-21319
> URL: https://issues.apache.org/jira/browse/FLINK-21319
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.12.1
>Reporter: Tang Yan
>Priority: Major
>
> My code is to query hive:
> {code:java}
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> String name            = "myhive";
> String defaultDatabase = "test"; 
> String hiveConfDir     = "/etc/hive/conf";
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); 
> tableEnv.registerCatalog(name, hive);
> tableEnv.useDatabase(defaultDatabase);
> String testsql="select count(1) from mytable"; 
> tableEnv.executeSql(testsql); 
> {code}
> Env: Flink 1.12.1 + CDH6.3.0
> My submit command:
>  
> {code:java}
> export HADOOP_CLASSPATH=`hadoop classpath`
> /opt/flink-1.12.1/bin/flink run -m yarn-cluster -p 2 -c 
> com..flink.test.HiveConnTestJob /home/path/flinkTestCDH6-0.0.1-SNAPSHOT.jar
> {code}
>  
> Job ERROR:
>  
> {code:java}
> java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
> {code}
>  
> If I run {{hadoop classpath}} on the server, I can see that the hadoop-mapreduce 
> jars folder is included, as shown below, but when I check the Flink job logs, 
> it is not included there.
>  
> {code:java}
> [root@my_server1 lib]# hadoop classpath
> /etc/hadoop/conf:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop/lib/*:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop/.//*:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-hdfs/./:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-yarn/.//*:/opt/cloudera/parcels/GPLEXTRAS-6.3.0-1.gplextras6.3.0.p0.1279813/lib/hadoop/lib/COPYING.hadoop-lzo:/opt/cloudera/parcels/GPLEXTRAS-6.3.0-1.gplextras6.3.0.p0.1279813/lib/hadoop/lib/hadoop-lzo-0.4.15-cdh6.3.0.jar:/opt/cloudera/parcels/GPLEXTRAS-6.3.0-1.gplextras6.3.0.p0.1279813/lib/hadoop/lib/hadoop-lzo.jar:/opt/cloudera/parcels/GPLEXTRAS-6.3.0-1.gplextras6.3.0.p0.1279813/lib/hadoop/lib/native
> {code}
>  
> Flink job logs:
>  
> {code:java}
> 2021-02-08 05:26:42,590 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Classpath: 
> :flinkTestCDH6-0.0.1-SNAPSHOT.jar:lib:lib/flink-connector-base-1.12.1.jar:lib/flink-connector-cassandra_2.11-1.12.1.jar:lib/flink-connector-files-1.12.1.jar:lib/flink-connector-jdbc_2.11-1.12.1.jar:lib/flink-csv-1.12.1.jar:lib/flink-file-sink-common-1.12.1.jar:lib/flink-json-1.12.1.jar:lib/flink-shaded-zookeeper-3.4.5-cdh6.3.0.jar:lib/flink-sql-connector-elasticsearch7_2.11-1.12.1.jar:lib/flink-sql-connector-hbase-2.2_2.11-1.12.1.jar:lib/flink-sql-connector-hive-2.1.1-cdh6.3.0_2.11-1.12.1.jar:lib/flink-sql-connector-kafka_2.11-1.12.1.jar:lib/flink-table-blink_2.11-1.12.1.jar:lib/flink-table_2.11-1.12.1.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.11-1.12.1.jar:job.graph:flink-conf.yaml::/etc/hadoop/conf.cloudera.yarn:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/gcs-connector-hadoop3-shaded.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-common.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-nfs.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-kms.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-azure-datalake.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/gcs-connector-hadoop3-1.9.10-cdh6.3.0-shaded.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-auth.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-aws.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-common-tests.jar:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/hadoop-azure.jar:/opt/cloudera/parcels/CDH-6.3.0

[GitHub] [flink] flinkbot edited a comment on pull request #14868: [FLINK-21326][runtime] Optimize building topology when initializing ExecutionGraph

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14868:
URL: https://github.com/apache/flink/pull/14868#issuecomment-773192044


   
   ## CI report:
   
   * 85af2bb6354ce46e90d607ee0f784a91266b80b1 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12922)
 
   * 997d4dddfc4d6ae2f45d80102655d02f09168d1b UNKNOWN
   
   




[jira] [Created] (FLINK-21411) The components on which Flink depends may contain vulnerabilities. If yes, fix them.

2021-02-18 Thread dgbro (Jira)
dgbro created FLINK-21411:
-

 Summary: The components on which Flink depends may contain 
vulnerabilities. If yes, fix them.
 Key: FLINK-21411
 URL: https://issues.apache.org/jira/browse/FLINK-21411
 Project: Flink
  Issue Type: Improvement
Reporter: dgbro


Flink v1.11.3 contains mesos (version 1.0.1), okhttp (version 3.7.0), 
log4j (version 2.12.1), netty (version 3.10.6), jackson-databind (version 2.10.1), 
jackson (version 2.10.1), and bzip2 (version 1.0.6). These versions have many 
vulnerabilities, such as 
CVE-2017-7687, CVE-2017-9790, CVE-2018-8023, CVE-2018-20200, CVE-2020-9488, CVE-2019-20444, CVE-2019-20445, CVE-2019-16869, CVE-2020-25649, CVE-2019-12900, CVE-2016-3189,
 etc. Please confirm these versions and fix them. Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-17061) Unset process/flink memory size from configuration once dynamic worker resource is activated.

2021-02-18 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-17061.

Fix Version/s: 1.13.0
   Resolution: Fixed

Fixed via:
* master (1.13): 082fcebc33383a21d6f88e4d52d2e334e12fe085

> Unset process/flink memory size from configuration once dynamic worker 
> resource is activated.
> -
>
> Key: FLINK-17061
> URL: https://issues.apache.org/jira/browse/FLINK-17061
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration, Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> With FLINK-14106, memory of a TaskExecutor is decided in two steps on active 
> resource managers.
> - {{SlotManager}} decides {{WorkerResourceSpec}}, including memory used by 
> Flink tasks: task heap, task off-heap, network and managed memory.
> - {{ResourceManager}} derives {{TaskExecutorProcessSpec}} from 
> {{WorkerResourceSpec}} and the configuration, deciding sizes of memory used 
> by Flink framework and JVM: framework heap, framework off-heap, jvm metaspace 
> and jvm overhead.
> This works fine for now, because both {{WorkerResourceSpec}} and 
> {{TaskExecutorProcessSpec}} are derived from the same configurations. 
> However, it might cause problem if later we have new {{SlotManager}} 
> implementations that decides {{WorkerResourceSpec}} dynamically. In such 
> cases, the process/flink sizes in configuration should be ignored, or it may 
> easily lead to configuration conflicts.
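
The two-step derivation and the conflict it can cause may be easier to see with numbers. The sketch below is only an illustration under stated assumptions: all sizes are fixed values in MB, and `WorkerSpec`, `FrameworkAndJvm`, and `resolveProcessSize` are hypothetical names that merely mirror the terms in the description (they are not Flink's `WorkerResourceSpec` or `TaskExecutorProcessSpec`).

```java
/** Hypothetical sketch; all sizes are in MB and all names are illustrative only. */
final class MemoryDerivationSketch {

    /** Step 1: memory used by Flink tasks, as decided by the slot manager. */
    static final class WorkerSpec {
        final int taskHeap, taskOffHeap, network, managed;

        WorkerSpec(int taskHeap, int taskOffHeap, int network, int managed) {
            this.taskHeap = taskHeap;
            this.taskOffHeap = taskOffHeap;
            this.network = network;
            this.managed = managed;
        }

        int total() {
            return taskHeap + taskOffHeap + network + managed;
        }
    }

    /** Step 2 input: framework and JVM sizes taken from the configuration. */
    static final class FrameworkAndJvm {
        final int frameworkHeap, frameworkOffHeap, jvmMetaspace, jvmOverhead;

        FrameworkAndJvm(int frameworkHeap, int frameworkOffHeap, int jvmMetaspace, int jvmOverhead) {
            this.frameworkHeap = frameworkHeap;
            this.frameworkOffHeap = frameworkOffHeap;
            this.jvmMetaspace = jvmMetaspace;
            this.jvmOverhead = jvmOverhead;
        }

        int total() {
            return frameworkHeap + frameworkOffHeap + jvmMetaspace + jvmOverhead;
        }
    }

    /** Step 2: the process size is the worker part plus the framework/JVM part. */
    static int deriveProcessSize(WorkerSpec worker, FrameworkAndJvm config) {
        return worker.total() + config.total();
    }

    /**
     * Resolves the process size. A configured total (nullable) is ignored when worker
     * specs are decided dynamically; otherwise a mismatch is reported as a conflict.
     */
    static int resolveProcessSize(
            WorkerSpec worker,
            FrameworkAndJvm config,
            Integer configuredProcessSize,
            boolean dynamicWorkerSpecs) {
        int derived = deriveProcessSize(worker, config);
        if (dynamicWorkerSpecs || configuredProcessSize == null) {
            return derived;
        }
        if (configuredProcessSize != derived) {
            throw new IllegalStateException(
                    "configured process size " + configuredProcessSize
                            + " MB conflicts with derived size " + derived + " MB");
        }
        return derived;
    }
}
```

With a dynamically chosen worker spec, a total that was configured for a different spec no longer matches the derived sum, which is the conflict the issue avoids by unsetting the configured totals.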





[GitHub] [flink] xintongsong closed pull request #14954: [FLINK-17061][runtime] Unset TM total process/flink memory size for fine-grained resource management

2021-02-18 Thread GitBox


xintongsong closed pull request #14954:
URL: https://github.com/apache/flink/pull/14954


   







[GitHub] [flink] flinkbot edited a comment on pull request #14964: [FLINK-21410][docs] Document checkpoint interval trade-offs

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14964:
URL: https://github.com/apache/flink/pull/14964#issuecomment-781637879


   
   ## CI report:
   
   * ed1dc2c2399cd7a9a0e8c5c68f4b39470e7ec10e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13476)
 
   * 63cafd0fe1c8c1c43e69f5acf5faf1256073dcea Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13477)
 
   
   




[jira] [Comment Edited] (FLINK-21133) FLIP-27 Source does not work with synchronous savepoint

2021-02-18 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286380#comment-17286380
 ] 

Jiangjie Qin edited comment on FLINK-21133 at 2/19/21, 12:41 AM:
-

It looks like we are aiming for a long-term solution here. My two cents:

At a high level, any control logic in Flink that is initiated from the JM 
eventually falls into one of the following two types / mechanisms.
 * independent point-to-point control (e.g. all JM to TM RPC)
 ** This is an "out-of-band" control flow, pretty much requests / responses 
between JM and TMs.
 ** In order to make sure the tasks will respond to the control command, the 
mailbox thread may have to be interrupted from a blocking call.
 * job-graph wide coordinated / orderly control (e.g. checkpoint, 
EndOfPartition)
 ** This is a control flow combining "out-of-band" and "in-band" mechanisms.
 ** JM sends the command to Sources via RPC (i.e. out-of-band)
 ** Sources execute the command, then propagate the command to downstream 
operators in the data stream (in-band)
 ** The downstream operators receive the command from their input data streams 
(in-band)
 ** JM receives the acks from all the operators and completes the control logic 
(out-of-band)
 ** [Optional] JM notifies all the TMs about the execution result (out-of-band) 

When it comes to {{StopWithSavePoint}}, to me the most intuitive semantic is 
that the tasks naturally stop right after taking the savepoint without any side 
effects. That means:
 # The operators / tasks stop processing records right after the savepoint is 
taken.
 # The operators / tasks do not receive {{EndOfPartition}} / {{EndOfInput}} 
because these events have already been assigned other meanings.
 # {{StreamOperator.close()}} should not be invoked, so as not to confuse the 
operators. Instead, {{StreamOperator.dispose()}} should be invoked directly.

At this point, I think the second control flow would just work, as long as we 
adjust the behavior of {{StreamTask}} a little bit.

Implementation-wise, would the following work?
 # The JM sends a "StopWithSavepoint" RPC to the Sources.
 # The Sources take a snapshot, send a checkpoint barrier with a flag 
indicating stop after checkpointing, then stop processing data and just 
block on the mailbox. This could be done either via a state machine for the task 
or simply a flag.
 # The downstream tasks align the checkpoint barrier, take their own snapshots, 
send the barrier to downstream tasks, then also block on the mailbox.
 # After the JM finalizes the checkpoint, it notifies the tasks of the 
completion of the checkpoint and the tasks then exit.
 # When the tasks exit, the operators are not closed, but disposed directly.

For the legacy sources, I agree with [~kezhuw] that special treatment might be 
inevitable.
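
The proposed steps can be sketched as a small per-task state machine. This is only an illustration of the idea in this comment, not Flink code: `Task`, `Barrier`, the `stopAfterCheckpoint` flag, and the state names are all hypothetical, and barrier alignment, the mailbox, and RPC are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed stop-with-savepoint flow; not Flink's StreamTask. */
final class StopWithSavepointSketch {

    enum State { RUNNING, WAITING_FOR_COMPLETION, DISPOSED }

    /** A checkpoint barrier carrying the proposed "stop after checkpointing" flag. */
    static final class Barrier {
        final long checkpointId;
        final boolean stopAfterCheckpoint;

        Barrier(long checkpointId, boolean stopAfterCheckpoint) {
            this.checkpointId = checkpointId;
            this.stopAfterCheckpoint = stopAfterCheckpoint;
        }
    }

    static final class Task {
        private State state = State.RUNNING;
        private long stoppingCheckpointId = -1L;
        private final List<String> events = new ArrayList<>();

        /** Records are only processed while the task is running. */
        boolean processRecord(String record) {
            if (state != State.RUNNING) {
                return false;
            }
            events.add("record:" + record);
            return true;
        }

        /** Takes a snapshot, then returns the barrier so it can be sent downstream. */
        Barrier onBarrier(Barrier barrier) {
            events.add("snapshot:" + barrier.checkpointId);
            if (barrier.stopAfterCheckpoint) {
                // Stop processing data and wait for the JM's completion notification.
                state = State.WAITING_FOR_COMPLETION;
                stoppingCheckpointId = barrier.checkpointId;
            }
            return barrier;
        }

        /** On completion of the stopping checkpoint, dispose directly without close(). */
        void notifyCheckpointComplete(long checkpointId) {
            if (state == State.WAITING_FOR_COMPLETION && checkpointId == stoppingCheckpointId) {
                events.add("dispose");
                state = State.DISPOSED;
            }
        }

        State state() {
            return state;
        }

        List<String> events() {
            return events;
        }
    }
}
```

In this sketch a source and a downstream task go through the same transitions; the only difference is that the source receives the barrier from the JM while the downstream task receives the one returned by `onBarrier`.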


was (Author: becket_qin):
It looks that we are aiming to get a long term solution here. My two cents:

At a high level, in Flink any control logic initiated from JM eventually goes 
to one of the following two types / mechanisms.
 * independent point-to-point control (e.g. all JM to TM RPC)
 ** This is an "out-of-band" control flow, pretty much requests / responses 
between JM and TMs.
 ** In order to make sure the tasks will respond to the control command, the 
mailbox thread may have to be interrupted from a blocking call.
 * job-graph wide coordinated / orderly control (e.g. checkpoint, 
EndOfPartition)
 ** This is a control flow combining "out-of-band" and "in-band" mechanism
 ** JM sends the command to Sources via RPC (i.e. out-of-band)
 ** Sources execute the command, then propagate the command to downstream 
operators in data stream (in-band)
 ** The downstream operators receives the command from its input data stream 
(in-band)
 ** JM receives the ack from all the operators and complete the control logic. 
(out-of-band)
 ** [Optional] JM notifies all the TMs about the execution result (out-of-band) 

When it comes to {{StopWithSavePoint}}, to me the most intuitive semantic that 
the tasks are naturally stopped right after the savepoint without any side 
effects. That means:
 # The operators / tasks stop processing records right after the savepoint is 
taken.
 # The operators / tasks do not receive {{EndOfPartition}} / {{EndOfInput}} 
because these events have already been assigned other meanings.
 # {{StreamOperator.close()}} should not be invoked to confuse the operators. 
Instead, {{Stream}}{{Operator.dispose()}} should be invoked directly.

At this point, I think the second control flow would just work. As long as we 
adjust the behavior of {{StreamTask}} a little bit.

Implementation wise, would the following work?
 # The JM sends to Sources a "StopWithSavepoint" RPC
 # The Sources take a snapshot, sends a checkpoint barrier with a flag 
indicating stop after checkpointing, then it stops processing data but just 
blocks on mailbox. This could be done either 

[GitHub] [flink] flinkbot edited a comment on pull request #14951: [FLINK-21380][coordination] Hide terminal ExecutionGraph in StateWithExecutionGraph

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14951:
URL: https://github.com/apache/flink/pull/14951#issuecomment-780579743


   
   ## CI report:
   
   * 75d0067ce378ce84d7e8edbe1810cf85bdcab9fb Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13470)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14798:
URL: https://github.com/apache/flink/pull/14798#issuecomment-769223911


   
   ## CI report:
   
   * af8de65a4905ae47ab704a75acfcd1b2e897d915 UNKNOWN
   * 772e076ed9d773e16713236c942f4e30b659d6eb UNKNOWN
   * 8e732bfb2bddc38ec7422f482dcda4be3d296408 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13413)
 
   * 854b645872b5596d2cc618c65dac0228c616eec2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13478)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #14928: [FLINK-21360][coordination] Make resource timeout configurable

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14928:
URL: https://github.com/apache/flink/pull/14928#issuecomment-14828


   
   ## CI report:
   
   * 20915f746d6fd359f00a6f7a77aa3b7c349a94fb Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13469)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14798:
URL: https://github.com/apache/flink/pull/14798#issuecomment-769223911


   
   ## CI report:
   
   * af8de65a4905ae47ab704a75acfcd1b2e897d915 UNKNOWN
   * 772e076ed9d773e16713236c942f4e30b659d6eb UNKNOWN
   * 8e732bfb2bddc38ec7422f482dcda4be3d296408 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13413)
 
   * 854b645872b5596d2cc618c65dac0228c616eec2 UNKNOWN
   
   




[GitHub] [flink] dannycranmer commented on a change in pull request #14737: [FLINK-19667] Add AWS Glue Schema Registry integration

2021-02-18 Thread GitBox


dannycranmer commented on a change in pull request #14737:
URL: https://github.com/apache/flink/pull/14737#discussion_r578805957



##
File path: flink-end-to-end-tests/test-scripts/test_glue_schema_registry.sh
##
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash

Review comment:
   @LinyuYao1021 Please update 
[run-nightly-tests.sh](https://github.com/apache/flink/blob/master/flink-end-to-end-tests/run-nightly-tests.sh)
 to include this test









[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578794106



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/MockExecutionGraphBase.java
##
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/** Base class for mocking the ExecutionGraph. */
+public class MockExecutionGraphBase implements ExecutionGraph {
+
+@Override
+public String getJsonPlan() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public JobID getJobID() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public String getJobName() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public JobStatus getState() {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public ErrorInfo getFailureInfo() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public long getStatusTimestamp(JobStatus status) {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public ArchivedExecutionConfig getArchivedExecutionConfig() {
+   

[GitHub] [flink] zentol commented on a change in pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14963:
URL: https://github.com/apache/flink/pull/14963#discussion_r578791931



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/RestartingTest.java
##
@@ -209,6 +210,7 @@ public void close() throws Exception {
 NoOpExecutionDeploymentListener.get(),
 (execution, newState) -> {},
 0L);
+setJsonPlan("");

Review comment:
   .









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578787023



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
 return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
 }
 
+protected void archiveGlobalFailure(Throwable failure) {
+taskFailureHistory.add(
+new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+log.debug("Archive global failure.", failure);
+}
+
+protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+Optional<Execution> executionOptional =
+failureHandlingResult
+.getExecutionVertexIdOfFailedTask()
+.map(this::getExecutionVertex)
+.map(ExecutionVertex::getCurrentExecutionAttempt);

Review comment:
   You're right. I remember looking into that. Back then, I mistook the 
task as only focusing on "task failures". That was a misunderstanding. I added 
the test for global failover and the test.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578786368



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
 return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
 }
 
+protected void archiveGlobalFailure(Throwable failure) {
+taskFailureHistory.add(
+new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+log.debug("Archive global failure.", failure);
+}
+
+protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+Optional<Execution> executionOptional =
+failureHandlingResult
+.getExecutionVertexIdOfFailedTask()
+.map(this::getExecutionVertex)
+.map(ExecutionVertex::getCurrentExecutionAttempt);
+
+executionOptional.ifPresent(
+execution ->
+execution
+.getFailureInfo()
+.ifPresent(
+failureInfo -> {
+taskFailureHistory.add(failureInfo);
+log.debug(
+"Archive local failure causing attempt {} to fail: {}",
+execution.getAttemptId(),
+failureInfo.getExceptionAsString());
+}));

Review comment:
   But looking into that again, I realize that there is, indeed, the 
`failureInfo` in `ExecutionGraph` that is set during 
[handleTaskFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L218)
 and 
[handleGlobalFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L236).
 This is used to forward the current root cause of the `ExecutionGraph` to the 
Web UI. I could have used that. But the drawback is that it's using 
`System.currentTimeMillis()`.
   
   Instead, I'm going to work on removing the `ExecutionGraph.failureInfo` (and 
related methods) as part of FLINK-21188. It becomes redundant once we enable 
the exception history, since it's just the exception history's last entry.
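
The redundancy argument can be illustrated with a minimal, self-contained sketch. This is not Flink code: `FailureEntry` and `ExceptionHistorySketch` are hypothetical stand-ins for `ErrorInfo` and the scheduler's failure history, and the timestamp is supplied by the caller instead of being read from the system clock inside the history.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for Flink's ErrorInfo: a failure plus the time it was observed. */
final class FailureEntry {
    final Throwable cause;
    final long timestamp;

    FailureEntry(Throwable cause, long timestamp) {
        this.cause = cause;
        this.timestamp = timestamp;
    }
}

/** Hypothetical history that archives every root cause in arrival order. */
final class ExceptionHistorySketch {
    private final List<FailureEntry> entries = new ArrayList<>();

    void archive(Throwable cause, long timestamp) {
        entries.add(new FailureEntry(cause, timestamp));
    }

    /** The "current" root cause is simply the most recently archived entry. */
    Optional<FailureEntry> latestRootCause() {
        return entries.isEmpty()
                ? Optional.empty()
                : Optional.of(entries.get(entries.size() - 1));
    }

    int size() {
        return entries.size();
    }
}
```

With such a history in place, a separately stored "current failure" field carries no extra information, which is the reason given for removing it.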









[GitHub] [flink] dannycranmer commented on pull request #14737: [FLINK-19667] Add AWS Glue Schema Registry integration

2021-02-18 Thread GitBox


dannycranmer commented on pull request #14737:
URL: https://github.com/apache/flink/pull/14737#issuecomment-781667422


   As a follow-up, we should add support for the SQL client and Table API by:
   - Adding a shaded module for the SQL client, similar to 
[flink-sql-avro-confluent-registry](https://github.com/apache/flink/tree/master/flink-formats/flink-sql-avro-confluent-registry)
   - Creating a 
[FormatFactory](https://github.com/apache/flink/blob/master/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java)
   - Adding a 
[service](https://github.com/apache/flink/blob/master/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory) entry
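
For the last bullet, Java's `ServiceLoader` convention expects a plain-text file named after the interface, listing one fully qualified implementation class per line. A sketch of what such a file might contain, assuming the path from the linked example; the class name is hypothetical and not taken from this PR:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
org.example.formats.glue.HypotheticalGlueAvroFormatFactory
```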







[GitHub] [flink] flinkbot edited a comment on pull request #14964: [FLINK-21410][docs] Document checkpoint interval trade-offs

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14964:
URL: https://github.com/apache/flink/pull/14964#issuecomment-781637879


   
   ## CI report:
   
   * ed1dc2c2399cd7a9a0e8c5c68f4b39470e7ec10e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13476)
 
   * 63cafd0fe1c8c1c43e69f5acf5faf1256073dcea Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13477)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14847:
URL: https://github.com/apache/flink/pull/14847#issuecomment-772387941


   
   ## CI report:
   
   * 9a2ea20ce0803e48edfc3ab7bcc02078b7410fbf UNKNOWN
   * 4aecfae5f4889de59c7f1d71d39647b6ee6f9ad8 UNKNOWN
   * 36b3d657e3fa3151aac181d709706dadf58c9d0f Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13464)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14964: [FLINK-21410][docs] Document checkpoint interval trade-offs

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14964:
URL: https://github.com/apache/flink/pull/14964#issuecomment-781637879


   
   ## CI report:
   
   * ed1dc2c2399cd7a9a0e8c5c68f4b39470e7ec10e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13476)
 
   * 63cafd0fe1c8c1c43e69f5acf5faf1256073dcea UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot commented on pull request #14964: [FLINK-21410][docs] Document checkpoint interval trade-offs

2021-02-18 Thread GitBox


flinkbot commented on pull request #14964:
URL: https://github.com/apache/flink/pull/14964#issuecomment-781637879


   
   ## CI report:
   
   * ed1dc2c2399cd7a9a0e8c5c68f4b39470e7ec10e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot commented on pull request #14964: [FLINK-21410][docs] Document checkpoint interval trade-offs

2021-02-18 Thread GitBox


flinkbot commented on pull request #14964:
URL: https://github.com/apache/flink/pull/14964#issuecomment-781635100


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit ed1dc2c2399cd7a9a0e8c5c68f4b39470e7ec10e (Thu Feb 18 
21:07:16 UTC 2021)
   
   **Warnings:**
* Documentation files were touched, but no `.zh.md` files: Update Chinese 
documentation or file Jira ticket.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   







[jira] [Updated] (FLINK-21410) Document checkpoint interval trade-offs

2021-02-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-21410:
---
Labels: pull-request-available  (was: )

> Document checkpoint interval trade-offs
> ---
>
> Key: FLINK-21410
> URL: https://issues.apache.org/jira/browse/FLINK-21410
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Checkpointing
>Reporter: Chesnay Schepler
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> The checkpointing documentation only mentions the interval in passing. It does 
> not discuss any trade-offs or considerations (more checkpoints -> less data 
> to re-process but more overhead), nor does it mention common configurations 
> as a point of reference.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] sjwiesman opened a new pull request #14964: [FLINK-21410][docs] Document checkpoint interval trade-offs

2021-02-18 Thread GitBox


sjwiesman opened a new pull request #14964:
URL: https://github.com/apache/flink/pull/14964


   ## What is the purpose of the change
   
   Document the trade-offs of setting different checkpoint interval values and 
provide guidance on how to determine the appropriate interval for a given job. 
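
As a rough illustration of the trade-off, the sketch below uses a deliberately simple model that is an assumption of this example and not taken from the Flink documentation: failures are equally likely at any point between two checkpoints, so on average half an interval of work is reprocessed, and each checkpoint takes a fixed amount of time. All names are hypothetical.

```java
/** Back-of-the-envelope model of the checkpoint interval trade-off (assumed, simplified). */
final class CheckpointIntervalModel {

    /** Average amount of processing to redo after a failure: half the interval. */
    static double expectedReprocessingMillis(long intervalMillis) {
        return intervalMillis / 2.0;
    }

    /** Fraction of each interval during which a checkpoint is in progress, capped at 1.0. */
    static double overheadFraction(long intervalMillis, long checkpointDurationMillis) {
        return Math.min(1.0, (double) checkpointDurationMillis / intervalMillis);
    }

    public static void main(String[] args) {
        long checkpointDurationMillis = 2_000; // assumed duration of one checkpoint
        for (long interval : new long[] {10_000, 60_000, 600_000}) {
            System.out.printf(
                    "interval=%d ms, expected reprocessing=%.0f ms, overhead=%.2f%%%n",
                    interval,
                    expectedReprocessingMillis(interval),
                    100 * overheadFraction(interval, checkpointDurationMillis));
        }
    }
}
```

Shorter intervals lower the first number and raise the second, which is the tension the new documentation is meant to explain.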
   







[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578737299



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
   Because of the timestamp in the case of a local failure: the `Execution` fails 
but does not necessarily trigger the `ExecutionGraph` to switch to `FAILED`. Hence, 
no timestamp is recorded for this kind of failure.
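
A minimal sketch of why that is, assuming (as a simplification made for this example) that the graph keeps one timestamp slot per job status and leaves a slot at 0 until that status is entered. `JobStatusSketch` and `GraphTimestamps` are hypothetical and only mirror the idea behind `getStatusTimestamp`.

```java
/** Hypothetical subset of job statuses. */
enum JobStatusSketch { RUNNING, RESTARTING, FAILED }

/** Hypothetical holder for per-status timestamps; 0 means "never entered". */
final class GraphTimestamps {
    private final long[] stateTimestamps = new long[JobStatusSketch.values().length];

    void transitionTo(JobStatusSketch status, long now) {
        stateTimestamps[status.ordinal()] = now;
    }

    long getStatusTimestamp(JobStatusSketch status) {
        return stateTimestamps[status.ordinal()];
    }
}
```

In this model, a local failure that is handled by restarting a task never calls `transitionTo(FAILED, ...)`, so the `FAILED` slot stays at 0 and cannot serve as the failure time.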









[GitHub] [flink] flinkbot edited a comment on pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14963:
URL: https://github.com/apache/flink/pull/14963#issuecomment-781452668


   
   ## CI report:
   
   * 80874c3472c2d74de07c651d56c728e210e9db6d UNKNOWN
   * d6bbdeabce59f5556c0d1be6e88e098df84d7f98 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13462)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Assigned] (FLINK-21410) Document checkpoint interval trade-offs

2021-02-18 Thread Seth Wiesman (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman reassigned FLINK-21410:


Assignee: Seth Wiesman

> Document checkpoint interval trade-offs
> ---
>
> Key: FLINK-21410
> URL: https://issues.apache.org/jira/browse/FLINK-21410
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Checkpointing
>Reporter: Chesnay Schepler
>Assignee: Seth Wiesman
>Priority: Major
> Fix For: 1.13.0
>
>
> The checkpointing documentation only mentions the interval in passing. It does 
> not discuss any trade-offs or considerations (more checkpoints -> less data 
> to re-process but more overhead), nor does it mention common configurations 
> as a point of reference.





[GitHub] [flink] rkhachatryan commented on a change in pull request #14740: [FLINK-21067][runtime][checkpoint] Modify the logic of computing which tasks to trigger/ack/commit to support finished tasks

2021-02-18 Thread GitBox


rkhachatryan commented on a change in pull request #14740:
URL: https://github.com/apache/flink/pull/14740#discussion_r578709529



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -651,39 +681,52 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
         }
     }
 
+    private CompletableFuture<CheckpointPlan> calculateCheckpointPlan() {
+        return checkpointPlanCalculator
+                .calculateCheckpointPlan()
+                // Disable checkpoints after tasks finished according to the flag.
+                .thenApplyAsync(
+                        plan -> {
+                            if (!allowCheckpointsAfterTasksFinished
+                                    && !plan.getFinishedTasks().isEmpty()) {
+                                throw new CompletionException(

Review comment:
   Should we do this check **before** computing the plan? 
   Otherwise, 
   1. the flag doesn't prevent existing deployments from potential performance 
degradation
   2. the plan is computed for no purpose
   
   I think it can also be put into the planCalculator if that's easier.
   
   WDYT?
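
To illustrate the difference between the two orderings, here is a minimal, self-contained sketch. It is not the PR's code: `PlanCalculationSketch`, the `String` plan, and the counter are hypothetical, and whether finished tasks exist is passed in as a plain boolean.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

final class PlanCalculationSketch {
    /** Counts how often the (pretend) expensive plan computation runs. */
    static final AtomicInteger planComputations = new AtomicInteger();

    private static String computePlan() {
        planComputations.incrementAndGet();
        return "plan";
    }

    /** Check-after variant: the plan is always computed, then possibly discarded. */
    static CompletableFuture<String> checkAfter(boolean allowAfterFinished, boolean hasFinishedTasks) {
        return CompletableFuture.supplyAsync(PlanCalculationSketch::computePlan)
                .thenApply(
                        plan -> {
                            if (!allowAfterFinished && hasFinishedTasks) {
                                throw new CompletionException(
                                        new IllegalStateException("checkpoints after finished tasks are disabled"));
                            }
                            return plan;
                        });
    }

    /** Check-before variant: fails fast without computing the plan at all. */
    static CompletableFuture<String> checkBefore(boolean allowAfterFinished, boolean hasFinishedTasks) {
        if (!allowAfterFinished && hasFinishedTasks) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("checkpoints after finished tasks are disabled"));
            return failed;
        }
        return CompletableFuture.supplyAsync(PlanCalculationSketch::computePlan);
    }
}
```

Both variants reject the checkpoint when the flag is off and tasks have finished, but only the check-before variant avoids the plan computation in that case.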









[GitHub] [flink] rkhachatryan commented on a change in pull request #14740: [FLINK-21067][runtime][checkpoint] Modify the logic of computing which tasks to trigger/ack/commit to support finished tasks

2021-02-18 Thread GitBox


rkhachatryan commented on a change in pull request #14740:
URL: https://github.com/apache/flink/pull/14740#discussion_r578709529



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -651,39 +681,52 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
         }
     }
 
+    private CompletableFuture<CheckpointPlan> calculateCheckpointPlan() {
+        return checkpointPlanCalculator
+                .calculateCheckpointPlan()
+                // Disable checkpoints after tasks finished according to the flag.
+                .thenApplyAsync(
+                        plan -> {
+                            if (!allowCheckpointsAfterTasksFinished
+                                    && !plan.getFinishedTasks().isEmpty()) {
+                                throw new CompletionException(

Review comment:
   Should we do this check before computing the plan? 
   Otherwise, 
   1. the flag doesn't prevent existing deployments from potential performance 
degradation
   2. the plan is computed for no purpose
   
   I think it can also be put into the planCalculator if that's easier.
   
   WDYT?

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation for {@link CheckpointPlanCalculator}. If all tasks are running, it
+ * directly marks all the sources as tasks to trigger, otherwise it would try to find the running
+ * tasks without running processors as tasks to trigger.
+ */
+public class DefaultCheckpointPlanCalculator implements CheckpointPlanCalculator {
+
+    private final JobID jobId;
+
+    private final CheckpointPlanCalculatorContext context;
+
+    private final List<ExecutionJobVertex> jobVerticesInTopologyOrder = new ArrayList<>();
+
+    private final List<ExecutionVertex> allTasks = new ArrayList<>();
+
+    private final List<ExecutionVertex> sourceTasks = new ArrayList<>();
+
+    public DefaultCheckpointPlanCalculator(
+            JobID jobId,
+            CheckpointPlanCalculatorContext context,
+            Iterable<ExecutionJobVertex> jobVerticesInTopologyOrderIterable) {
+
+        this.jobId = checkNotNull(jobId);
+        this.context = checkNotNull(context);
+
+        checkNotNull(jobVerticesInTopologyOrderIterable);
+        jobVerticesInTopologyOrderIterable.forEach(
+                jobVertex -> {
+                    jobVerticesInTopologyOrder.add(jobVertex);
+                    allTasks.addAll(Arrays.asList(jobVertex.getTaskVertices()));
+
+                    if (jobVertex.getJobVertex().isInputVertex()) {
+                        sourceTasks.addAll(Arrays.asList(jobVertex.getTaskVertices()));
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<CheckpointPlan> calculateCheckpointPlan() {
+        return CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        checkAllTasksInitiated();
+
+                        CheckpointPlan result =
+                                context.hasFinishedTasks()
+ 

[jira] [Created] (FLINK-21410) Document checkpoint interval trade-offs

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21410:


 Summary: Document checkpoint interval trade-offs
 Key: FLINK-21410
 URL: https://issues.apache.org/jira/browse/FLINK-21410
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Checkpointing
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The checkpointing documentation only mentions the interval in passing. It does 
not discuss any trade-offs or considerations (more checkpoints -> less data to 
re-process but more overhead), nor does it mention common configurations as a 
point of reference.





[jira] [Closed] (FLINK-16947) ArtifactResolutionException: Could not transfer artifact. Entry [...] has not been leased from this pool

2021-02-18 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger closed FLINK-16947.
--
Fix Version/s: 1.13.0
   Resolution: Fixed

Maybe a fix, merged to master only for now: 
https://github.com/apache/flink/commit/3279c85233a7d696b344d64d804fe473599bdd01

> ArtifactResolutionException: Could not transfer artifact.  Entry [...] has 
> not been leased from this pool
> -
>
> Key: FLINK-16947
> URL: https://issues.apache.org/jira/browse/FLINK-16947
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Reporter: Piotr Nowojski
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6982&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> Build of flink-metrics-availability-test failed with:
> {noformat}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (end-to-end-tests) 
> on project flink-metrics-availability-test: Unable to generate classpath: 
> org.apache.maven.artifact.resolver.ArtifactResolutionException: Could not 
> transfer artifact org.apache.maven.surefire:surefire-grouper:jar:2.22.1 
> from/to google-maven-central 
> (https://maven-central-eu.storage-download.googleapis.com/maven2/): Entry 
> [id:13][route:{s}->https://maven-central-eu.storage-download.googleapis.com:443][state:null]
>  has not been leased from this pool
> [ERROR] org.apache.maven.surefire:surefire-grouper:jar:2.22.1
> [ERROR] 
> [ERROR] from the specified remote repositories:
> [ERROR] google-maven-central 
> (https://maven-central-eu.storage-download.googleapis.com/maven2/, 
> releases=true, snapshots=false),
> [ERROR] apache.snapshots (https://repository.apache.org/snapshots, 
> releases=false, snapshots=true)
> [ERROR] Path to dependency:
> [ERROR] 1) dummy:dummy:jar:1.0
> [ERROR] 2) org.apache.maven.surefire:surefire-junit47:jar:2.22.1
> [ERROR] 3) org.apache.maven.surefire:common-junit48:jar:2.22.1
> [ERROR] 4) org.apache.maven.surefire:surefire-grouper:jar:2.22.1
> [ERROR] -> [Help 1]
> [ERROR] 
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR] 
> [ERROR] For more information about the errors and possible solutions, please 
> read the following articles:
> [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
> [ERROR] 
> [ERROR] After correcting the problems, you can resume the build with the 
> command
> [ERROR]   mvn  -rf :flink-metrics-availability-test
> {noformat}





[GitHub] [flink] rmetzger merged pull request #14959: [FLINK-16947][Azure] Attempt to fix maven network issues

2021-02-18 Thread GitBox


rmetzger merged pull request #14959:
URL: https://github.com/apache/flink/pull/14959


   







[GitHub] [flink] rmetzger commented on pull request #14959: [FLINK-16947][Azure] Attempt to fix maven network issues

2021-02-18 Thread GitBox


rmetzger commented on pull request #14959:
URL: https://github.com/apache/flink/pull/14959#issuecomment-781587879


   Thanks!







[GitHub] [flink] flinkbot edited a comment on pull request #14951: [FLINK-21380][coordination] Hide terminal ExecutionGraph in StateWithExecutionGraph

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14951:
URL: https://github.com/apache/flink/pull/14951#issuecomment-780579743


   
   ## CI report:
   
   * 0ce7b0c76e548703b5aaa367785bb19634c37072 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13408)
 
   * 75d0067ce378ce84d7e8edbe1810cf85bdcab9fb Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13470)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14928: [FLINK-21360][coordination] Make resource timeout configurable

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14928:
URL: https://github.com/apache/flink/pull/14928#issuecomment-14828


   
   ## CI report:
   
   * 6be910fe164322d9862bad9c0d80f8ff4ee5fe62 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13444)
 
   * 20915f746d6fd359f00a6f7a77aa3b7c349a94fb Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13469)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14951: [FLINK-21380][coordination] Hide terminal ExecutionGraph in StateWithExecutionGraph

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14951:
URL: https://github.com/apache/flink/pull/14951#issuecomment-780579743


   
   ## CI report:
   
   * 0ce7b0c76e548703b5aaa367785bb19634c37072 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13408)
 
   * 75d0067ce378ce84d7e8edbe1810cf85bdcab9fb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14928: [FLINK-21360][coordination] Make resource timeout configurable

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14928:
URL: https://github.com/apache/flink/pull/14928#issuecomment-14828


   
   ## CI report:
   
   * 6be910fe164322d9862bad9c0d80f8ff4ee5fe62 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13444)
 
   * 20915f746d6fd359f00a6f7a77aa3b7c349a94fb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Created] (FLINK-21409) Add Avro to DataTypes & Serialization docs

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21409:


 Summary: Add Avro to DataTypes & Serialization docs
 Key: FLINK-21409
 URL: https://issues.apache.org/jira/browse/FLINK-21409
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System, Documentation, Formats 
(JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The "Data Types & Serialization" docs barely mention Avro, which is surprising given 
how common it is.
Even basic things like how to create a correct TypeInformation for 
GenericRecords are missing, as are special cases like FLINK-21386, which likely just 
won't work.





[jira] [Created] (FLINK-21408) Clarify which DataStream sources support Batch execution

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21408:


 Summary: Clarify which DataStream sources support Batch execution
 Key: FLINK-21408
 URL: https://issues.apache.org/jira/browse/FLINK-21408
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Documentation
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The DataStream "Execution Mode" documentation goes to great lengths to describe 
the differences between the modes and impact on various aspects of Flink like 
checkpointing.

However, the topic of connectors, and specifically which ones support Batch mode, or 
whether there even are any that don't, is not mentioned at all.





[jira] [Created] (FLINK-21407) Clarify which sources and APIs support which formats

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21407:


 Summary: Clarify which sources and APIs support which formats
 Key: FLINK-21407
 URL: https://issues.apache.org/jira/browse/FLINK-21407
 Project: Flink
  Issue Type: New Feature
  Components: API / DataSet, API / DataStream, Documentation
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The DataSet connectors documentation is essentially an empty desert amounting 
to "you can read files".

The DataStream connectors documentation does not mention formats like 
Avro/Parquet anywhere, nor the possibility of reading from filesystems (only the 
sinks are documented).





[jira] [Created] (FLINK-21406) Add AvroParquetFileRecordFormat

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21406:


 Summary: Add AvroParquetFileRecordFormat
 Key: FLINK-21406
 URL: https://issues.apache.org/jira/browse/FLINK-21406
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile)
Reporter: Chesnay Schepler
 Fix For: 1.13.0


There is currently no easy way to read avro GenericRecords from parquet via the 
new {{FileSource}}.
While helping out a user I started writing a FileRecordFormat that could do that, 
but it still requires some finalization.

The implementation is similar to our ParquetAvroWriters class, in that we just 
wrap some parquet classes and bridge our FileSystem with their IO abstraction.

The main goal was to have a format that reads data through our FileSystems, and 
not work directly against Hadoop to prevent a ClassLoader leak from the 
S3AFileSystem (threads in a thread pool can keep references to the user 
classloader).

According to the user it appears to be working, but it will need some cleanup, 
ideally support for specific records, support for checkpointing (which should 
be fairly easy I believe), maybe splitting files (not sure whether this works 
properly with Parquet) and of course tests + documentation.

{code}
public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {

    private final transient Schema schema;

    public ParquetAvroFileRecordFormat(Schema schema) {
        this.schema = schema;
    }

    @Override
    public Reader<GenericRecord> createReader(
            Configuration config, Path filePath, long splitOffset, long splitLength)
            throws IOException {

        final FileSystem fs = filePath.getFileSystem();
        final FileStatus status = fs.getFileStatus(filePath);
        final FSDataInputStream in = fs.open(filePath);

        return new MyReader(
                AvroParquetReader.<GenericRecord>builder(
                                new InputFileWrapper(in, status.getLen()))
                        .withDataModel(GenericData.get())
                        .build());
    }

    @Override
    public Reader<GenericRecord> restoreReader(
            Configuration config,
            Path filePath,
            long restoredOffset,
            long splitOffset,
            long splitLength) {
        // not called if checkpointing isn't used
        return null;
    }

    @Override
    public boolean isSplittable() {
        // let's not worry about this for now
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return new GenericRecordAvroTypeInfo(schema);
    }

    private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {

        private final ParquetReader<GenericRecord> parquetReader;

        private MyReader(ParquetReader<GenericRecord> parquetReader) {
            this.parquetReader = parquetReader;
        }

        @Nullable
        @Override
        public GenericRecord read() throws IOException {
            return parquetReader.read();
        }

        @Override
        public void close() throws IOException {
            parquetReader.close();
        }
    }

    private static class InputFileWrapper implements InputFile {

        private final FSDataInputStream inputStream;
        private final long length;

        private InputFileWrapper(FSDataInputStream inputStream, long length) {
            this.inputStream = inputStream;
            this.length = length;
        }

        @Override
        public long getLength() {
            return length;
        }

        @Override
        public SeekableInputStream newStream() {
            return new SeekableInputStreamAdapter(inputStream);
        }
    }

    private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {

        private final FSDataInputStream inputStream;

        private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
            super(inputStream);
            this.inputStream = inputStream;
        }

        @Override
        public long getPos() throws IOException {
            return inputStream.getPos();
        }

        @Override
        public void seek(long newPos) throws IOException {
            inputStream.seek(newPos);
        }
    }
}
{code}





[GitHub] [flink] zentol commented on a change in pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14963:
URL: https://github.com/apache/flink/pull/14963#discussion_r578652262



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##
@@ -57,10 +57,15 @@ public void testTransitionToExecuting() throws Exception {
 try (MockContext ctx = new MockContext()) {
 ctx.setHasEnoughResources(() -> true);
 
-WaitingForResources wfr =
-new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
 ctx.setExpectExecuting(assertNonNull());
-wfr.onEnter();
+
+new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO);
+// run delayed actions
+for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
+if (!ctx.hasStateTransition()) {

Review comment:
   So basically you want to run actions until you hit a state transition?
   I think you could make this more obvious by checking for hasStateTransition 
after running the action and exiting the loop.
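
A minimal sketch of the loop shape suggested in this comment: run each action, then check for a state transition and leave the loop. `FakeContext` is a hypothetical stand-in for the test's `MockContext`, and plain `Runnable`s stand in for the scheduled runnables; neither is the actual Flink test code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the test's MockContext; not a Flink class. */
final class FakeContext {
    private final List<Runnable> scheduledRunnables = new ArrayList<>();
    private boolean stateTransition;

    void schedule(Runnable action) {
        scheduledRunnables.add(action);
    }

    void markStateTransition() {
        stateTransition = true;
    }

    boolean hasStateTransition() {
        return stateTransition;
    }

    List<Runnable> getScheduledRunnables() {
        return scheduledRunnables;
    }
}

final class RunUntilTransition {

    /** Runs actions in order, stops right after the first one that caused a transition. */
    static int runUntilStateTransition(FakeContext ctx) {
        int executed = 0;
        // Iterate over a copy so actions may schedule further actions safely.
        for (Runnable action : new ArrayList<>(ctx.getScheduledRunnables())) {
            action.run();
            executed++;
            if (ctx.hasStateTransition()) {
                break;
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext();
        ctx.schedule(() -> {});
        ctx.schedule(ctx::markStateTransition);
        ctx.schedule(() -> {
            throw new IllegalStateException("must not run after the transition");
        });
        System.out.println(runUntilStateTransition(ctx)); // prints 2
    }
}
```

Checking after the action makes the intent ("stop once a transition happened") visible at the point where the loop ends, rather than as a guard around each action.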

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##
@@ -907,20 +909,37 @@ public void runIfState(State expectedState, Runnable 
action, Duration delay) {
 
 // 
 
+/** Note: Do not call this method from a State constructor. */
 @VisibleForTesting
-void transitionToState(State newState) {
-if (state != newState) {
-LOG.debug(
-"Transition from state {} to {}.",
-state.getClass().getSimpleName(),
-newState.getClass().getSimpleName());
-
-State oldState = state;
-oldState.onLeave(newState.getClass());
-
-state = newState;
-newState.onEnter();
-}
+ void transitionToState(StateFactory targetState) {
+Preconditions.checkState(
+state != null, "State transitions are now allowed while 
construcing a state.");

Review comment:
   ```suggestion
   state != null, "State transitions are not allowed while 
constructing a state.");
   ```

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##
@@ -907,20 +909,37 @@ public void runIfState(State expectedState, Runnable 
action, Duration delay) {
 
 // 
 
+/** Note: Do not call this method from a State constructor. */
 @VisibleForTesting
-void transitionToState(State newState) {
-if (state != newState) {
-LOG.debug(
-"Transition from state {} to {}.",
-state.getClass().getSimpleName(),
-newState.getClass().getSimpleName());
-
-State oldState = state;
-oldState.onLeave(newState.getClass());
-
-state = newState;
-newState.onEnter();
-}
+ void transitionToState(StateFactory targetState) {
+Preconditions.checkState(
+state != null, "State transitions are now allowed while 
construcing a state.");
+Preconditions.checkState(
+state.getClass() != targetState.getStateClass(),
+"Attempted to transition into the very state the scheduler is 
already in.");
+
+LOG.debug(
+"Transition from state {} to {}.",
+state.getClass().getSimpleName(),
+targetState.getStateClass().getSimpleName());
+
+State oldState = state;
+oldState.onLeave(targetState.getStateClass());
+
+// Guard against state transitions while constructing state objects.
+//
+// Consider the following scenario:
+// Scheduler is in state Restarting, once the cancellation is 
complete, we enter the
+// transitionToState(WaitingForResources) method.
+// In the constructor of WaitingForResources, we call 
`notifyNewResourcesAvailable()`, which
+// finds resources and enters transitionsToState(Executing). We are in 
state Executing. Then
+// we return from the methods and go back in our call stack to the
+// transitionToState(WaitingForResources) call, where we overwrite 
Executing with
+// WaitingForResources. And there we have it, a deployed execution 
graph, and a scheduler
+// that is in WaitingForResources.
+state = null;

Review comment:
   hmm well this is annoying; these kinds of loops are driving me crazy in 
the slot protocol...
   
   what if all goToX methods would schedule the state transition instead, or if 
transitionToState just schedules it? well that would be annoying during tests 
wouldn't it...
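
To make the idea of deferring transitions concrete, here is a toy sketch, not the DeclarativeScheduler. It assumes states are plain strings and that a transition requested while another one is in progress is queued and applied afterwards, instead of being executed re-entrantly. All names are hypothetical, and the queue is a simplification of scheduling the transition on the main thread executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy scheduler: nested transition requests are queued instead of executed re-entrantly. */
final class DeferredTransitionScheduler {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> history = new ArrayList<>();
    private boolean draining;
    private String state = "Restarting";

    void transitionTo(String target) {
        pending.add(target);
        if (draining) {
            // Called from within onEnter of another state; the outer loop applies it later.
            return;
        }
        draining = true;
        try {
            String next;
            while ((next = pending.poll()) != null) {
                state = next;
                history.add(next);
                onEnter(next);
            }
        } finally {
            draining = false;
        }
    }

    /** Mimics the scenario from the diff: entering WaitingForResources immediately finds resources. */
    private void onEnter(String entered) {
        if (entered.equals("WaitingForResources")) {
            transitionTo("Executing");
        }
    }

    String getState() {
        return state;
    }

    List<String> getHistory() {
        return history;
    }

    public static void main(String[] args) {
        DeferredTransitionScheduler scheduler = new DeferredTransitionScheduler();
        scheduler.transitionTo("WaitingForResources");
        System.out.println(scheduler.getState()); // prints Executing
        System.out.println(scheduler.getHistory()); // prints [WaitingForResources, Executing]
    }
}
```

With this shape the outer call can no longer overwrite `Executing` with `WaitingForResources`, but, as noted above, tests would have to drain the queue (or run the executor) before asserting on the state.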
   

##
File path: 
fl

[jira] [Closed] (FLINK-21381) Kubernetes HA documentation does not state required service account and role

2021-02-18 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-21381.
-
Fix Version/s: (was: 1.12.2)
   1.12.3
   Resolution: Fixed

Fixed via

1.13.0: dbf62f25f762f0b4206823913a8c1441fd415cab
1.12.3: 3bbfb73908b239f957169805bf591c69ce949ccb

> Kubernetes HA documentation does not state required service account and role
> 
>
> Key: FLINK-21381
> URL: https://issues.apache.org/jira/browse/FLINK-21381
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Coordination
>Affects Versions: 1.12.1, 1.13.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.3
>
>
> The existing Kubernetes HA documentation leaves out the fact that one needs a 
> service account with edit rights to use it. The reason for this is that the 
> Kubernetes HA services create and update config maps.





[GitHub] [flink] tillrohrmann closed pull request #14947: [BP-1.12][FLINK-21381][docs] Add information about service account permissions to K8s HA service documentation

2021-02-18 Thread GitBox


tillrohrmann closed pull request #14947:
URL: https://github.com/apache/flink/pull/14947


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on pull request #14947: [BP-1.12][FLINK-21381][docs] Add information about service account permissions to K8s HA service documentation

2021-02-18 Thread GitBox


tillrohrmann commented on pull request #14947:
URL: https://github.com/apache/flink/pull/14947#issuecomment-781547092


   Manually merged via 3bbfb73908b239f957169805bf591c69ce949ccb







[jira] [Created] (FLINK-21405) SchedulerNGFactoryFactoryTest fails if DeclarativeScheduler is enabled via property

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21405:


 Summary: SchedulerNGFactoryFactoryTest fails if 
DeclarativeScheduler is enabled via property
 Key: FLINK-21405
 URL: https://issues.apache.org/jira/browse/FLINK-21405
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The DeclarativeScheduler can be enabled via a custom system property.

The SchedulerNGFactoryFactoryTest contains one test that checks that the default 
value for some option is returned if it was not configured; this test fails if 
the system property is used.
I'd suggest removing the test because it doesn't seem that useful to me.





[jira] [Created] (FLINK-21404) YARNITCase runs indefinitely

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21404:


 Summary: YARNITCase runs indefinitely
 Key: FLINK-21404
 URL: https://issues.apache.org/jira/browse/FLINK-21404
 Project: Flink
  Issue Type: Sub-task
  Components: Deployment / YARN, Runtime / Coordination, Tests
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The YARNITCase does not terminate when running with the DeclarativeScheduler.

There are also some improvements to be made for reporting the error cause of 
tests.





[jira] [Created] (FLINK-21403) Some tests expect to fail if parallelism cannot be met

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21403:


 Summary: Some tests expect to fail if parallelism cannot be met
 Key: FLINK-21403
 URL: https://issues.apache.org/jira/browse/FLINK-21403
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.13.0
Reporter: Chesnay Schepler


The TableSinkITCase contains tests to ensure that a job submission fails if the 
parallelism is exceedingly large.
The declarative scheduler is having none of that and just downscales the job to 
make it runnable, causing the tests to fail.

It may make sense to add a switch to disable downscaling (until we have a 
proper range of accepted parallelism), as this issue will also affect e2e tests 
where the start of processes takes so long that the resource timeout fires and 
the job is run with a lower parallelism, which the tests don't handle well.





[jira] [Created] (FLINK-21402) Consolidate Scheduler/SlotPool factories

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21402:


 Summary: Consolidate Scheduler/SlotPool factories
 Key: FLINK-21402
 URL: https://issues.apache.org/jira/browse/FLINK-21402
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
 Fix For: 1.13.0


We have run into the unfortunate situation where different schedulers 
effectively work against different slot pool implementations and interfaces.
This makes it quite difficult to understand which slot pool factories can work 
with which scheduler.

We may be able to alleviate these issues by wrapping the factories for both 
into a single factory.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann commented on pull request #14946: [FLINK-21381][docs] Add information about service account permissions to K8s HA service documentation

2021-02-18 Thread GitBox


tillrohrmann commented on pull request #14946:
URL: https://github.com/apache/flink/pull/14946#issuecomment-781536646


   Thanks for the review. Merging this PR now.







[jira] [Created] (FLINK-21401) Tests that manually build a JobGraph cannot use DeclarativeScheduler

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21401:


 Summary: Tests that manually build a JobGraph cannot use 
DeclarativeScheduler
 Key: FLINK-21401
 URL: https://issues.apache.org/jira/browse/FLINK-21401
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The default job type in the JobGraph is set to Batch, as a result of which all 
tests that manually assemble a JobGraph cannot, by default, be run with the 
DeclarativeScheduler.

For example, this also affects the JobMasterTest.





[GitHub] [flink] tillrohrmann closed pull request #14946: [FLINK-21381][docs] Add information about service account permissions to K8s HA service documentation

2021-02-18 Thread GitBox


tillrohrmann closed pull request #14946:
URL: https://github.com/apache/flink/pull/14946


   







[jira] [Created] (FLINK-21400) Attempt numbers are not maintained across restarts

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21400:


 Summary: Attempt numbers are not maintained across restarts
 Key: FLINK-21400
 URL: https://issues.apache.org/jira/browse/FLINK-21400
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The DeclarativeScheduler discards the ExecutionGraph on each restart attempt, 
as a result of which the attempt number remains 0.

Various tests use the attempt number to determine whether an exception should 
be thrown, and thus continue to throw exceptions on each restart.

Affected tests:
UnalignedCheckpointTestBase
UnalignedCheckpointITCase
ProcessingTimeWindowCheckpointingITCase
LocalRecoveryITCase
EventTimeWindowCheckpointingITCase
EventTimeAllWindowCheckpointingITCase
FileSinkITBase#testFileSink
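
The failure mode can be illustrated with a small, self-contained sketch. It assumes a test function that throws only on attempt 0 and a restart loop that either maintains the attempt number or resets it to 0 on every restart; all names are hypothetical and not taken from the affected tests.

```java
/** Hypothetical test helper that injects a failure on the first attempt only. */
final class FailOnFirstAttempt {

    static String process(int attemptNumber, String record) {
        if (attemptNumber == 0) {
            throw new IllegalStateException("artificial failure on attempt 0");
        }
        return record;
    }

    /**
     * Simulates restarts. Returns the number of attempts needed to succeed,
     * or -1 if the job never succeeded within the allowed restarts.
     */
    static int runWithRestarts(boolean attemptNumberIsMaintained, int maxRestarts) {
        for (int restart = 0; restart <= maxRestarts; restart++) {
            // If the attempt number is lost on restart, the function always sees 0.
            int attemptNumber = attemptNumberIsMaintained ? restart : 0;
            try {
                process(attemptNumber, "record");
                return restart + 1;
            } catch (IllegalStateException e) {
                // simulated failure; continue with the next restart
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(runWithRestarts(true, 3)); // prints 2
        System.out.println(runWithRestarts(false, 3)); // prints -1
    }
}
```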





[jira] [Closed] (FLINK-20580) Missing null value handling for SerializedValue's getByteArray()

2021-02-18 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-20580.
-
Fix Version/s: (was: 1.11.4)
   Resolution: Fixed

Fixed via

1.13.0:
bbdd769253b25b4093a1759c835c6ff1d99d390d
ed981be6601d3600d23b301d70d2b8aad3e6

1.12.3:
c8c790f6d726fd6942a2569dd97ed7a116092c4e
1d8a8e3529956f930725777499155d51aeb79045



> Missing null value handling for SerializedValue's getByteArray() 
> -
>
> Key: FLINK-20580
> URL: https://issues.apache.org/jira/browse/FLINK-20580
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.13.0
>Reporter: Matthias
>Assignee: Kezhu Wang
>Priority: Minor
>  Labels: pull-request-available, starter
> Fix For: 1.13.0, 1.12.3
>
>
> {{SerializedValue}} allows wrapping {{null}} values. Because of this, 
> {{SerializedValue.getByteArray()}} might return {{null}}, which is not 
> properly handled in several locations (it's probably best to use the 
> IDE's "Find usages" to identify these locations). The most recent findings 
> (for now) are listed in the comments.
> We should add null handling in these cases and add tests for them.
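
One way to avoid null checks at every call site is to reject {{null}} when the wrapper is created, so that getByteArray() can never return {{null}}. The following is a simplified sketch of that idea using plain Java serialization; `NonNullSerializedValue` is a hypothetical class for illustration and is not Flink's `SerializedValue`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Objects;

/** Hypothetical, simplified wrapper that never holds a null payload. */
final class NonNullSerializedValue<T> {
    private final byte[] serializedData;

    NonNullSerializedValue(T value) throws IOException {
        Objects.requireNonNull(value, "value must not be null");
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // Throws NotSerializableException if the value is not java.io.Serializable.
            out.writeObject(value);
        }
        this.serializedData = bytes.toByteArray();
    }

    /** Never returns null because the constructor rejects null values. */
    byte[] getByteArray() {
        return serializedData;
    }

    @SuppressWarnings("unchecked")
    T deserializeValue() throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serializedData))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        NonNullSerializedValue<String> value = new NonNullSerializedValue<>("hello");
        System.out.println(value.deserializeValue()); // prints hello
    }
}
```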





[jira] [Created] (FLINK-21399) Adjust JobMasterTest#testRequestNextInputSplitWith*Failover

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21399:


 Summary: Adjust 
JobMasterTest#testRequestNextInputSplitWith*Failover
 Key: FLINK-21399
 URL: https://issues.apache.org/jira/browse/FLINK-21399
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The testRequestNextInputSplitWithLocalFailover and 
testRequestNextInputSplitWithGlobalFailover tests fail because they request 
input splits right after starting the JobMaster, assuming splits to already be 
queryable.
The test should ensure that the job can actually be deployed, because only at 
that point would splits be requested in production, and there is no practical 
need for splits to exist earlier than that.





[GitHub] [flink] flinkbot edited a comment on pull request #14944: [FLINK-21297] Support 'LOAD/UNLOAD MODULE' syntax

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14944:
URL: https://github.com/apache/flink/pull/14944#issuecomment-779315396


   
   ## CI report:
   
   * 2ee9ac64332d3547bf934d7cd17f73ee5404c8b1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13459)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14943: [FLINK-21354] Implement a StateChangelogStateBackend to forward state changes to St…

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14943:
URL: https://github.com/apache/flink/pull/14943#issuecomment-779315285


   
   ## CI report:
   
   * f80b97c479ec82663e971553c54c8a6cda3122ff UNKNOWN
   * c9be932fec6390ffb103c000550cbc85747c266e Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13461)
 
   
   
   







[jira] [Created] (FLINK-21398) Adjsut JobMasterTest#testRestoring[...]FromSavepoint

2021-02-18 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21398:


 Summary: Adjsut JobMasterTest#testRestoring[...]FromSavepoint
 Key: FLINK-21398
 URL: https://issues.apache.org/jira/browse/FLINK-21398
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The testRestoringFromSavepoint and testRestoringModifiedJobFromSavepoint tests 
fail with the DeclarativeScheduler because they assume that checkpoints are 
immediately read when the JobMaster has started, but with the 
DeclarativeScheduler this happens only once the job is being deployed.





[jira] [Updated] (FLINK-21398) Adjust JobMasterTest#testRestoring[...]FromSavepoint

2021-02-18 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-21398:
-
Summary: Adjust JobMasterTest#testRestoring[...]FromSavepoint  (was: Adjsut 
JobMasterTest#testRestoring[...]FromSavepoint)

> Adjust JobMasterTest#testRestoring[...]FromSavepoint
> 
>
> Key: FLINK-21398
> URL: https://issues.apache.org/jira/browse/FLINK-21398
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.13.0
>
>
> The testRestoringFromSavepoint and testRestoringModifiedJobFromSavepoint 
> tests fail with the DeclarativeScheduler because they assume that checkpoints 
> are immediately read when the JobMaster has started, but with the 
> DeclarativeScheduler this happens only once the job is being deployed.





[GitHub] [flink] tillrohrmann closed pull request #14936: [FLINK-20580][core] Don't support nullable value for SerializedValue

2021-02-18 Thread GitBox


tillrohrmann closed pull request #14936:
URL: https://github.com/apache/flink/pull/14936


   







[jira] [Updated] (FLINK-20580) Missing null value handling for SerializedValue's getByteArray()

2021-02-18 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-20580:
--
Fix Version/s: 1.12.3
   1.13.0
   1.11.4

> Missing null value handling for SerializedValue's getByteArray() 
> -
>
> Key: FLINK-20580
> URL: https://issues.apache.org/jira/browse/FLINK-20580
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.13.0
>Reporter: Matthias
>Assignee: Kezhu Wang
>Priority: Minor
>  Labels: pull-request-available, starter
> Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> {{SerializedValue}} allows wrapping {{null}} values. Because of this, 
> {{SerializedValue.getByteArray()}} might return {{null}}, which is not 
> properly handled in several locations (it's probably best to use the 
> IDE's "Find usages" to identify these locations). The most recent findings 
> (for now) are listed in the comments.
> We should add null handling in these cases and add tests for them.





[GitHub] [flink] HuangZhenQiu commented on pull request #14961: [FLINK-21388] [flink-parquet] support DECIMAL parquet logical type when parquet primitive type is INT32

2021-02-18 Thread GitBox


HuangZhenQiu commented on pull request #14961:
URL: https://github.com/apache/flink/pull/14961#issuecomment-781519693


   @JingsongLi
   Would you please review this PR?







[GitHub] [flink] tillrohrmann commented on a change in pull request #14936: [FLINK-20580][core] Don't support nullable value for SerializedValue

2021-02-18 Thread GitBox


tillrohrmann commented on a change in pull request #14936:
URL: https://github.com/apache/flink/pull/14936#discussion_r578618616



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/** A self-contained serialized value to decouple from user values and 
transfer on wire. */
+class AkkaRpcSerializedValue implements Serializable {
+@Nullable private final byte[] serializedData;

Review comment:
   Good point. Will add it as well.









[GitHub] [flink] flinkbot edited a comment on pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14847:
URL: https://github.com/apache/flink/pull/14847#issuecomment-772387941


   
   ## CI report:
   
   * 9a2ea20ce0803e48edfc3ab7bcc02078b7410fbf UNKNOWN
   * 4aecfae5f4889de59c7f1d71d39647b6ee6f9ad8 UNKNOWN
   * c52c6fc43eb3e9698eec19542c3d47be5a70c514 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13396)
 
   * 36b3d657e3fa3151aac181d709706dadf58c9d0f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13464)
 
   
   
   







[jira] [Commented] (FLINK-16444) Count the read/write/seek/next latency of RocksDB as metrics

2021-02-18 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286615#comment-17286615
 ] 

Yu Li commented on FLINK-16444:
---

[~yunta] Besides these metrics, let's also check whether more metrics are 
worth introducing, using 
[KIP-607|https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB] as a reference.

> Count the read/write/seek/next latency of RocksDB as metrics
> 
>
> Key: FLINK-16444
> URL: https://issues.apache.org/jira/browse/FLINK-16444
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.13.0
>
>
> Currently, users cannot know the read/write/seek/next latency of RocksDB. We 
> could add these helpful metrics to understand the overall state performance. To 
> avoid affecting the performance of these operations much, we could introduce a 
> counter so that the latency is recorded only once every few operations.
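
The sampling idea from the description can be sketched as follows: a counter decides which accesses are timed, so most accesses pay no measurement cost. This is an illustrative sketch only; `SampledLatencyTracker` and its methods are hypothetical names, not Flink or RocksDB APIs, and the class is not thread-safe.

```java
import java.util.function.Supplier;

/** Hypothetical tracker that measures the latency of every N-th access only. */
final class SampledLatencyTracker {
    private final int sampleInterval;
    private long accessCount;
    private long sampledCount;
    private long lastLatencyNanos;

    SampledLatencyTracker(int sampleInterval) {
        if (sampleInterval <= 0) {
            throw new IllegalArgumentException("sampleInterval must be positive");
        }
        this.sampleInterval = sampleInterval;
    }

    /** Runs the access; takes timestamps only for sampled accesses. */
    <T> T track(Supplier<T> access) {
        boolean sample = (accessCount++ % sampleInterval) == 0;
        if (!sample) {
            return access.get();
        }
        long start = System.nanoTime();
        try {
            return access.get();
        } finally {
            lastLatencyNanos = System.nanoTime() - start;
            sampledCount++;
        }
    }

    long getSampledCount() {
        return sampledCount;
    }

    long getLastLatencyNanos() {
        return lastLatencyNanos;
    }

    public static void main(String[] args) {
        SampledLatencyTracker tracker = new SampledLatencyTracker(100);
        for (int i = 0; i < 250; i++) {
            tracker.track(() -> "value");
        }
        // Accesses 0, 100 and 200 were sampled.
        System.out.println(tracker.getSampledCount()); // prints 3
    }
}
```

A larger interval lowers the overhead but makes the reported latency less representative of short spikes, so the interval would likely need to be configurable.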





[GitHub] [flink] XComp commented on pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on pull request #14847:
URL: https://github.com/apache/flink/pull/14847#issuecomment-781491052


   The following state diagram reflects the most recent code changes to support 
the code review.
   
   
![stop-with-savepoint-states](https://user-images.githubusercontent.com/1101012/108392692-3f7c7c00-7213-11eb-82c8-edfafd86d1da.png)
   







[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578584522



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578584067



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+private JobGraph jobGraph;
+private DefaultScheduler scheduler;
+
+private StopWithSavepointOperations testInstance;
+
+@Before
+public void setup() throws Exception {
+jobGraph = new JobGraph();
+
+final JobVertex jobVertex = new JobVertex("vertex #0");
+jobVertex.setInvokableClass(NoOpInvokable.class);
+jobGraph.addVertex(jobVertex);
+
+// checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+// periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+scheduler =
+SchedulerTestingUtils.createSchedulerBuilder(
+jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+.setFutureExecutor(new 
DirectScheduledExecutorService())
+.build();
+scheduler.startScheduling();
+
+// the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+disableCheckpointScheduler();
+
+testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+}
+
+@Test
+public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleSavepointCreation(savepointPath, null);
+testInstance.handleExecutionTermination(
+
Collections.singletonList(ExecutionState.FINISHED));
+});
+}
+
+@Test
+public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleExecutionTermination(
+
Collections.singletonLi

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578582587



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+private JobGraph jobGraph;
+private DefaultScheduler scheduler;
+
+private StopWithSavepointOperations testInstance;
+
+@Before
+public void setup() throws Exception {
+jobGraph = new JobGraph();
+
+final JobVertex jobVertex = new JobVertex("vertex #0");
+jobVertex.setInvokableClass(NoOpInvokable.class);
+jobGraph.addVertex(jobVertex);
+
+// checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+// periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+scheduler =
+SchedulerTestingUtils.createSchedulerBuilder(
+jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+.setFutureExecutor(new 
DirectScheduledExecutorService())
+.build();
+scheduler.startScheduling();
+
+// the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+disableCheckpointScheduler();
+
+testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);

Review comment:
   You're right. I refactored the `StopWithSavepointContextTest`. It now uses 
`TestingSchedulerNG` instead of `DefaultScheduler`, which removes some 
complexity from the test code.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578581744



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointOperations.java
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointOperations} collects the steps for creating a 
savepoint and waiting for
+ * the job to stop.
+ */
+public interface StopWithSavepointOperations {
+
+/**
+ * Handles the Savepoint creation termination.
+ *
+ * @param path the path to the newly created savepoint.
+ * @param throwable the {@code Throwable} in case of failure.

Review comment:
   I followed your advice and added two methods for the two cases. That 
helps differentiate them! 👍
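
To illustrate the kind of split discussed here, a minimal sketch in plain Java follows. It is not the code from the PR: the interface name, the method names, and the `dispatch` helper are all hypothetical, chosen only to show how a single `(path, throwable)` callback can be replaced by one method per outcome.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these names are taken from the pull request.
class SavepointCallbackSketch {

    /** One method per outcome instead of a single (path, throwable) callback. */
    interface SavepointOutcomeHandler {
        void handleSavepointCreationSuccess(String path);

        void handleSavepointCreationFailure(Throwable cause);
    }

    /** Simple handler that forwards the outcome to a future. */
    static class RecordingHandler implements SavepointOutcomeHandler {
        final CompletableFuture<String> result = new CompletableFuture<>();

        @Override
        public void handleSavepointCreationSuccess(String path) {
            result.complete(path);
        }

        @Override
        public void handleSavepointCreationFailure(Throwable cause) {
            result.completeExceptionally(cause);
        }
    }

    /** Adapts a (path, throwable) pair, e.g. from a future callback, to the two methods. */
    static void dispatch(SavepointOutcomeHandler handler, String path, Throwable throwable) {
        if (throwable != null) {
            handler.handleSavepointCreationFailure(throwable);
        } else {
            handler.handleSavepointCreationSuccess(path);
        }
    }
}
```

With this shape the null check lives in one place (`dispatch`), and each handler method only ever sees arguments that are meaningful for its case.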









[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578580800



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578581053



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[jira] [Resolved] (FLINK-19503) Add StateChangelog interface and its in-memory implementation

2021-02-18 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan resolved FLINK-19503.
---
Resolution: Fixed

> Add StateChangelog interface and its in-memory implementation
> -
>
> Key: FLINK-19503
> URL: https://issues.apache.org/jira/browse/FLINK-19503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> StateChangelog is a component proposed in FLIP-158 to store state changes for 
> incremental checkpoints.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] rkhachatryan merged pull request #14838: [FLINK-19503][state] Add StateChangelog API

2021-02-18 Thread GitBox


rkhachatryan merged pull request #14838:
URL: https://github.com/apache/flink/pull/14838


   







[GitHub] [flink] rkhachatryan commented on pull request #14838: [FLINK-19503][state] Add StateChangelog API

2021-02-18 Thread GitBox


rkhachatryan commented on pull request #14838:
URL: https://github.com/apache/flink/pull/14838#issuecomment-781481465


   Test failure unrelated (FLINK-21277), merging.







[GitHub] [flink] flinkbot edited a comment on pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14963:
URL: https://github.com/apache/flink/pull/14963#issuecomment-781452668


   
   ## CI report:
   
   * 80874c3472c2d74de07c651d56c728e210e9db6d UNKNOWN
   * d6bbdeabce59f5556c0d1be6e88e098df84d7f98 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13462)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14847:
URL: https://github.com/apache/flink/pull/14847#issuecomment-772387941


   
   ## CI report:
   
   * 9a2ea20ce0803e48edfc3ab7bcc02078b7410fbf UNKNOWN
   * 4aecfae5f4889de59c7f1d71d39647b6ee6f9ad8 UNKNOWN
   * c52c6fc43eb3e9698eec19542c3d47be5a70c514 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13396)
 
   * 36b3d657e3fa3151aac181d709706dadf58c9d0f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] wuchong commented on a change in pull request #14944: [FLINK-21297] Support 'LOAD/UNLOAD MODULE' syntax

2021-02-18 Thread GitBox


wuchong commented on a change in pull request #14944:
URL: https://github.com/apache/flink/pull/14944#discussion_r578568850



##
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##
@@ -546,13 +555,16 @@ class TableEnvironmentTest {
 |)
   """.stripMargin
 
-val code = new Callable[TableResult] {
-  override def call(): TableResult = tableEnv.executeSql(statement1)
+try {
+  tableEnv.executeSql(statement1)
+} catch {
+  case e: TableException =>
+assertTrue(e.getMessage.contains("Could not execute LOAD MODULE: 
(moduleName: [Dummy], " +
+  "properties: [{dummy-version=1}]). Could not find a suitable table 
factory for " +
+  "'org.apache.flink.table.factories.ModuleFactory' in\nthe 
classpath."))
+  case e =>
+fail("This should not happen, " + e.getMessage)

Review comment:
   ```scala
   try {
     tableEnv.executeSql(statement1)
     fail("Expected an exception")
   } catch {
     case t: Throwable =>
       assertThat(t, containsMessage("Could not execute LOAD MODULE: (moduleName: [Dummy], " +
         "properties: [{dummy-version=1}]). Could not find a suitable table factory for " +
         "'org.apache.flink.table.factories.ModuleFactory' in\nthe classpath."))
   }
   ```
   
   We should call `fail` at the end of the `try` block (before `catch`), and 
we can use `org.apache.flink.core.testutils.FlinkMatchers#containsMessage` here 
to get a better error message on a mismatch. 
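
For reference, the same pattern can be sketched in plain Java without JUnit or Hamcrest. Everything below is hypothetical: `loadModule` merely stands in for the statement under test, and `chainContainsMessage` is a simplified stand-in for a `containsMessage`-style matcher (assuming, as I understand it, that such a matcher also inspects the causes). The sketch catches only `RuntimeException`, so the `AssertionError` raised when nothing was thrown is not swallowed by the `catch`.

```java
// Hypothetical sketch of the "fail inside try, assert on the message in catch" pattern.
class ExpectedExceptionSketch {

    /** Stand-in for the statement under test; always fails with a nested cause. */
    static void loadModule(String name) {
        throw new IllegalStateException(
                "Could not execute LOAD MODULE",
                new IllegalArgumentException("no factory for " + name));
    }

    /** Returns true if the throwable or any of its causes has a message containing the snippet. */
    static boolean chainContainsMessage(Throwable t, String snippet) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur.getMessage() != null && cur.getMessage().contains(snippet)) {
                return true;
            }
        }
        return false;
    }

    /** Runs the action; raises AssertionError if it does not throw, else checks the message. */
    static boolean failsWithMessage(Runnable action, String snippet) {
        try {
            action.run();
            // Reached only when no exception was thrown by the action.
            throw new AssertionError("Expected an exception");
        } catch (RuntimeException e) {
            // AssertionError is an Error, so it is not caught here and propagates.
            return chainContainsMessage(e, snippet);
        }
    }
}
```

A call such as `failsWithMessage(() -> loadModule("Dummy"), "no factory for Dummy")` then passes only if the action throws and the expected text appears somewhere in the cause chain.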









[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578568213



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,49 +909,38 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionTerminationsFuture =
+getCombinedExecutionTerminationFuture();
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
-return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
-.handleAsync(
-(path, throwable) -> {
-if (throwable != null) {
-// restart the checkpoint coordinator if 
stopWithSavepoint failed.
-
startCheckpointScheduler(checkpointCoordinator);
-throw new CompletionException(throwable);
-}
+StopWithSavepointContext stopWithSavepointContext =

Review comment:
   I renamed the class to `StopWithSavepointOperationsImpl`.









[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578567820



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
 assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
 }
 
+@Test
+public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception {
+// we allow restarts right from the start since the failure is going to happen in the first
+// phase (savepoint creation) of stop-with-savepoint
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+final JobGraph jobGraph = createTwoVertexJobGraph();
+// set checkpoint timeout to a low value to simulate checkpoint expiration
+enableCheckpointing(jobGraph, 10);
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// we have to set a listener that checks for the termination of the checkpoint handling
+OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+checkpointAbortionWasTriggered.trigger());
+
+// the failure handling has to happen in the same thread as the checkpoint coordination -
+// that's why we have to instantiate a separate ThreadExecutorService here
+final ScheduledExecutorService singleThreadExecutorService =
+Executors.newSingleThreadScheduledExecutor();
+final ComponentMainThreadExecutor mainThreadExecutor =
+ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+singleThreadExecutorService);
+
+final DefaultScheduler scheduler =
+CompletableFuture.supplyAsync(
+() ->
+createSchedulerAndStartScheduling(
+jobGraph, mainThreadExecutor),
+mainThreadExecutor)
+.get();
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+final CompletableFuture stopWithSavepointFuture =
+CompletableFuture.supplyAsync(
+() -> {
+// we have to make sure that the tasks are running before
+// stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+failingExecutionAttemptId,
+ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.RUNNING));
+
+return scheduler.stopWithSavepoint("savepoint-path", false);
+},
+mainThreadExecutor)
+.get();
+
+checkpointTriggeredLatch.await();
+
+final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+
+final AcknowledgeCheckpoint acknowledgeCheckpoint =
+new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1);
+
+checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location");
+
+// we need to wait for the expired checkpoint to be handled
+checkpointAbortionWasTriggered.await();
+
+CompletableFuture.runAsync(
+() -> {
+scheduler.updateTaskExecutionState(
+ 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578567121



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
 assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
 }
 
+@Test
+public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception {
+// we allow restarts right from the start since the failure is going to happen in the first
+// phase (savepoint creation) of stop-with-savepoint
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+final JobGraph jobGraph = createTwoVertexJobGraph();
+// set checkpoint timeout to a low value to simulate checkpoint expiration
+enableCheckpointing(jobGraph, 10);
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// we have to set a listener that checks for the termination of the checkpoint handling
+OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+checkpointAbortionWasTriggered.trigger());
+
+// the failure handling has to happen in the same thread as the checkpoint coordination -
+// that's why we have to instantiate a separate ThreadExecutorService here
+final ScheduledExecutorService singleThreadExecutorService =
+Executors.newSingleThreadScheduledExecutor();
+final ComponentMainThreadExecutor mainThreadExecutor =
+ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+singleThreadExecutorService);
+
+final DefaultScheduler scheduler =
+CompletableFuture.supplyAsync(
+() ->
+createSchedulerAndStartScheduling(
+jobGraph, mainThreadExecutor),
+mainThreadExecutor)
+.get();
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+final CompletableFuture stopWithSavepointFuture =
+CompletableFuture.supplyAsync(
+() -> {
+// we have to make sure that the tasks are running before
+// stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+failingExecutionAttemptId,
+ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.RUNNING));
+
+return scheduler.stopWithSavepoint("savepoint-path", false);
+},
+mainThreadExecutor)
+.get();
+
+checkpointTriggeredLatch.await();
+
+final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+
+final AcknowledgeCheckpoint acknowledgeCheckpoint =
+new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1);
+
+checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location");
+
+// we need to wait for the expired checkpoint to be handled
+checkpointAbortionWasTriggered.await();
+
+CompletableFuture.runAsync(
+() -> {
+scheduler.updateTaskExecutionState(
+ 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578561800



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -835,8 +837,15 @@ public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) {
 mainThreadExecutor);
 }
 
-private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
+@Override

Review comment:
   The `CheckpointCoordinator` does not change during the lifetime of an 
`ExecutionGraph`. That's why I thought of making this change and introducing 
the `CheckpointScheduling` interface.
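
The idea can be sketched roughly as follows: because the coordinator stays the same for the whole lifetime of the graph, callers do not need to pass it in and can depend on a narrow interface instead. This is only an illustration under assumptions. `FakeCheckpointCoordinator`, `SchedulerSketch`, and `StopWithSavepointFailureHandler` are hypothetical names, and the interface's method set beyond `startCheckpointScheduler` is assumed for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the coordinator; not Flink's CheckpointCoordinator.
final class FakeCheckpointCoordinator {
    private final AtomicBoolean periodicSchedulingActive = new AtomicBoolean(false);

    void startScheduler() { periodicSchedulingActive.set(true); }

    void stopScheduler() { periodicSchedulingActive.set(false); }

    boolean isSchedulingActive() { return periodicSchedulingActive.get(); }
}

// Narrow interface: callers can toggle scheduling without seeing the coordinator.
// The method set is an assumption made for this sketch.
interface CheckpointScheduling {
    void startCheckpointScheduler();

    void stopCheckpointScheduler();
}

// The scheduler owns the (unchanging) coordinator and exposes only the interface.
final class SchedulerSketch implements CheckpointScheduling {
    private final FakeCheckpointCoordinator coordinator;

    SchedulerSketch(FakeCheckpointCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    @Override
    public void startCheckpointScheduler() { coordinator.startScheduler(); }

    @Override
    public void stopCheckpointScheduler() { coordinator.stopScheduler(); }
}

// A failure handler that depends only on the interface, not on the coordinator.
final class StopWithSavepointFailureHandler {
    private final CheckpointScheduling checkpointScheduling;

    StopWithSavepointFailureHandler(CheckpointScheduling checkpointScheduling) {
        this.checkpointScheduling = checkpointScheduling;
    }

    void onSavepointFailure(Throwable cause) {
        // resume periodic checkpointing after a failed stop-with-savepoint
        checkpointScheduling.startCheckpointScheduler();
    }
}

class CheckpointSchedulingDemo {
    public static void main(String[] args) {
        FakeCheckpointCoordinator coordinator = new FakeCheckpointCoordinator();
        SchedulerSketch scheduler = new SchedulerSketch(coordinator);
        scheduler.stopCheckpointScheduler();
        new StopWithSavepointFailureHandler(scheduler)
                .onSavepointFailure(new RuntimeException("savepoint expired"));
        System.out.println("scheduling active: " + coordinator.isSchedulingActive());
    }
}
```

A side effect of this shape is that the handler can be tested with any small `CheckpointScheduling` implementation, without a real coordinator.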









[GitHub] [flink] flinkbot edited a comment on pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14963:
URL: https://github.com/apache/flink/pull/14963#issuecomment-781452668


   
   ## CI report:
   
   * 80874c3472c2d74de07c651d56c728e210e9db6d UNKNOWN
   * d6bbdeabce59f5556c0d1be6e88e098df84d7f98 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] shmyer commented on pull request #14084: [FLINK-15867][table-planner-blink] Support time-related types for FIRST_VALUE and LAST_VALUE aggregate functions

2021-02-18 Thread GitBox


shmyer commented on pull request #14084:
URL: https://github.com/apache/flink/pull/14084#issuecomment-781454473


   @leonardBang Sorry for the late response; I kind of lost sight of this PR :( 
I can't seem to find the test comment you mentioned. Where would I find that?







[GitHub] [flink] flinkbot commented on pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


flinkbot commented on pull request #14963:
URL: https://github.com/apache/flink/pull/14963#issuecomment-781452668


   
   ## CI report:
   
   * 80874c3472c2d74de07c651d56c728e210e9db6d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14954: [FLINK-17061][runtime] Unset TM total process/flink memory size for fine-grained resource management

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14954:
URL: https://github.com/apache/flink/pull/14954#issuecomment-781063841


   
   ## CI report:
   
   * 709ba9478b5c13b183bf66d73294d8133438a8f5 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13454)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14961: [FLINK-21388] [flink-parquet] support DECIMAL parquet logical type when parquet primitive type is INT32

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14961:
URL: https://github.com/apache/flink/pull/14961#issuecomment-781278553


   
   ## CI report:
   
   * a35e534ab18f7d4c80a8ee52d7dd70b7e3a0f2a0 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13455)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14960: fix scrolling issue in Firefox

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14960:
URL: https://github.com/apache/flink/pull/14960#issuecomment-781267077


   
   ## CI report:
   
   * c195cee17a3fa141bcf497f137495a54e55041f7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13452)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14953: [FLINK-21351][checkpointing] Don't subsume last checkpoint

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14953:
URL: https://github.com/apache/flink/pull/14953#issuecomment-780752668


   
   ## CI report:
   
   * 941374ecdd03b5ceb9245e4d8a8370671b691a34 UNKNOWN
   * 07d008483dd496a4b89fe34e799511e3e2670fb8 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13451)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Commented] (FLINK-21359) CompatibilityResult issues with Flink 1.9.0

2021-02-18 Thread Siva (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286558#comment-17286558
 ] 

Siva commented on FLINK-21359:
--

It works fine with Flink 1.7.0 and EMR 5.21.0, where our 
BytesDeserializerSchema implements KeyedDeserializationSchema.

It does *NOT* work with Flink 1.9.0 and EMR 5.28.0, where our 
BytesDeserializerSchema implements KafkaDeserializationSchema.

> CompatibilityResult issues with Flink 1.9.0
> ---
>
> Key: FLINK-21359
> URL: https://issues.apache.org/jira/browse/FLINK-21359
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
> Environment: DEV
>Reporter: Siva
>Priority: Major
>
> I am using emr 5.28.0 and flink 1.9.0
>  
> Source code is working fine with emr 5.11.0 and flink 1.3.2, but the same 
> source code is throwing the following stack trace with emr 5.28.0 and flink 
> 1.9.0
>  
> java.lang.NoClassDefFoundError: 
> org/apache/flink/api/common/typeutils/CompatibilityResult
>  at java.lang.Class.getDeclaredMethods0(Native Method)
>  at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>  at java.lang.Class.getDeclaredMethod(Class.java:2128)
>  at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1643)
>  at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>  at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>  at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>  at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
>  at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>  at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>  at 
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
>  at 
> org.apache.flink.streaming.api.graph.StreamConfig.setTypeSerializer(StreamConfig.java:193)
>  at 
> org.apache.flink.streaming.api.graph.StreamConfig.setTypeSerializerIn1(StreamConfig.java:143)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:438)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:272)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:238)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:238)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:207)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:159)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
>  at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
>  at 
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:88)
>  at 
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
>  at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: java.lang.ClassNotFoundException: 

[jira] [Issue Comment Deleted] (FLINK-21359) CompatibilityResult issues with Flink 1.9.0

2021-02-18 Thread Siva (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siva updated FLINK-21359:
-
Comment: was deleted

(was: BytesDeserializerSchema cannot be cast to 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

 

i am getting the above error when i am working with emr 5.21.0 and flink 1.7.0)

> CompatibilityResult issues with Flink 1.9.0
> ---
>
> Key: FLINK-21359
> URL: https://issues.apache.org/jira/browse/FLINK-21359
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
> Environment: DEV
>Reporter: Siva
>Priority: Major
>
> I am using emr 5.28.0 and flink 1.9.0
>  
> Source code is working fine with emr 5.11.0 and flink 1.3.2, but the same 
> source code is throwing the following stack trace with emr 5.28.0 and flink 
> 1.9.0
>  
> java.lang.NoClassDefFoundError: 
> org/apache/flink/api/common/typeutils/CompatibilityResult
>  at java.lang.Class.getDeclaredMethods0(Native Method)
>  at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>  at java.lang.Class.getDeclaredMethod(Class.java:2128)
>  at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1643)
>  at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>  at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>  at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>  at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
>  at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>  at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>  at 
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
>  at 
> org.apache.flink.streaming.api.graph.StreamConfig.setTypeSerializer(StreamConfig.java:193)
>  at 
> org.apache.flink.streaming.api.graph.StreamConfig.setTypeSerializerIn1(StreamConfig.java:143)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:438)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:272)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:238)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:238)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:207)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:159)
>  at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
>  at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
>  at 
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:88)
>  at 
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
>  at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.api.common.typeutils.CompatibilityResult
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>  at java.lang.ClassLoader.lo

[jira] [Assigned] (FLINK-21101) Set up cron job to run CI with declarative scheduler

2021-02-18 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-21101:
--

Assignee: Robert Metzger

> Set up cron job to run CI with declarative scheduler
> 
>
> Key: FLINK-21101
> URL: https://issues.apache.org/jira/browse/FLINK-21101
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Till Rohrmann
>Assignee: Robert Metzger
>Priority: Major
> Fix For: 1.13.0
>
>
> Once the declarative scheduler has been merged, we should create a Cron job 
> to run all CI profiles with this scheduler in order to find all remaining 
> test failures.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


flinkbot commented on pull request #14963:
URL: https://github.com/apache/flink/pull/14963#issuecomment-781429547


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 80874c3472c2d74de07c651d56c728e210e9db6d (Thu Feb 18 
15:35:56 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   







[jira] [Updated] (FLINK-21362) Remove State#onEnter

2021-02-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-21362:
---
Labels: pull-request-available  (was: )

> Remove State#onEnter
> 
>
> Key: FLINK-21362
> URL: https://issues.apache.org/jira/browse/FLINK-21362
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Chesnay Schepler
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Since we no longer need to construct the new state before calling 
> {{State#onLeave}} we could remove {{State#onEnter}} and move all contained 
> logic into the constructor.





[GitHub] [flink] rmetzger opened a new pull request #14963: [FLINK-21362][coordination] Move State.onEnter() logic into constructor

2021-02-18 Thread GitBox


rmetzger opened a new pull request #14963:
URL: https://github.com/apache/flink/pull/14963


   ## What is the purpose of the change
   
   Increase robustness by moving the `State.onEnter()` logic into the constructors of the states.
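
As a rough illustration of the change (not the code in this PR), the sketch below shows a tiny state machine in which the old state is left first and the new state is then constructed, so everything a state would have done in `onEnter()` can run in its constructor. All names here (`Created`, `Executing`, `StateMachineSketch`, the string log) are hypothetical and chosen only for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical state interface: there is no onEnter() any more.
interface State {
    void onLeave();
}

final class Created implements State {
    private final List<String> log;

    Created(List<String> log) {
        this.log = log;
        // what used to be onEnter() now runs as part of construction
        log.add("Created: entered");
    }

    @Override
    public void onLeave() {
        log.add("Created: left");
    }
}

final class Executing implements State {
    private final List<String> log;

    Executing(List<String> log) {
        this.log = log;
        log.add("Executing: entered");
    }

    @Override
    public void onLeave() {
        log.add("Executing: left");
    }
}

final class StateMachineSketch {
    private final List<String> log = new ArrayList<>();
    private State current = new Created(log);

    // Leave the old state first, then construct (and thereby enter) the new one.
    void transitionTo(Function<List<String>, State> nextStateFactory) {
        current.onLeave();
        current = nextStateFactory.apply(log);
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        StateMachineSketch machine = new StateMachineSketch();
        machine.transitionTo(Executing::new);
        machine.log().forEach(System.out::println);
    }
}
```

With this ordering a state object can never exist without having been entered, which is one way to read the robustness argument. The usual caveat about doing work in constructors still applies: a constructor should not hand out `this` before the object is fully initialized.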
   
   
   







[jira] [Commented] (FLINK-18634) FlinkKafkaProducerITCase.testRecoverCommittedTransaction failed with "Timeout expired after 60000milliseconds while awaiting InitProducerId"

2021-02-18 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286534#comment-17286534
 ] 

Dong Lin commented on FLINK-18634:
--

Sounds good. I will try to upgrade Kafka dependency to 2.5.1.

> FlinkKafkaProducerITCase.testRecoverCommittedTransaction failed with "Timeout 
> expired after 60000milliseconds while awaiting InitProducerId"
> 
>
> Key: FLINK-18634
> URL: https://issues.apache.org/jira/browse/FLINK-18634
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.11.0, 1.12.0, 1.13.0
>Reporter: Dian Fu
>Assignee: Jiangjie Qin
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=4590&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20
> {code}
> 2020-07-17T11:43:47.9693015Z [ERROR] Tests run: 12, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 269.399 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 2020-07-17T11:43:47.9693862Z [ERROR] 
> testRecoverCommittedTransaction(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>   Time elapsed: 60.679 s  <<< ERROR!
> 2020-07-17T11:43:47.9694737Z org.apache.kafka.common.errors.TimeoutException: 
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 2020-07-17T11:43:47.9695376Z Caused by: 
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}





[GitHub] [flink] flinkbot edited a comment on pull request #14962: [FLINK-18789][sql-client] Use TableEnvironment#executeSql method to execute insert statement in sql client

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14962:
URL: https://github.com/apache/flink/pull/14962#issuecomment-781306516


   
   ## CI report:
   
   * fda5571674ea74fb5c104180e3790d45c600fbea Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13458)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


flinkbot edited a comment on pull request #14950:
URL: https://github.com/apache/flink/pull/14950#issuecomment-780431068


   
   ## CI report:
   
   * 1b43a411d22af19f12f14a79c1094c8dc5cfeb42 UNKNOWN
   * a8191e6800961467c726695c8ef7575d157a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13449)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






