[flink] branch master updated: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

2022-04-22 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new cb68ccf1b2c [FLINK-27187][state/changelog] Add changelog storage 
metric totalAttemptsPerUpload
cb68ccf1b2c is described below

commit cb68ccf1b2cb879148fb17d2fd6394e15d1ae46c
Author: wangfeifan 
AuthorDate: Tue Apr 12 16:49:45 2022 +0800

[FLINK-27187][state/changelog] Add changelog storage metric 
totalAttemptsPerUpload
---
 docs/content.zh/docs/ops/metrics.md|   5 +
 docs/content/docs/ops/metrics.md   |   5 +
 .../fs/BatchingStateChangeUploadScheduler.java |   5 +-
 .../changelog/fs/ChangelogStorageMetricGroup.java  |  11 +++
 .../flink/changelog/fs/RetryingExecutor.java   |  52 --
 .../fs/BatchingStateChangeUploadSchedulerTest.java |  34 ---
 .../changelog/fs/ChangelogStorageMetricsTest.java  | 110 -
 .../flink/changelog/fs/RetryingExecutorTest.java   |   8 +-
 8 files changed, 201 insertions(+), 29 deletions(-)
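For readers less familiar with Flink's metric API: the sketch below (class and member names are illustrative, not the committed code) shows how a histogram such as `totalAttemptsPerUpload` is registered on a `MetricGroup` and updated once per finished upload.

```java
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;

/** Minimal sketch of registering and updating an attempts-per-upload histogram. */
class UploadAttemptsMetricsSketch {

    private final Histogram totalAttemptsPerUpload;

    UploadAttemptsMetricsSketch(MetricGroup group) {
        // Register a sliding-window histogram under the metric name added by this commit;
        // the window size here is an arbitrary example value.
        this.totalAttemptsPerUpload =
                group.histogram("totalAttemptsPerUpload", new DescriptiveStatisticsHistogram(1_000));
    }

    /** Called once per finished upload with the number of attempts it took. */
    void onUploadFinished(int attempts) {
        totalAttemptsPerUpload.update(attempts);
    }
}
```

The committed change wires a second histogram of this kind through `RetryingExecutor`, next to the existing `attemptsPerUpload` metric.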

diff --git a/docs/content.zh/docs/ops/metrics.md 
b/docs/content.zh/docs/ops/metrics.md
index 85369590a5d..dbb6b1c3bd7 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1292,6 +1292,11 @@ Note that the metrics are only available via reporters.
   The number of attempts per upload
   Histogram
 
+
+  totalAttemptsPerUpload
  The distribution of the total number of attempts per upload
+  Histogram
+
 
   uploadBatchSizes
   The number of upload tasks (coming from one or more writers, i.e. 
backends/tasks) that were grouped together and form a single upload resulting 
in a single file
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index de466a6a27f..10e33513c23 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1285,6 +1285,11 @@ Note that the metrics are only available via reporters.
   The number of attempts per upload
   Histogram
 
+
+  totalAttemptsPerUpload
  The distribution of the total number of attempts per upload
+  Histogram
+
 
   uploadBatchSizes
   The number of upload tasks (coming from one or more writers, i.e. 
backends/tasks) that were grouped together and form a single upload resulting 
in a single file
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
index 3bec459e03e..3495c6bc525 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
@@ -114,7 +114,10 @@ class BatchingStateChangeUploadScheduler implements 
StateChangeUploadScheduler {
 retryPolicy,
 delegate,
 SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG),
-new RetryingExecutor(numUploadThreads, 
metricGroup.getAttemptsPerUpload()),
+new RetryingExecutor(
+numUploadThreads,
+metricGroup.getAttemptsPerUpload(),
+metricGroup.getTotalAttemptsPerUpload()),
 metricGroup);
 }
 
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java
index 318eec92f48..12754838474 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java
@@ -42,6 +42,7 @@ public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
 private final Histogram uploadSizes;
 private final Histogram uploadLatenciesNanos;
 private final Histogram attemptsPerUpload;
+private final Histogram totalAttemptsPerUpload;
 
 public ChangelogStorageMetricGroup(MetricGroup parent) {
 super(parent);
@@ -55,6 +56,10 @@ public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
 histogram(
 CHANGELOG_STORAGE_ATTEMPTS_PER_UPLOAD,
 new DescriptiveStatisticsHistogram(WINDOW_SIZE));
+this.totalAttemptsPerUpload =
+histogram(
+CHANGELOG_STORAGE_TOTAL_ATTEMPTS_PER_UPLOAD,
+new DescriptiveStatisticsHistogram(WINDOW_SIZE));
 this.uploadSizes =
 histogram(
 

[flink] branch release-1.15 updated: [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes

2022-04-22 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new 3d4b3a495b2 [FLINK-27218] fix the problem that the internal Serializer
in OperatorState may not have been updated when the schema changes
3d4b3a495b2 is described below

commit 3d4b3a495b273c3a15ce7d35ba5a5b2e4ddc4c20
Author: mayuehappy 
AuthorDate: Mon Apr 18 21:44:56 2022 +0800

[FLINK-27218] fix the problem that the internal Serializer in OperatorState
may not have been updated when the schema changes
---
 .../flink/runtime/state/HeapBroadcastState.java| 11 -
 .../runtime/state/PartitionableListState.java  | 10 -
 .../state/StateBackendMigrationTestBase.java   | 49 ++
 3 files changed, 68 insertions(+), 2 deletions(-)
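The essence of the fix, as a standalone sketch (the `MetaInfo` interface is a hypothetical stand-in for `RegisteredOperatorStateBackendMetaInfo`, and the class is simplified): whenever new state meta info is set after a restore with schema migration, the cached deep-copy serializer must be rebuilt from it, otherwise deep copies keep using the pre-migration serializer.

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;

/** Simplified sketch of the fix; not the actual Flink classes. */
final class ListStateSketch<S> {

    private MetaInfo<S> stateMetaInfo;
    private TypeSerializer<S> elementCopySerializer;

    void setStateMetaInfo(MetaInfo<S> newMetaInfo) {
        // Before the fix only the meta info was replaced here, so the cached copy
        // serializer kept serializing with the old schema. Rebuilding it keeps the
        // deep-copy path consistent with the restored serializer.
        this.elementCopySerializer = newMetaInfo.getElementSerializer().duplicate();
        this.stateMetaInfo = newMetaInfo;
    }

    S deepCopy(S value) {
        return elementCopySerializer.copy(value);
    }

    /** Hypothetical stand-in for the registered state meta info. */
    interface MetaInfo<S> {
        TypeSerializer<S> getElementSerializer();
    }
}
```

`HeapBroadcastState` and `PartitionableListState` below apply exactly this pattern to their map and list copy serializers.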

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 14e72f4cfdd..78ff61945b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -46,7 +47,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
 private final Map<K, V> backingMap;
 
 /** A serializer that allows to perform deep copies of internal map state. 
*/
-private final MapSerializer<K, V> internalMapCopySerializer;
+private MapSerializer<K, V> internalMapCopySerializer;

 HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
 this(stateMetaInfo, new HashMap<>());
@@ -71,6 +72,9 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
 public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
+this.internalMapCopySerializer =
+new MapSerializer<>(
+stateMetaInfo.getKeySerializer(), 
stateMetaInfo.getValueSerializer());
 this.stateMetaInfo = stateMetaInfo;
 }
 
@@ -154,4 +158,9 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
 public Set<Map.Entry<K, V>> immutableEntries() {
 return Collections.unmodifiableSet(backingMap.entrySet());
 }
+
+@VisibleForTesting
+public MapSerializer<K, V> getInternalMapCopySerializer() {
+return internalMapCopySerializer;
+}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
index 12ef958a8f8..7cb363b91c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataOutputView;
@@ -42,7 +43,7 @@ public final class PartitionableListState<S> implements ListState<S> {
 private final ArrayList<S> internalList;
 
 /** A typeSerializer that allows to perform deep copies of internalList */
-private final ArrayListSerializer<S> internalListCopySerializer;
+private ArrayListSerializer<S> internalListCopySerializer;

 PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
 this(stateMetaInfo, new ArrayList());
@@ -65,6 +66,8 @@ public final class PartitionableListState<S> implements ListState<S> {
 }
 
 public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
+this.internalListCopySerializer =
+new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
 this.stateMetaInfo = stateMetaInfo;
 }
 
@@ -130,4 +133,9 @@ public final class PartitionableListState<S> implements ListState<S> {
 internalList.addAll(values);
 }
 }
+
+@VisibleForTesting
+public ArrayListSerializer<S> getInternalListCopySerializer() {
+return internalListCopySerializer;
+}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 733f39da47d..aafab4a5f2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ 

[flink] branch release-1.13 updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source

2022-04-22 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new 79a86f35fb3 [FLINK-22984][python] Don't pushdown Calc containing 
Python UDFs into table source
79a86f35fb3 is described below

commit 79a86f35fb321cb5f8dd40442db8c6bafb00153c
Author: Juntao Hu 
AuthorDate: Thu Apr 21 19:38:01 2022 +0800

[FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table 
source

This closes #19551.
---
 ...WatermarkIntoTableSourceScanAcrossCalcRule.java |  7 ++-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  | 22 ++
 .../utils/JavaUserDefinedScalarFunctions.java  | 22 ++
 .../PushWatermarkIntoTableSourceScanRuleTest.xml   | 21 +
 4 files changed, 71 insertions(+), 1 deletion(-)
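For context, the scenario the rule change guards against can be reproduced with a DDL like the one in the new test below. The following is a minimal Java Table API sketch under a few assumptions: `my_udfs.parse_ts` is a hypothetical Python scalar function, and the `values` connector is a test-only source, so the snippet is illustrative rather than production code.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PythonUdfWatermarkScenario {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register a Python scalar function (hypothetical module/function name).
        tEnv.executeSql(
                "CREATE TEMPORARY FUNCTION parse_ts AS 'my_udfs.parse_ts' LANGUAGE PYTHON");

        // Computed column b is defined by the Python UDF and carries the watermark.
        tEnv.executeSql(
                "CREATE TABLE MyTable (\n"
                        + "  a INT,\n"
                        + "  b AS parse_ts(a),\n"
                        + "  WATERMARK FOR b AS b\n"
                        + ") WITH (\n"
                        + "  'connector' = 'values',\n"
                        + "  'enable-watermark-push-down' = 'true',\n"
                        + "  'bounded' = 'false'\n"
                        + ")");

        // With the fix, the watermark assigner stays above the Calc instead of being
        // pushed into the source, because the Calc contains a Python UDF call.
        System.out.println(tEnv.explainSql("SELECT * FROM MyTable"));
    }
}
```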

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
index 18539f08e20..d7311cc9ab1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
@@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall;
+
 /**
  * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the
  * {@link FlinkLogicalTableSourceScan}. The rule will first look for the 
computed column in the
@@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule
 @Override
 public boolean matches(RelOptRuleCall call) {
 FlinkLogicalTableSourceScan scan = call.rel(2);
-return supportsWatermarkPushDown(scan);
+FlinkLogicalCalc calc = call.rel(1);
+return supportsWatermarkPushDown(scan)
+&& calc.getProgram().getExprList().stream()
+.noneMatch(rexNode -> containsPythonCall(rexNode, 
null));
 }
 
 @Override
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index e47a110687f..f6e7f99ef5c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr
 import 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
 import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
 import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -256,4 +257,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest 
extends TableTestBase {
 util.tableEnv().executeSql(ddl);
 util.verifyRelPlan("select a, c from MyTable");
 }
+
+@Test
+public void testWatermarkWithPythonFunctionInComputedColumn() {
+util.tableEnv()
+.createTemporaryFunction(
+"parse_ts",
+new 
JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction());
+String ddl =
+"CREATE TABLE MyTable("
++ "  a INT,\n"
++ "  b AS parse_ts(a),\n"
++ "  WATERMARK FOR b AS b\n"
++ ") WITH (\n"
++ " 'connector' = 'values',\n"
++ " 'enable-watermark-push-down' = 'true',\n"
++ " 'bounded' = 'false',\n"
++ " 'disable-lookup' = 'true'"
++ ")";
+util.tableEnv().executeSql(ddl);
+

[flink] branch master updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source

2022-04-22 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7ce5a7c6e1e [FLINK-22984][python] Don't pushdown Calc containing 
Python UDFs into table source
7ce5a7c6e1e is described below

commit 7ce5a7c6e1eab6823094a94bc0bca30d0ee618f1
Author: Juntao Hu 
AuthorDate: Thu Apr 21 19:38:01 2022 +0800

[FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table 
source

This closes #19551.
---
 ...WatermarkIntoTableSourceScanAcrossCalcRule.java |  7 ++-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  | 22 ++
 .../utils/JavaUserDefinedScalarFunctions.java  | 21 +
 .../PushWatermarkIntoTableSourceScanRuleTest.xml   | 21 +
 4 files changed, 70 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
index 18539f08e20..d7311cc9ab1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
@@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall;
+
 /**
  * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the
  * {@link FlinkLogicalTableSourceScan}. The rule will first look for the 
computed column in the
@@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule
 @Override
 public boolean matches(RelOptRuleCall call) {
 FlinkLogicalTableSourceScan scan = call.rel(2);
-return supportsWatermarkPushDown(scan);
+FlinkLogicalCalc calc = call.rel(1);
+return supportsWatermarkPushDown(scan)
+&& calc.getProgram().getExprList().stream()
+.noneMatch(rexNode -> containsPythonCall(rexNode, 
null));
 }
 
 @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index cdef53073e2..1c3ead462be 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr
 import 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
 import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
 import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -253,4 +254,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest 
extends TableTestBase {
 util.tableEnv().executeSql(ddl);
 util.verifyRelPlan("select a, c from MyTable");
 }
+
+@Test
+public void testWatermarkWithPythonFunctionInComputedColumn() {
+util.tableEnv()
+.createTemporaryFunction(
+"parse_ts",
+new 
JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction());
+String ddl =
+"CREATE TABLE MyTable("
++ "  a INT,\n"
++ "  b AS parse_ts(a),\n"
++ "  WATERMARK FOR b AS b\n"
++ ") WITH (\n"
++ " 'connector' = 'values',\n"
++ " 'enable-watermark-push-down' = 'true',\n"
++ " 'bounded' = 'false',\n"
++ " 'disable-lookup' = 'true'"
++ ")";
+util.tableEnv().executeSql(ddl);
+util.verifyRelPlan("SELECT * FROM MyTable");
+}
 }
diff --git 

[flink] branch release-1.14 updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source

2022-04-22 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
 new 0806ad5a154 [FLINK-22984][python] Don't pushdown Calc containing 
Python UDFs into table source
0806ad5a154 is described below

commit 0806ad5a154e37d09b53ce56d59cec8dc11209da
Author: Juntao Hu 
AuthorDate: Thu Apr 21 19:38:01 2022 +0800

[FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table 
source

This closes #19551.
---
 ...WatermarkIntoTableSourceScanAcrossCalcRule.java |  7 ++-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  | 22 ++
 .../utils/JavaUserDefinedScalarFunctions.java  | 21 +
 .../PushWatermarkIntoTableSourceScanRuleTest.xml   | 21 +
 4 files changed, 70 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
index 18539f08e20..d7311cc9ab1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
@@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall;
+
 /**
  * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the
  * {@link FlinkLogicalTableSourceScan}. The rule will first look for the 
computed column in the
@@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule
 @Override
 public boolean matches(RelOptRuleCall call) {
 FlinkLogicalTableSourceScan scan = call.rel(2);
-return supportsWatermarkPushDown(scan);
+FlinkLogicalCalc calc = call.rel(1);
+return supportsWatermarkPushDown(scan)
+&& calc.getProgram().getExprList().stream()
+.noneMatch(rexNode -> containsPythonCall(rexNode, 
null));
 }
 
 @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index e47a110687f..f6e7f99ef5c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr
 import 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
 import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
 import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -256,4 +257,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest 
extends TableTestBase {
 util.tableEnv().executeSql(ddl);
 util.verifyRelPlan("select a, c from MyTable");
 }
+
+@Test
+public void testWatermarkWithPythonFunctionInComputedColumn() {
+util.tableEnv()
+.createTemporaryFunction(
+"parse_ts",
+new 
JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction());
+String ddl =
+"CREATE TABLE MyTable("
++ "  a INT,\n"
++ "  b AS parse_ts(a),\n"
++ "  WATERMARK FOR b AS b\n"
++ ") WITH (\n"
++ " 'connector' = 'values',\n"
++ " 'enable-watermark-push-down' = 'true',\n"
++ " 'bounded' = 'false',\n"
++ " 'disable-lookup' = 'true'"
++ ")";
+util.tableEnv().executeSql(ddl);
+util.verifyRelPlan("SELECT * FROM MyTable");
+}
 }

[flink] branch release-1.15 updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source

2022-04-22 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new 703b10ca5d0 [FLINK-22984][python] Don't pushdown Calc containing 
Python UDFs into table source
703b10ca5d0 is described below

commit 703b10ca5d004e8e79059e814fcf8503f84e2da8
Author: Juntao Hu 
AuthorDate: Thu Apr 21 19:38:01 2022 +0800

[FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table 
source

This closes #19551.
---
 ...WatermarkIntoTableSourceScanAcrossCalcRule.java |  7 ++-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  | 22 ++
 .../utils/JavaUserDefinedScalarFunctions.java  | 21 +
 .../PushWatermarkIntoTableSourceScanRuleTest.xml   | 21 +
 4 files changed, 70 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
index 18539f08e20..d7311cc9ab1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
@@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall;
+
 /**
  * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the
  * {@link FlinkLogicalTableSourceScan}. The rule will first look for the 
computed column in the
@@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule
 @Override
 public boolean matches(RelOptRuleCall call) {
 FlinkLogicalTableSourceScan scan = call.rel(2);
-return supportsWatermarkPushDown(scan);
+FlinkLogicalCalc calc = call.rel(1);
+return supportsWatermarkPushDown(scan)
+&& calc.getProgram().getExprList().stream()
+.noneMatch(rexNode -> containsPythonCall(rexNode, 
null));
 }
 
 @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index 58215cb2d74..db31b45e7b7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr
 import 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
 import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
 import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -253,4 +254,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest 
extends TableTestBase {
 util.tableEnv().executeSql(ddl);
 util.verifyRelPlan("select a, c from MyTable");
 }
+
+@Test
+public void testWatermarkWithPythonFunctionInComputedColumn() {
+util.tableEnv()
+.createTemporaryFunction(
+"parse_ts",
+new 
JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction());
+String ddl =
+"CREATE TABLE MyTable("
++ "  a INT,\n"
++ "  b AS parse_ts(a),\n"
++ "  WATERMARK FOR b AS b\n"
++ ") WITH (\n"
++ " 'connector' = 'values',\n"
++ " 'enable-watermark-push-down' = 'true',\n"
++ " 'bounded' = 'false',\n"
++ " 'disable-lookup' = 'true'"
++ ")";
+util.tableEnv().executeSql(ddl);
+util.verifyRelPlan("SELECT * FROM MyTable");
+}
 }

[flink] branch master updated: [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes

2022-04-22 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4033ddc5fa6 [FLINK-27218] fix the problem that the internal Serializer
in OperatorState may not have been updated when the schema changes
4033ddc5fa6 is described below

commit 4033ddc5fa682a1619f8f22348e2ee38afcc1c85
Author: mayuehappy 
AuthorDate: Mon Apr 18 21:44:56 2022 +0800

[FLINK-27218] fix the problem that the internal Serializer in OperatorState
may not have been updated when the schema changes
---
 .../flink/runtime/state/HeapBroadcastState.java| 11 -
 .../runtime/state/PartitionableListState.java  | 10 -
 .../state/StateBackendMigrationTestBase.java   | 49 ++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 14e72f4cfdd..78ff61945b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -46,7 +47,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
 private final Map<K, V> backingMap;
 
 /** A serializer that allows to perform deep copies of internal map state. 
*/
-private final MapSerializer<K, V> internalMapCopySerializer;
+private MapSerializer<K, V> internalMapCopySerializer;

 HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
 this(stateMetaInfo, new HashMap<>());
@@ -71,6 +72,9 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
 public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
+this.internalMapCopySerializer =
+new MapSerializer<>(
+stateMetaInfo.getKeySerializer(), 
stateMetaInfo.getValueSerializer());
 this.stateMetaInfo = stateMetaInfo;
 }
 
@@ -154,4 +158,9 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
 public Set<Map.Entry<K, V>> immutableEntries() {
 return Collections.unmodifiableSet(backingMap.entrySet());
 }
+
+@VisibleForTesting
+public MapSerializer<K, V> getInternalMapCopySerializer() {
+return internalMapCopySerializer;
+}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
index 12ef958a8f8..7cb363b91c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataOutputView;
@@ -42,7 +43,7 @@ public final class PartitionableListState<S> implements ListState<S> {
 private final ArrayList<S> internalList;
 
 /** A typeSerializer that allows to perform deep copies of internalList */
-private final ArrayListSerializer<S> internalListCopySerializer;
+private ArrayListSerializer<S> internalListCopySerializer;

 PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
 this(stateMetaInfo, new ArrayList());
@@ -65,6 +66,8 @@ public final class PartitionableListState<S> implements ListState<S> {
 }
 
 public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
+this.internalListCopySerializer =
+new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
 this.stateMetaInfo = stateMetaInfo;
 }
 
@@ -130,4 +133,9 @@ public final class PartitionableListState<S> implements ListState<S> {
 internalList.addAll(values);
 }
 }
+
+@VisibleForTesting
+public ArrayListSerializer<S> getInternalListCopySerializer() {
+return internalListCopySerializer;
+}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 733f39da47d..aafab4a5f2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ 

[flink] branch release-1.15 updated: [examples][python] Add examples on how to use json/csv/avro formats in Python DataStream API

2022-04-22 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new fa8fbf5bc29 [examples][python] Add examples on how to use 
json/csv/avro formats in Python DataStream API
fa8fbf5bc29 is described below

commit fa8fbf5bc29b1432e1baaefe3049b50ffeca706a
Author: Dian Fu 
AuthorDate: Fri Apr 22 22:26:44 2022 +0800

[examples][python] Add examples on how to use json/csv/avro formats in 
Python DataStream API
---
 .../examples/datastream/formats/__init__.py| 17 
 .../examples/datastream/formats/avro_format.py | 91 ++
 .../examples/datastream/formats/csv_format.py  | 72 +
 .../examples/datastream/formats/json_format.py | 73 +
 4 files changed, 253 insertions(+)
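The new examples are PyFlink scripts; as a rough Java counterpart of `avro_format.py` (assuming the `flink-avro` format and the Kafka connector are on the classpath; the topic name and bootstrap address are placeholders), writing Avro-serialized rows to Kafka looks roughly like this:

```java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.types.Row;

public class AvroFormatJavaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The sink expects Row records that match the Avro schema below.
        DataStream<Row> ds =
                env.fromElements(Row.of(1, "hi"), Row.of(2, "hello"))
                        .returns(Types.ROW(Types.INT, Types.STRING));

        AvroRowSerializationSchema serializationSchema =
                new AvroRowSerializationSchema(
                        "{\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": ["
                                + "{\"name\": \"id\", \"type\": \"int\"},"
                                + "{\"name\": \"name\", \"type\": \"string\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        ds.addSink(new FlinkKafkaProducer<>("test_avro_topic", serializationSchema, props));
        env.execute("avro-format-sketch");
    }
}
```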

diff --git a/flink-python/pyflink/examples/datastream/formats/__init__.py 
b/flink-python/pyflink/examples/datastream/formats/__init__.py
new file mode 100644
index 000..65b48d4d79b
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/__init__.py
@@ -0,0 +1,17 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/flink-python/pyflink/examples/datastream/formats/avro_format.py 
b/flink-python/pyflink/examples/datastream/formats/avro_format.py
new file mode 100644
index 000..9e3b2658d89
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/avro_format.py
@@ -0,0 +1,91 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import sys
+
+from pyflink.common import AvroRowSerializationSchema, Types, 
AvroRowDeserializationSchema
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import FlinkKafkaProducer, 
FlinkKafkaConsumer
+
+
+# Make sure that the Kafka cluster is started and the topic 'test_avro_topic' 
is
+# created before executing this job.
+def write_to_kafka(env):
+ds = env.from_collection([
+(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 
'hello'), (6, 'hello')],
+type_info=Types.ROW([Types.INT(), Types.STRING()]))
+
+serialization_schema = AvroRowSerializationSchema(
+avro_schema_string="""
+{
+"type": "record",
+"name": "TestRecord",
+"fields": [
+{"name": "id", "type": "int"},
+{"name": "name", "type": "string"}
+]
+}"""
+)
+
+kafka_producer = FlinkKafkaProducer(
+topic='test_avro_topic',
+serialization_schema=serialization_schema,
+producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 
'test_group'}
+)
+
+# note that the output type of ds must be RowTypeInfo
+ds.add_sink(kafka_producer)
+env.execute()
+
+
+def read_from_kafka(env):
+deserialization_schema = AvroRowDeserializationSchema(
+avro_schema_string="""
+{
+"type": "record",
+"name": "TestRecord",

[flink] branch master updated (cc6d1a27baf -> bdd91141658)

2022-04-22 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from cc6d1a27baf [FLINK-27321][ci] Migrate java-ci-tools to JUnit 5
 add bdd91141658 [examples][python] Add examples on how to use 
json/csv/avro formats in Python DataStream API

No new revisions were added by this update.

Summary of changes:
 .../examples/datastream/formats}/__init__.py   |  0
 .../examples/datastream/formats/avro_format.py | 91 ++
 .../examples/datastream/formats/csv_format.py  | 72 +
 .../examples/datastream/formats/json_format.py | 73 +
 4 files changed, 236 insertions(+)
 copy {flink-end-to-end-tests/flink-python-test/python/datastream => 
flink-python/pyflink/examples/datastream/formats}/__init__.py (100%)
 create mode 100644 
flink-python/pyflink/examples/datastream/formats/avro_format.py
 create mode 100644 
flink-python/pyflink/examples/datastream/formats/csv_format.py
 create mode 100644 
flink-python/pyflink/examples/datastream/formats/json_format.py



[flink-ml] branch master updated (cccd47c -> aedf663)

2022-04-22 Thread hxb
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


from cccd47c  Bump junit from 4.12 to 4.13.1 (#79)
 add aedf663  [FLINK-26268][ml][python] Add classification algorithm support
for LogisticRegression, KNN and NaiveBayes in ML Python API

No new revisions were added by this update.

Summary of changes:
 .github/workflows/python-checks.yml|   6 +
 flink-ml-python/dev/dev-requirements.txt   |   2 +-
 flink-ml-python/pyflink/ml/__init__.py |  44 +
 flink-ml-python/pyflink/ml/core/api.py |   5 +-
 flink-ml-python/pyflink/ml/core/builder.py |  11 +-
 flink-ml-python/pyflink/ml/core/linalg.py  | 133 +++
 .../pyflink/ml/{ => core}/tests/__init__.py|   7 +-
 .../pyflink/ml/{ => core}/tests/test_linalg.py |   0
 .../pyflink/ml/{ => core}/tests/test_param.py  |   0
 .../pyflink/ml/{ => core}/tests/test_pipeline.py   |   9 +-
 .../pyflink/ml/{ => core}/tests/test_stage.py  |   4 +-
 flink-ml-python/pyflink/ml/core/wrapper.py |  18 +-
 .../ml => ml/lib/classification}/__init__.py   |   0
 .../pyflink/ml/lib/classification/common.py|  74 +
 .../pyflink/ml/lib/classification/knn.py   | 103 
 .../ml/lib/classification/logisticregression.py|  97 +++
 .../pyflink/ml/lib/classification/naivebayes.py| 119 +
 .../ml/{ => lib/classification}/tests/__init__.py  |   7 +-
 .../ml/lib/classification/tests/test_knn.py| 166 ++
 .../tests/test_logisticregression.py   | 185 +
 .../ml/lib/classification/tests/test_naivebayes.py | 120 +
 flink-ml-python/pyflink/ml/tests/test_utils.py |  30 +++-
 .../pyflink/ml/util/read_write_utils.py|  14 +-
 flink-ml-python/setup.py   |   2 +-
 pom.xml|   2 +-
 25 files changed, 1120 insertions(+), 38 deletions(-)
 copy flink-ml-python/pyflink/ml/{ => core}/tests/__init__.py (89%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_linalg.py (100%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_param.py (100%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_pipeline.py (92%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_stage.py (98%)
 copy flink-ml-python/pyflink/{examples/ml => 
ml/lib/classification}/__init__.py (100%)
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/common.py
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/knn.py
 create mode 100644 
flink-ml-python/pyflink/ml/lib/classification/logisticregression.py
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/naivebayes.py
 copy flink-ml-python/pyflink/ml/{ => lib/classification}/tests/__init__.py 
(89%)
 create mode 100644 
flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
 create mode 100644 
flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
 create mode 100644 
flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py



[flink-training] branch master updated: [FLINK-26382] Add Chinese documents for flink-training exercises (#46)

2022-04-22 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
 new aca6c47  [FLINK-26382] Add Chinese documents for flink-training 
exercises (#46)
aca6c47 is described below

commit aca6c47b79d486eb38969492c7e2dc8cb200d146
Author: T.C 
AuthorDate: Fri Apr 22 17:09:26 2022 +0800

[FLINK-26382] Add Chinese documents for flink-training exercises (#46)

Co-authored-by: Victor Xu 
Co-authored-by: Nico Kruber 
---
 README.md   |   2 +
 README_zh.md| 274 
 build.gradle|   2 +-
 hourly-tips/DISCUSSION.md   |   2 +
 hourly-tips/{DISCUSSION.md => DISCUSSION_zh.md} |  46 ++--
 hourly-tips/README.md   |   2 +
 hourly-tips/README_zh.md|  82 +++
 long-ride-alerts/DISCUSSION.md  |   2 +
 long-ride-alerts/DISCUSSION_zh.md   |  50 +
 long-ride-alerts/README.md  |   2 +
 long-ride-alerts/{README.md => README_zh.md}|  68 +++---
 ride-cleansing/README.md|   2 +
 ride-cleansing/{README.md => README_zh.md}  |  55 ++---
 rides-and-fares/README.md   |   2 +
 rides-and-fares/README_zh.md|  95 
 15 files changed, 601 insertions(+), 85 deletions(-)

diff --git a/README.md b/README.md
index 0fc84e1..b1a65cb 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+[中文版](./README_zh.md)
+
 # Apache Flink Training Exercises
 
 Exercises that accompany the training content in the documentation.
diff --git a/README_zh.md b/README_zh.md
new file mode 100644
index 000..2af9bba
--- /dev/null
+++ b/README_zh.md
@@ -0,0 +1,274 @@
+
+
+# Apache Flink 实践练习
+
+与文档中实践练习内容相关的练习。
+
+## 目录
+
+[**设置开发环境**](#set-up-your-development-environment)
+
+1. [软件要求](#software-requirements)
+1. [克隆并构建 flink-training 项目](#clone-and-build-the-flink-training-project)
+1. [将 flink-training 项目导入 
IDE](#import-the-flink-training-project-into-your-ide)
+
+[**使用出租车数据流(taxi data stream)**](#using-the-taxi-data-streams)
+
+1. [出租车车程(taxi ride)事件结构](#schema-of-taxi-ride-events)
+1. [出租车费用(taxi fare)事件结构](#schema-of-taxi-fare-events)
+
+[**如何做练习**](#how-to-do-the-lab-exercises)
+
+1. [了解数据](#learn-about-the-data)
+2. [在 IDE 中运行和调试 Flink 程序](#run-and-debug-flink-programs-in-your-ide)
+3. [练习、测试及解决方案](#exercises-tests-and-solutions)
+
+[**练习**](#lab-exercises)
+
+[**提交贡献**](#contributing)
+
+[**许可证**](#license)
+
+
+
+## 设置开发环境
+
+你需要设置便于进行开发、调试并运行实践练习的示例和解决方案的环境。
+
+
+
+### 软件要求
+
+Linux、OS X 和 Windows 均可作为 Flink 程序和本地执行的开发环境。 Flink 开发设置需要以下软件,它们应该安装在系统上:
+
+- Git
+- Java 8 或者 Java 11 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java)
+- 支持 Gradle 的 Java (及/或 Scala) 开发IDE
+- 推荐使用 [IntelliJ](https://www.jetbrains.com/idea/), 但 
[Eclipse](https://www.eclipse.org/downloads/) 或 [Visual Studio 
Code](https://code.visualstudio.com/) (安装 [Java extension 
pack](https://code.visualstudio.com/docs/java/java-tutorial) 插件) 也可以用于Java环境
+- 为了使用 Scala, 需要使用 IntelliJ (及其 [Scala 
plugin](https://plugins.jetbrains.com/plugin/1347-scala/) 插件)
+
+> **:information_source: Windows 用户须知:** 实践说明中提供的 shell 命令示例适用于 UNIX 环境。
+> 您可能会发现值得在 Windows 环境中设置 cygwin 或 WSL。对于开发 Flink 
作业(jobs),Windows工作的相当好:可以在单机上运行 Flink 集群、提交作业、运行 webUI 并在IDE中执行作业。
+
+
+
+### 克隆并构建 flink-training 项目
+
+`flink-training` 仓库包含编程练习的习题、测试和参考解决方案。
+
+> **:information_source: 仓库格局:** 本仓库有几个分支,分别指向不同的 Apache Flink 版本,类似于 
[apache/flink](https://github.com/apache/flink) 仓库:
+> - 每个 Apache Flink 次要版本的发布分支,例如 `release-1.10`,和
+> - 一个指向当前 Flink 版本的 `master` 分支(不是 `flink:master`!)
+>
+> 如果想在当前 Flink 版本以外的版本上工作,请务必签出相应的分支。
+
+从 GitHub 克隆出 `flink-training` 仓库,导航到本地项目仓库并构建它:
+
+```bash
+git clone https://github.com/apache/flink-training.git
+cd flink-training
+./gradlew test shadowJar
+```
+
+如果是第一次构建,将会下载此 Flink 练习项目的所有依赖项。这通常需要几分钟时间,但具体取决于互联网连接速度。
+
+如果所有测试都通过并且构建成功,这说明你的实践练习已经开了一个好头。
+
+
+:cn: 中国用户: 点击这里了解如何使用本地 Maven 镜像。
+
+如果你在中国,我们建议将 Maven 存储库配置为使用镜像。 可以通过在 [`build.gradle`](build.gradle) 
文件中取消注释此部分来做到这一点:
+
+```groovy
+repositories {
+// for access from China, you may need to uncomment this line
+maven { url 'https://maven.aliyun.com/repository/public/' }
+mavenCentral()
+maven {
+url "https://repository.apache.org/content/repositories/snapshots/"
+mavenContent {
+snapshotsOnly()
+}
+}
+}
+```
+
+
+
+启用 Scala (可选)
+这个项目中的练习也可以使用 Scala ,但由于非 Scala 用户报告的一些问题,我们决定默认禁用 Scala。
+可以通过以下的方法修改 `gradle.properties` 文件以重新启用所有 Scala 练习和解决方案:
+
+[`gradle.properties`](gradle.properties) 文件如下:
+
+```properties
+#...
+
+# Scala exercises 

[flink-training] branch release-1.14 updated: fixup! [FLINK-26382] Add Chinese documents for flink-training exercises (#46)

2022-04-22 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
 new 18e6db2  fixup! [FLINK-26382] Add Chinese documents for flink-training
exercises (#46)
18e6db2 is described below

commit 18e6db2206ca4156e21276b14d35bebaf222c151
Author: Nico Kruber 
AuthorDate: Fri Apr 22 11:15:17 2022 +0200

fixup! [FLINK-26382] Add Chinese documents for flink-training exercises (#46)
---
 long-ride-alerts/README_zh.md | 2 +-
 rides-and-fares/README_zh.md  | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/long-ride-alerts/README_zh.md b/long-ride-alerts/README_zh.md
index 85912da..3b1d060 100644
--- a/long-ride-alerts/README_zh.md
+++ b/long-ride-alerts/README_zh.md
@@ -44,7 +44,7 @@ END 事件可能会丢失,但你可以假设没有重复的事件,也没有
 ## 入门指南
 
 > :information_source: 最好在 IDE 的 flink-training 项目中找到这些类,而不是使用本节中源文件的链接。
-> 
+>
 ### 练习相关类
 
 - Java:  
[`org.apache.flink.training.exercises.longrides.LongRidesExercise`](src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java)
diff --git a/rides-and-fares/README_zh.md b/rides-and-fares/README_zh.md
index a1dff45..f54852f 100644
--- a/rides-and-fares/README_zh.md
+++ b/rides-and-fares/README_zh.md
@@ -27,7 +27,7 @@ under the License.
 1. `TaxiRide` END 事件
 1. 一个 `TaxiFare` 事件(其时间戳恰好与开始时间匹配)
 
-最终的结果应该是 `DataStream`,每个不同的 `rideId` 都产生一个 `RideAndFare` 记录。 
+最终的结果应该是 `DataStream`,每个不同的 `rideId` 都产生一个 `RideAndFare` 记录。
 每个 `RideAndFare` 都应该将某个 `rideId` 的 `TaxiRide` START 事件与其匹配的 `TaxiFare` 配对。
 
 ### 输入数据
@@ -37,7 +37,7 @@ under the License.
 
 ### 期望输出
 
-所希望的结果是一个 `RideAndFare` 记录的数据流,每个不同的 `rideId` 都有一条这样的记录。 
+所希望的结果是一个 `RideAndFare` 记录的数据流,每个不同的 `rideId` 都有一条这样的记录。
 本练习设置为忽略 END 事件,你应该连接每次乘车的 START 事件及其相应的车费事件。
 
 一旦具有了相互关联的车程和车费事件,你可以使用 `new RideAndFare(ride, fare)` 方法为输出流创建所需的对象。
@@ -76,7 +76,7 @@ under the License.
 ## 讨论
 
 出于练习的目的,可以假设 START 和 fare 事件完美配对。
-但是在现实世界的应用程序中,你应该担心每当一个事件丢失时,同一个 `rideId` 的另一个事件的状态将被永远保持。 
+但是在现实世界的应用程序中,你应该担心每当一个事件丢失时,同一个 `rideId` 的另一个事件的状态将被永远保持。
 在 [稍后的练习](../long-ride-alerts/README_zh.md) 中,我们将看到 `ProcessFunction` 
和定时器,它们将有助于处理这样的情况。
 
 ## 相关文档



[flink-training] branch release-1.14 updated: [FLINK-26382] Add Chinese documents for flink-training exercises (#46)

2022-04-22 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
 new 0132dd7  [FLINK-26382] Add Chinese documents for flink-training
exercises (#46)
0132dd7 is described below

commit 0132dd7be8c881607f9a374613309493ade8c6dd
Author: T.C 
AuthorDate: Fri Apr 22 17:09:26 2022 +0800

[FLINK-26382] Add Chinese documents for flink-training exercises (#46)

Co-authored-by: Victor Xu 
---
 README.md   |   2 +
 README_zh.md| 274 
 hourly-tips/DISCUSSION.md   |   2 +
 hourly-tips/{DISCUSSION.md => DISCUSSION_zh.md} |  46 ++--
 hourly-tips/README.md   |   2 +
 hourly-tips/README_zh.md|  82 +++
 long-ride-alerts/DISCUSSION.md  |   2 +
 long-ride-alerts/DISCUSSION_zh.md   |  50 +
 long-ride-alerts/README.md  |   2 +
 long-ride-alerts/{README.md => README_zh.md}|  68 +++---
 ride-cleansing/README.md|   2 +
 ride-cleansing/{README.md => README_zh.md}  |  55 ++---
 rides-and-fares/README.md   |   2 +
 rides-and-fares/README_zh.md|  95 
 14 files changed, 600 insertions(+), 84 deletions(-)

diff --git a/README.md b/README.md
index 0fc84e1..b1a65cb 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+[中文版](./README_zh.md)
+
 # Apache Flink Training Exercises
 
 Exercises that accompany the training content in the documentation.
diff --git a/README_zh.md b/README_zh.md
new file mode 100644
index 000..2af9bba
--- /dev/null
+++ b/README_zh.md
@@ -0,0 +1,274 @@
+
+
+# Apache Flink 实践练习
+
+与文档中实践练习内容相关的练习。
+
+## 目录
+
+[**设置开发环境**](#set-up-your-development-environment)
+
+1. [软件要求](#software-requirements)
+1. [克隆并构建 flink-training 项目](#clone-and-build-the-flink-training-project)
+1. [将 flink-training 项目导入 
IDE](#import-the-flink-training-project-into-your-ide)
+
+[**使用出租车数据流(taxi data stream)**](#using-the-taxi-data-streams)
+
+1. [出租车车程(taxi ride)事件结构](#schema-of-taxi-ride-events)
+1. [出租车费用(taxi fare)事件结构](#schema-of-taxi-fare-events)
+
+[**如何做练习**](#how-to-do-the-lab-exercises)
+
+1. [了解数据](#learn-about-the-data)
+2. [在 IDE 中运行和调试 Flink 程序](#run-and-debug-flink-programs-in-your-ide)
+3. [练习、测试及解决方案](#exercises-tests-and-solutions)
+
+[**练习**](#lab-exercises)
+
+[**提交贡献**](#contributing)
+
+[**许可证**](#license)
+
+
+
+## 设置开发环境
+
+你需要设置便于进行开发、调试并运行实践练习的示例和解决方案的环境。
+
+
+
+### 软件要求
+
+Linux、OS X 和 Windows 均可作为 Flink 程序和本地执行的开发环境。 Flink 开发设置需要以下软件,它们应该安装在系统上:
+
+- Git
+- Java 8 或者 Java 11 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java)
+- 支持 Gradle 的 Java (及/或 Scala) 开发IDE
+- 推荐使用 [IntelliJ](https://www.jetbrains.com/idea/), 但 
[Eclipse](https://www.eclipse.org/downloads/) 或 [Visual Studio 
Code](https://code.visualstudio.com/) (安装 [Java extension 
pack](https://code.visualstudio.com/docs/java/java-tutorial) 插件) 也可以用于Java环境
+- 为了使用 Scala, 需要使用 IntelliJ (及其 [Scala 
plugin](https://plugins.jetbrains.com/plugin/1347-scala/) 插件)
+
+> **:information_source: Windows 用户须知:** 实践说明中提供的 shell 命令示例适用于 UNIX 环境。
+> 您可能会发现值得在 Windows 环境中设置 cygwin 或 WSL。对于开发 Flink 
作业(jobs),Windows工作的相当好:可以在单机上运行 Flink 集群、提交作业、运行 webUI 并在IDE中执行作业。
+
+
+
+### 克隆并构建 flink-training 项目
+
+`flink-training` 仓库包含编程练习的习题、测试和参考解决方案。
+
+> **:information_source: 仓库格局:** 本仓库有几个分支,分别指向不同的 Apache Flink 版本,类似于 
[apache/flink](https://github.com/apache/flink) 仓库:
+> - 每个 Apache Flink 次要版本的发布分支,例如 `release-1.10`,和
+> - 一个指向当前 Flink 版本的 `master` 分支(不是 `flink:master`!)
+>
+> 如果想在当前 Flink 版本以外的版本上工作,请务必签出相应的分支。
+
+从 GitHub 克隆出 `flink-training` 仓库,导航到本地项目仓库并构建它:
+
+```bash
+git clone https://github.com/apache/flink-training.git
+cd flink-training
+./gradlew test shadowJar
+```
+
+如果是第一次构建,将会下载此 Flink 练习项目的所有依赖项。这通常需要几分钟时间,但具体取决于互联网连接速度。
+
+如果所有测试都通过并且构建成功,这说明你的实践练习已经开了一个好头。
+
+
+:cn: 中国用户: 点击这里了解如何使用本地 Maven 镜像。
+
+如果你在中国,我们建议将 Maven 存储库配置为使用镜像。 可以通过在 [`build.gradle`](build.gradle) 
文件中取消注释此部分来做到这一点:
+
+```groovy
+repositories {
+// for access from China, you may need to uncomment this line
+maven { url 'https://maven.aliyun.com/repository/public/' }
+mavenCentral()
+maven {
+url "https://repository.apache.org/content/repositories/snapshots/"
+mavenContent {
+snapshotsOnly()
+}
+}
+}
+```
+
+
+
+启用 Scala (可选)
+这个项目中的练习也可以使用 Scala ,但由于非 Scala 用户报告的一些问题,我们决定默认禁用 Scala。
+可以通过以下的方法修改 `gradle.properties` 文件以重新启用所有 Scala 练习和解决方案:
+
+[`gradle.properties`](gradle.properties) 文件如下:
+
+```properties
+#...
+
+# Scala exercises can be enabled by setting this to true
+org.gradle.project.enable_scala = true

[flink-kubernetes-operator] branch main updated (805fef5 -> f7ee710)

2022-04-22 Thread wangyang0918
This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


from 805fef5  [FLINK-27023] Unify flink and operator configuration
 add f7ee710  [FLINK-27161] Support to fetch user jar from different 
sources for session job

No new revisions were added by this update.

Summary of changes:
 docs/content/docs/operations/configuration.md  |   1 +
 .../flink/kubernetes/operator/FlinkOperator.java   |   5 +
 .../{JarResolver.java => ArtifactFetcher.java} |  26 ++--
 .../operator/artifact/ArtifactManager.java |  74 ++
 .../artifact/FileSystemBasedArtifactFetcher.java   |  52 +++
 .../operator/artifact/HttpArtifactFetcher.java |  50 +++
 .../config/FlinkOperatorConfiguration.java |   8 +-
 .../config/KubernetesOperatorConfigOptions.java|   6 +
 .../kubernetes/operator/service/FlinkService.java  |  21 +--
 .../kubernetes/operator/TestingFlinkService.java   |   3 +-
 .../operator/artifact/ArtifactManagerTest.java | 154 +
 .../operator/artifact/JarResolverTest.java |  35 -
 12 files changed, 376 insertions(+), 59 deletions(-)
 rename 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/{JarResolver.java
 => ArtifactFetcher.java} (65%)
 create mode 100644 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java
 create mode 100644 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java
 create mode 100644 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java
 create mode 100644 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java
 delete mode 100644 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
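
The diffstat only lists the new classes, so as a rough illustration of the pattern (the interface and its method signature are assumptions for this sketch, not the operator's actual API), an HTTP-based fetcher that downloads a session job's user jar could look like:

```java
import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical fetcher abstraction; signature assumed for illustration only. */
interface ArtifactFetcherSketch {
    File fetch(String uri, File targetDir) throws Exception;
}

/** Minimal HTTP-based fetcher: download the user jar into the target directory. */
class HttpArtifactFetcherSketch implements ArtifactFetcherSketch {
    @Override
    public File fetch(String uri, File targetDir) throws Exception {
        URL url = new URL(uri);
        String fileName = new File(url.getPath()).getName();
        Path target = new File(targetDir, fileName).toPath();
        try (InputStream in = url.openStream()) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target.toFile();
    }
}
```

A `FileSystemBasedArtifactFetcher`, as listed above, would follow the same contract but read the jar through Flink's `FileSystem` abstraction instead of HTTP.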