[flink] branch master updated: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cb68ccf1b2c  [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload
cb68ccf1b2c is described below

commit cb68ccf1b2cb879148fb17d2fd6394e15d1ae46c
Author: wangfeifan
AuthorDate: Tue Apr 12 16:49:45 2022 +0800

    [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload
---
 docs/content.zh/docs/ops/metrics.md                |   5 +
 docs/content/docs/ops/metrics.md                   |   5 +
 .../fs/BatchingStateChangeUploadScheduler.java     |   5 +-
 .../changelog/fs/ChangelogStorageMetricGroup.java  |  11 +++
 .../flink/changelog/fs/RetryingExecutor.java       |  52 --
 .../fs/BatchingStateChangeUploadSchedulerTest.java |  34 ---
 .../changelog/fs/ChangelogStorageMetricsTest.java  | 110 -
 .../flink/changelog/fs/RetryingExecutorTest.java   |   8 +-
 8 files changed, 201 insertions(+), 29 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 85369590a5d..dbb6b1c3bd7 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1292,6 +1292,11 @@ Note that the metrics are only available via reporters.
       The number of attempts per upload
       Histogram
+
+      totalAttemptsPerUpload
+      The distribution of the total number of attempts per upload
+      Histogram
+
       uploadBatchSizes
       The number of upload tasks (coming from one or more writers, i.e. backends/tasks) that were grouped together and form a single upload resulting in a single file

diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index de466a6a27f..10e33513c23 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1285,6 +1285,11 @@ Note that the metrics are only available via reporters.
       The number of attempts per upload
       Histogram
+
+      totalAttemptsPerUpload
+      The distribution of the total number of attempts per upload
+      Histogram
+
       uploadBatchSizes
       The number of upload tasks (coming from one or more writers, i.e. backends/tasks) that were grouped together and form a single upload resulting in a single file

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
index 3bec459e03e..3495c6bc525 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
@@ -114,7 +114,10 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {
                 retryPolicy,
                 delegate,
                 SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG),
-                new RetryingExecutor(numUploadThreads, metricGroup.getAttemptsPerUpload()),
+                new RetryingExecutor(
+                        numUploadThreads,
+                        metricGroup.getAttemptsPerUpload(),
+                        metricGroup.getTotalAttemptsPerUpload()),
                 metricGroup);
     }

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java
index 318eec92f48..12754838474 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStorageMetricGroup.java
@@ -42,6 +42,7 @@ public class ChangelogStorageMetricGroup extends ProxyMetricGroup {
     private final Histogram uploadSizes;
     private final Histogram uploadLatenciesNanos;
     private final Histogram attemptsPerUpload;
+    private final Histogram totalAttemptsPerUpload;

     public ChangelogStorageMetricGroup(MetricGroup parent) {
         super(parent);
@@ -55,6 +56,10 @@ public class ChangelogStorageMetricGroup extends ProxyMetricGroup {
                 histogram(
                         CHANGELOG_STORAGE_ATTEMPTS_PER_UPLOAD,
                         new DescriptiveStatisticsHistogram(WINDOW_SIZE));
+        this.totalAttemptsPerUpload =
+                histogram(
+                        CHANGELOG_STORAGE_TOTAL_ATTEMPTS_PER_UPLOAD,
+                        new DescriptiveStatisticsHistogram(WINDOW_SIZE));
         this.uploadSizes =
                 histogram(
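The distinction between the two histograms is easiest to see at the moment an upload finishes: attemptsPerUpload records the number of the attempt that finally completed the upload, while totalAttemptsPerUpload also counts attempts that failed or were superseded along the way. The sketch below illustrates that accounting; the class and method names are made up for illustration and do not mirror the real RetryingExecutor internals.

```java
import org.apache.flink.metrics.Histogram;

/**
 * Illustrative sketch only (hypothetical class): tracks the attempts of a
 * single upload and reports them to the two histograms on completion.
 */
class UploadAttemptTracker {

    private final Histogram attemptsPerUpload; // number of the attempt that completed the upload
    private final Histogram totalAttemptsPerUpload; // all attempts started for this upload
    private int startedAttempts;

    UploadAttemptTracker(Histogram attemptsPerUpload, Histogram totalAttemptsPerUpload) {
        this.attemptsPerUpload = attemptsPerUpload;
        this.totalAttemptsPerUpload = totalAttemptsPerUpload;
    }

    /** Called whenever an attempt (first try or retry) is scheduled. */
    synchronized int nextAttemptNumber() {
        return ++startedAttempts;
    }

    /** Called once, when one of the attempts completes the upload. */
    synchronized void uploadCompleted(int completingAttemptNumber) {
        attemptsPerUpload.update(completingAttemptNumber);
        totalAttemptsPerUpload.update(startedAttempts);
    }
}
```

A persistent gap between the two distributions would indicate effort spent on attempts that never complete, which is what the new metric is meant to expose.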
[flink] branch release-1.15 updated: [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 3d4b3a495b2  [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes
3d4b3a495b2 is described below

commit 3d4b3a495b273c3a15ce7d35ba5a5b2e4ddc4c20
Author: mayuehappy
AuthorDate: Mon Apr 18 21:44:56 2022 +0800

    [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes
---
 .../flink/runtime/state/HeapBroadcastState.java    | 11 -
 .../runtime/state/PartitionableListState.java      | 10 -
 .../state/StateBackendMigrationTestBase.java       | 49 ++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 14e72f4cfdd..78ff61945b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -46,7 +47,7 @@ public class HeapBroadcastState implements BackendWritableBroadcastState backingMap;
 
     /** A serializer that allows to perform deep copies of internal map state. */
-    private final MapSerializer internalMapCopySerializer;
+    private MapSerializer internalMapCopySerializer;
 
     HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo stateMetaInfo) {
         this(stateMetaInfo, new HashMap<>());
@@ -71,6 +72,9 @@ public class HeapBroadcastState implements BackendWritableBroadcastState stateMetaInfo) {
+        this.internalMapCopySerializer =
+                new MapSerializer<>(
+                        stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer());
         this.stateMetaInfo = stateMetaInfo;
     }
 
@@ -154,4 +158,9 @@ public class HeapBroadcastState implements BackendWritableBroadcastState> immutableEntries() {
         return Collections.unmodifiableSet(backingMap.entrySet());
     }
+
+    @VisibleForTesting
+    public MapSerializer getInternalMapCopySerializer() {
+        return internalMapCopySerializer;
+    }
 }

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
index 12ef958a8f8..7cb363b91c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataOutputView;
@@ -42,7 +43,7 @@ public final class PartitionableListState implements ListState {
     private final ArrayList internalList;
 
    /** A typeSerializer that allows to perform deep copies of internalList */
-    private final ArrayListSerializer internalListCopySerializer;
+    private ArrayListSerializer internalListCopySerializer;
 
     PartitionableListState(RegisteredOperatorStateBackendMetaInfo stateMetaInfo) {
         this(stateMetaInfo, new ArrayList());
@@ -65,6 +66,8 @@ public final class PartitionableListState implements ListState {
     }
 
     public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo stateMetaInfo) {
+        this.internalListCopySerializer =
+                new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
         this.stateMetaInfo = stateMetaInfo;
     }
 
@@ -130,4 +133,9 @@ public final class PartitionableListState implements ListState {
             internalList.addAll(values);
         }
     }
+
+    @VisibleForTesting
+    public ArrayListSerializer getInternalListCopySerializer() {
+        return internalListCopySerializer;
+    }
 }

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 733f39da47d..aafab4a5f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++
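The essence of the fix is the same in both state classes above: the deep-copy serializer is no longer captured once in the constructor, but rebuilt every time new state meta info is set, e.g. after a restore with an upgraded serializer. A reduced sketch of the pattern, using illustrative names rather than the real Flink classes:

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Reduced illustration of the FLINK-27218 fix. MetaInfo is a stand-in for
// RegisteredOperatorStateBackendMetaInfo, not a real Flink class.
final class CopySerializerHolder<T> {

    interface MetaInfo<T> {
        TypeSerializer<T> getPartitionStateSerializer();
    }

    private TypeSerializer<T> copySerializer; // intentionally no longer final
    private MetaInfo<T> stateMetaInfo;

    void setStateMetaInfo(MetaInfo<T> stateMetaInfo) {
        // The fix: refresh the deep-copy serializer together with the meta
        // info, so snapshots taken after a schema migration use the new schema.
        this.copySerializer = stateMetaInfo.getPartitionStateSerializer();
        this.stateMetaInfo = stateMetaInfo;
    }

    TypeSerializer<T> getCopySerializer() {
        return copySerializer;
    }
}
```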
[flink] branch release-1.13 updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.13 by this push: new 79a86f35fb3 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source 79a86f35fb3 is described below commit 79a86f35fb321cb5f8dd40442db8c6bafb00153c Author: Juntao Hu AuthorDate: Thu Apr 21 19:38:01 2022 +0800 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source This closes #19551. --- ...WatermarkIntoTableSourceScanAcrossCalcRule.java | 7 ++- .../PushWatermarkIntoTableSourceScanRuleTest.java | 22 ++ .../utils/JavaUserDefinedScalarFunctions.java | 22 ++ .../PushWatermarkIntoTableSourceScanRuleTest.xml | 21 + 4 files changed, 71 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java index 18539f08e20..d7311cc9ab1 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java @@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall; + /** * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the * {@link FlinkLogicalTableSourceScan}. 
The rule will first look for the computed column in the @@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule @Override public boolean matches(RelOptRuleCall call) { FlinkLogicalTableSourceScan scan = call.rel(2); -return supportsWatermarkPushDown(scan); +FlinkLogicalCalc calc = call.rel(1); +return supportsWatermarkPushDown(scan) +&& calc.getProgram().getExprList().stream() +.noneMatch(rexNode -> containsPythonCall(rexNode, null)); } @Override diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java index e47a110687f..f6e7f99ef5c 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder; import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5; import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; @@ -256,4 +257,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase { util.tableEnv().executeSql(ddl); util.verifyRelPlan("select a, c from MyTable"); } + +@Test +public void testWatermarkWithPythonFunctionInComputedColumn() { +util.tableEnv() +.createTemporaryFunction( +"parse_ts", +new JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction()); +String ddl = +"CREATE TABLE MyTable(" ++ " a INT,\n" ++ " b AS parse_ts(a),\n" ++ " WATERMARK FOR b AS b\n" ++ ") WITH (\n" ++ " 'connector' = 'values',\n" ++ " 'enable-watermark-push-down' = 'true',\n" ++ " 'bounded' = 'false',\n" ++ " 'disable-lookup' = 'true'" ++ ")"; +util.tableEnv().executeSql(ddl); +
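The new condition in matches() scans every expression in the Calc's program and rejects the pushdown if any of them contains a Python call, because a watermark expression pushed into the source cannot evaluate a Python UDF. PythonUtil.containsPythonCall itself is not shown in the diff; the sketch below is only a plausible shape for such a check, built on Calcite's visitor API, with the language test reduced to a placeholder predicate:

```java
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexVisitorImpl;

/**
 * Hypothetical stand-in for PythonUtil.containsPythonCall: walks a RexNode
 * tree and reports whether any call looks like a Python UDF invocation.
 */
final class PythonCallFinder extends RexVisitorImpl<Void> {

    private boolean found;

    private PythonCallFinder() {
        super(true); // deep traversal into call operands
    }

    @Override
    public Void visitCall(RexCall call) {
        if (isPythonFunction(call)) {
            found = true;
            return null;
        }
        return super.visitCall(call);
    }

    private static boolean isPythonFunction(RexCall call) {
        // Placeholder predicate; the real utility inspects the function
        // definition's language, not the operator name.
        return call.getOperator().getName().toLowerCase().contains("python");
    }

    static boolean containsPythonCall(RexNode node) {
        PythonCallFinder finder = new PythonCallFinder();
        node.accept(finder);
        return finder.found;
    }
}
```

In the rule itself, the second argument passed to containsPythonCall (null here) appears to act as a wildcard matching Python functions of any kind rather than one specific function.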
[flink] branch master updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7ce5a7c6e1e [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source 7ce5a7c6e1e is described below commit 7ce5a7c6e1eab6823094a94bc0bca30d0ee618f1 Author: Juntao Hu AuthorDate: Thu Apr 21 19:38:01 2022 +0800 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source This closes #19551. --- ...WatermarkIntoTableSourceScanAcrossCalcRule.java | 7 ++- .../PushWatermarkIntoTableSourceScanRuleTest.java | 22 ++ .../utils/JavaUserDefinedScalarFunctions.java | 21 + .../PushWatermarkIntoTableSourceScanRuleTest.xml | 21 + 4 files changed, 70 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java index 18539f08e20..d7311cc9ab1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java @@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall; + /** * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the * {@link FlinkLogicalTableSourceScan}. 
The rule will first look for the computed column in the @@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule @Override public boolean matches(RelOptRuleCall call) { FlinkLogicalTableSourceScan scan = call.rel(2); -return supportsWatermarkPushDown(scan); +FlinkLogicalCalc calc = call.rel(1); +return supportsWatermarkPushDown(scan) +&& calc.getProgram().getExprList().stream() +.noneMatch(rexNode -> containsPythonCall(rexNode, null)); } @Override diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java index cdef53073e2..1c3ead462be 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder; import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5; import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; @@ -253,4 +254,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase { util.tableEnv().executeSql(ddl); util.verifyRelPlan("select a, c from MyTable"); } + +@Test +public void testWatermarkWithPythonFunctionInComputedColumn() { +util.tableEnv() +.createTemporaryFunction( +"parse_ts", +new JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction()); +String ddl = +"CREATE TABLE MyTable(" ++ " a INT,\n" ++ " b AS parse_ts(a),\n" ++ " WATERMARK FOR b AS b\n" ++ ") WITH (\n" ++ " 'connector' = 'values',\n" ++ " 'enable-watermark-push-down' = 'true',\n" ++ " 'bounded' = 'false',\n" ++ " 'disable-lookup' = 'true'" ++ ")"; +util.tableEnv().executeSql(ddl); +util.verifyRelPlan("SELECT * FROM MyTable"); +} } diff --git
[flink] branch release-1.14 updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.14 by this push: new 0806ad5a154 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source 0806ad5a154 is described below commit 0806ad5a154e37d09b53ce56d59cec8dc11209da Author: Juntao Hu AuthorDate: Thu Apr 21 19:38:01 2022 +0800 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source This closes #19551. --- ...WatermarkIntoTableSourceScanAcrossCalcRule.java | 7 ++- .../PushWatermarkIntoTableSourceScanRuleTest.java | 22 ++ .../utils/JavaUserDefinedScalarFunctions.java | 21 + .../PushWatermarkIntoTableSourceScanRuleTest.xml | 21 + 4 files changed, 70 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java index 18539f08e20..d7311cc9ab1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java @@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall; + /** * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the * {@link FlinkLogicalTableSourceScan}. 
The rule will first look for the computed column in the @@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule @Override public boolean matches(RelOptRuleCall call) { FlinkLogicalTableSourceScan scan = call.rel(2); -return supportsWatermarkPushDown(scan); +FlinkLogicalCalc calc = call.rel(1); +return supportsWatermarkPushDown(scan) +&& calc.getProgram().getExprList().stream() +.noneMatch(rexNode -> containsPythonCall(rexNode, null)); } @Override diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java index e47a110687f..f6e7f99ef5c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder; import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5; import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; @@ -256,4 +257,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase { util.tableEnv().executeSql(ddl); util.verifyRelPlan("select a, c from MyTable"); } + +@Test +public void testWatermarkWithPythonFunctionInComputedColumn() { +util.tableEnv() +.createTemporaryFunction( +"parse_ts", +new JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction()); +String ddl = +"CREATE TABLE MyTable(" ++ " a INT,\n" ++ " b AS parse_ts(a),\n" ++ " WATERMARK FOR b AS b\n" ++ ") WITH (\n" ++ " 'connector' = 'values',\n" ++ " 'enable-watermark-push-down' = 'true',\n" ++ " 'bounded' = 'false',\n" ++ " 'disable-lookup' = 'true'" ++ ")"; +util.tableEnv().executeSql(ddl); +util.verifyRelPlan("SELECT * FROM MyTable"); +} }
[flink] branch release-1.15 updated: [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new 703b10ca5d0 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source 703b10ca5d0 is described below commit 703b10ca5d004e8e79059e814fcf8503f84e2da8 Author: Juntao Hu AuthorDate: Thu Apr 21 19:38:01 2022 +0800 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source This closes #19551. --- ...WatermarkIntoTableSourceScanAcrossCalcRule.java | 7 ++- .../PushWatermarkIntoTableSourceScanRuleTest.java | 22 ++ .../utils/JavaUserDefinedScalarFunctions.java | 21 + .../PushWatermarkIntoTableSourceScanRuleTest.xml | 21 + 4 files changed, 70 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java index 18539f08e20..d7311cc9ab1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java @@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall; + /** * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the * {@link FlinkLogicalTableSourceScan}. 
The rule will first look for the computed column in the @@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule @Override public boolean matches(RelOptRuleCall call) { FlinkLogicalTableSourceScan scan = call.rel(2); -return supportsWatermarkPushDown(scan); +FlinkLogicalCalc calc = call.rel(1); +return supportsWatermarkPushDown(scan) +&& calc.getProgram().getExprList().stream() +.noneMatch(rexNode -> containsPythonCall(rexNode, null)); } @Override diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java index 58215cb2d74..db31b45e7b7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder; import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5; import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; @@ -253,4 +254,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase { util.tableEnv().executeSql(ddl); util.verifyRelPlan("select a, c from MyTable"); } + +@Test +public void testWatermarkWithPythonFunctionInComputedColumn() { +util.tableEnv() +.createTemporaryFunction( +"parse_ts", +new JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction()); +String ddl = +"CREATE TABLE MyTable(" ++ " a INT,\n" ++ " b AS parse_ts(a),\n" ++ " WATERMARK FOR b AS b\n" ++ ") WITH (\n" ++ " 'connector' = 'values',\n" ++ " 'enable-watermark-push-down' = 'true',\n" ++ " 'bounded' = 'false',\n" ++ " 'disable-lookup' = 'true'" ++ ")"; +util.tableEnv().executeSql(ddl); +util.verifyRelPlan("SELECT * FROM MyTable"); +} }
[flink] branch master updated: [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4033ddc5fa6  [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes
4033ddc5fa6 is described below

commit 4033ddc5fa682a1619f8f22348e2ee38afcc1c85
Author: mayuehappy
AuthorDate: Mon Apr 18 21:44:56 2022 +0800

    [FLINK-27218] fix the problem that the internal Serializer in OperatorState may not have been updated when the schema changes
---
 .../flink/runtime/state/HeapBroadcastState.java    | 11 -
 .../runtime/state/PartitionableListState.java      | 10 -
 .../state/StateBackendMigrationTestBase.java       | 49 ++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 14e72f4cfdd..78ff61945b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -46,7 +47,7 @@ public class HeapBroadcastState implements BackendWritableBroadcastState backingMap;
 
     /** A serializer that allows to perform deep copies of internal map state. */
-    private final MapSerializer internalMapCopySerializer;
+    private MapSerializer internalMapCopySerializer;
 
     HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo stateMetaInfo) {
         this(stateMetaInfo, new HashMap<>());
@@ -71,6 +72,9 @@ public class HeapBroadcastState implements BackendWritableBroadcastState stateMetaInfo) {
+        this.internalMapCopySerializer =
+                new MapSerializer<>(
+                        stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer());
         this.stateMetaInfo = stateMetaInfo;
     }
 
@@ -154,4 +158,9 @@ public class HeapBroadcastState implements BackendWritableBroadcastState> immutableEntries() {
         return Collections.unmodifiableSet(backingMap.entrySet());
     }
+
+    @VisibleForTesting
+    public MapSerializer getInternalMapCopySerializer() {
+        return internalMapCopySerializer;
+    }
 }

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
index 12ef958a8f8..7cb363b91c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataOutputView;
@@ -42,7 +43,7 @@ public final class PartitionableListState implements ListState {
     private final ArrayList internalList;
 
    /** A typeSerializer that allows to perform deep copies of internalList */
-    private final ArrayListSerializer internalListCopySerializer;
+    private ArrayListSerializer internalListCopySerializer;
 
     PartitionableListState(RegisteredOperatorStateBackendMetaInfo stateMetaInfo) {
         this(stateMetaInfo, new ArrayList());
@@ -65,6 +66,8 @@ public final class PartitionableListState implements ListState {
     }
 
     public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo stateMetaInfo) {
+        this.internalListCopySerializer =
+                new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
         this.stateMetaInfo = stateMetaInfo;
     }
 
@@ -130,4 +133,9 @@ public final class PartitionableListState implements ListState {
             internalList.addAll(values);
         }
     }
+
+    @VisibleForTesting
+    public ArrayListSerializer getInternalListCopySerializer() {
+        return internalListCopySerializer;
+    }
 }

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 733f39da47d..aafab4a5f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@
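Besides the production fix (identical to the release-1.15 backport above), the 49 new lines in StateBackendMigrationTestBase pin this contract down. Reduced to its essence, the regression check looks roughly like the following; the state stub stands in for PartitionableListState and the @VisibleForTesting getter added above, and the real tests additionally restore a snapshot with an upgraded serializer first:

```java
import static org.junit.Assert.assertSame;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.junit.Test;

/** Sketch of the contract only, not the actual migration tests. */
public class CopySerializerRefreshSketchTest {

    /** Minimal stand-in for PartitionableListState's serializer handling. */
    static final class StateStub {
        private TypeSerializer<String> copySerializer;

        void setStateMetaInfo(TypeSerializer<String> elementSerializer) {
            this.copySerializer = elementSerializer; // the FLINK-27218 fix
        }

        TypeSerializer<String> getInternalCopySerializer() {
            return copySerializer;
        }
    }

    @Test
    public void copySerializerFollowsMetaInfoUpdates() {
        StateStub state = new StateStub();
        TypeSerializer<String> migratedSerializer = StringSerializer.INSTANCE;
        state.setStateMetaInfo(migratedSerializer);
        assertSame(
                "deep copies must use the serializer from the latest meta info",
                migratedSerializer,
                state.getInternalCopySerializer());
    }
}
```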
[flink] branch release-1.15 updated: [examples][python] Add examples on how to use json/csv/avro formats in Python DataStream API
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new fa8fbf5bc29 [examples][python] Add examples on how to use json/csv/avro formats in Python DataStream API fa8fbf5bc29 is described below commit fa8fbf5bc29b1432e1baaefe3049b50ffeca706a Author: Dian Fu AuthorDate: Fri Apr 22 22:26:44 2022 +0800 [examples][python] Add examples on how to use json/csv/avro formats in Python DataStream API --- .../examples/datastream/formats/__init__.py| 17 .../examples/datastream/formats/avro_format.py | 91 ++ .../examples/datastream/formats/csv_format.py | 72 + .../examples/datastream/formats/json_format.py | 73 + 4 files changed, 253 insertions(+) diff --git a/flink-python/pyflink/examples/datastream/formats/__init__.py b/flink-python/pyflink/examples/datastream/formats/__init__.py new file mode 100644 index 000..65b48d4d79b --- /dev/null +++ b/flink-python/pyflink/examples/datastream/formats/__init__.py @@ -0,0 +1,17 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/flink-python/pyflink/examples/datastream/formats/avro_format.py b/flink-python/pyflink/examples/datastream/formats/avro_format.py new file mode 100644 index 000..9e3b2658d89 --- /dev/null +++ b/flink-python/pyflink/examples/datastream/formats/avro_format.py @@ -0,0 +1,91 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sys + +from pyflink.common import AvroRowSerializationSchema, Types, AvroRowDeserializationSchema +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer + + +# Make sure that the Kafka cluster is started and the topic 'test_avro_topic' is +# created before executing this job. 
+def write_to_kafka(env): +ds = env.from_collection([ +(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')], +type_info=Types.ROW([Types.INT(), Types.STRING()])) + +serialization_schema = AvroRowSerializationSchema( +avro_schema_string=""" +{ +"type": "record", +"name": "TestRecord", +"fields": [ +{"name": "id", "type": "int"}, +{"name": "name", "type": "string"} +] +}""" +) + +kafka_producer = FlinkKafkaProducer( +topic='test_avro_topic', +serialization_schema=serialization_schema, +producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'} +) + +# note that the output type of ds must be RowTypeInfo +ds.add_sink(kafka_producer) +env.execute() + + +def read_from_kafka(env): +deserialization_schema = AvroRowDeserializationSchema( +avro_schema_string=""" +{ +"type": "record", +"name": "TestRecord",
[flink] branch master updated (cc6d1a27baf -> bdd91141658)
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from cc6d1a27baf  [FLINK-27321][ci] Migrate java-ci-tools to JUnit 5
     add bdd91141658  [examples][python] Add examples on how to use json/csv/avro formats in Python DataStream API

No new revisions were added by this update.

Summary of changes:
 .../examples/datastream/formats}/__init__.py       |  0
 .../examples/datastream/formats/avro_format.py     | 91 ++
 .../examples/datastream/formats/csv_format.py      | 72 +
 .../examples/datastream/formats/json_format.py     | 73 +
 4 files changed, 236 insertions(+)
 copy {flink-end-to-end-tests/flink-python-test/python/datastream => flink-python/pyflink/examples/datastream/formats}/__init__.py (100%)
 create mode 100644 flink-python/pyflink/examples/datastream/formats/avro_format.py
 create mode 100644 flink-python/pyflink/examples/datastream/formats/csv_format.py
 create mode 100644 flink-python/pyflink/examples/datastream/formats/json_format.py
[flink-ml] branch master updated (cccd47c -> aedf663)
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


    from cccd47c  Bump junit from 4.12 to 4.13.1 (#79)
     add aedf663  [FLINK-26268][ml][python] Add classification algorithm support for LogisticRegression, KNN and NaiveBayes in ML Python API

No new revisions were added by this update.

Summary of changes:
 .github/workflows/python-checks.yml                 |   6 +
 flink-ml-python/dev/dev-requirements.txt            |   2 +-
 flink-ml-python/pyflink/ml/__init__.py              |  44 +
 flink-ml-python/pyflink/ml/core/api.py              |   5 +-
 flink-ml-python/pyflink/ml/core/builder.py          |  11 +-
 flink-ml-python/pyflink/ml/core/linalg.py           | 133 +++
 .../pyflink/ml/{ => core}/tests/__init__.py         |   7 +-
 .../pyflink/ml/{ => core}/tests/test_linalg.py      |   0
 .../pyflink/ml/{ => core}/tests/test_param.py       |   0
 .../pyflink/ml/{ => core}/tests/test_pipeline.py    |   9 +-
 .../pyflink/ml/{ => core}/tests/test_stage.py       |   4 +-
 flink-ml-python/pyflink/ml/core/wrapper.py          |  18 +-
 .../ml => ml/lib/classification}/__init__.py        |   0
 .../pyflink/ml/lib/classification/common.py         |  74 +
 .../pyflink/ml/lib/classification/knn.py            | 103 
 .../ml/lib/classification/logisticregression.py     |  97 +++
 .../pyflink/ml/lib/classification/naivebayes.py     | 119 +
 .../ml/{ => lib/classification}/tests/__init__.py   |   7 +-
 .../ml/lib/classification/tests/test_knn.py         | 166 ++
 .../tests/test_logisticregression.py                | 185 +
 .../ml/lib/classification/tests/test_naivebayes.py  | 120 +
 flink-ml-python/pyflink/ml/tests/test_utils.py      |  30 +++-
 .../pyflink/ml/util/read_write_utils.py             |  14 +-
 flink-ml-python/setup.py                            |   2 +-
 pom.xml                                             |   2 +-
 25 files changed, 1120 insertions(+), 38 deletions(-)
 copy flink-ml-python/pyflink/ml/{ => core}/tests/__init__.py (89%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_linalg.py (100%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_param.py (100%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_pipeline.py (92%)
 rename flink-ml-python/pyflink/ml/{ => core}/tests/test_stage.py (98%)
 copy flink-ml-python/pyflink/{examples/ml => ml/lib/classification}/__init__.py (100%)
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/common.py
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/knn.py
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/logisticregression.py
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/naivebayes.py
 copy flink-ml-python/pyflink/ml/{ => lib/classification}/tests/__init__.py (89%)
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
 create mode 100644 flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py
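The new Python stages are thin wrappers (see pyflink/ml/core/wrapper.py in the summary above) around the Java algorithms in flink-ml-lib, following the same Estimator/Model pattern. For orientation, the Java side of that pattern is sketched below; the class and setter names are recalled from the Flink ML 2.x API and should be treated as approximate rather than authoritative:

```java
import org.apache.flink.ml.classification.logisticregression.LogisticRegression;
import org.apache.flink.ml.classification.logisticregression.LogisticRegressionModel;
import org.apache.flink.table.api.Table;

// Approximate sketch of the Estimator/Model flow the Python wrappers delegate to.
public final class LogisticRegressionSketch {

    public static Table trainAndPredict(Table trainData, Table testData) {
        LogisticRegression lr =
                new LogisticRegression()
                        .setFeaturesCol("features") // column names are examples only
                        .setLabelCol("label");
        LogisticRegressionModel model = lr.fit(trainData); // Estimator -> Model
        return model.transform(testData)[0]; // transform returns one or more tables
    }

    private LogisticRegressionSketch() {}
}
```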
[flink-training] branch master updated: [FLINK-26382] Add Chinese documents for flink-training exercises (#46)
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git The following commit(s) were added to refs/heads/master by this push: new aca6c47 [FLINK-26382] Add Chinese documents for flink-training exercises (#46) aca6c47 is described below commit aca6c47b79d486eb38969492c7e2dc8cb200d146 Author: T.C AuthorDate: Fri Apr 22 17:09:26 2022 +0800 [FLINK-26382] Add Chinese documents for flink-training exercises (#46) Co-authored-by: Victor Xu Co-authored-by: Nico Kruber --- README.md | 2 + README_zh.md| 274 build.gradle| 2 +- hourly-tips/DISCUSSION.md | 2 + hourly-tips/{DISCUSSION.md => DISCUSSION_zh.md} | 46 ++-- hourly-tips/README.md | 2 + hourly-tips/README_zh.md| 82 +++ long-ride-alerts/DISCUSSION.md | 2 + long-ride-alerts/DISCUSSION_zh.md | 50 + long-ride-alerts/README.md | 2 + long-ride-alerts/{README.md => README_zh.md}| 68 +++--- ride-cleansing/README.md| 2 + ride-cleansing/{README.md => README_zh.md} | 55 ++--- rides-and-fares/README.md | 2 + rides-and-fares/README_zh.md| 95 15 files changed, 601 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index 0fc84e1..b1a65cb 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ specific language governing permissions and limitations under the License. --> +[中文版](./README_zh.md) + # Apache Flink Training Exercises Exercises that accompany the training content in the documentation. diff --git a/README_zh.md b/README_zh.md new file mode 100644 index 000..2af9bba --- /dev/null +++ b/README_zh.md @@ -0,0 +1,274 @@ + + +# Apache Flink 实践练习 + +与文档中实践练习内容相关的练习。 + +## 目录 + +[**设置开发环境**](#set-up-your-development-environment) + +1. [软件要求](#software-requirements) +1. [克隆并构建 flink-training 项目](#clone-and-build-the-flink-training-project) +1. [将 flink-training 项目导入 IDE](#import-the-flink-training-project-into-your-ide) + +[**使用出租车数据流(taxi data stream)**](#using-the-taxi-data-streams) + +1. [出租车车程(taxi ride)事件结构](#schema-of-taxi-ride-events) +1. [出租车费用(taxi fare)事件结构](#schema-of-taxi-fare-events) + +[**如何做练习**](#how-to-do-the-lab-exercises) + +1. [了解数据](#learn-about-the-data) +2. [在 IDE 中运行和调试 Flink 程序](#run-and-debug-flink-programs-in-your-ide) +3. [练习、测试及解决方案](#exercises-tests-and-solutions) + +[**练习**](#lab-exercises) + +[**提交贡献**](#contributing) + +[**许可证**](#license) + + + +## 设置开发环境 + +你需要设置便于进行开发、调试并运行实践练习的示例和解决方案的环境。 + + + +### 软件要求 + +Linux、OS X 和 Windows 均可作为 Flink 程序和本地执行的开发环境。 Flink 开发设置需要以下软件,它们应该安装在系统上: + +- Git +- Java 8 或者 Java 11 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java) +- 支持 Gradle 的 Java (及/或 Scala) 开发IDE +- 推荐使用 [IntelliJ](https://www.jetbrains.com/idea/), 但 [Eclipse](https://www.eclipse.org/downloads/) 或 [Visual Studio Code](https://code.visualstudio.com/) (安装 [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial) 插件) 也可以用于Java环境 +- 为了使用 Scala, 需要使用 IntelliJ (及其 [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/) 插件) + +> **:information_source: Windows 用户须知:** 实践说明中提供的 shell 命令示例适用于 UNIX 环境。 +> 您可能会发现值得在 Windows 环境中设置 cygwin 或 WSL。对于开发 Flink 作业(jobs),Windows工作的相当好:可以在单机上运行 Flink 集群、提交作业、运行 webUI 并在IDE中执行作业。 + + + +### 克隆并构建 flink-training 项目 + +`flink-training` 仓库包含编程练习的习题、测试和参考解决方案。 + +> **:information_source: 仓库格局:** 本仓库有几个分支,分别指向不同的 Apache Flink 版本,类似于 [apache/flink](https://github.com/apache/flink) 仓库: +> - 每个 Apache Flink 次要版本的发布分支,例如 `release-1.10`,和 +> - 一个指向当前 Flink 版本的 `master` 分支(不是 `flink:master`!) 
+> +> 如果想在当前 Flink 版本以外的版本上工作,请务必签出相应的分支。 + +从 GitHub 克隆出 `flink-training` 仓库,导航到本地项目仓库并构建它: + +```bash +git clone https://github.com/apache/flink-training.git +cd flink-training +./gradlew test shadowJar +``` + +如果是第一次构建,将会下载此 Flink 练习项目的所有依赖项。这通常需要几分钟时间,但具体取决于互联网连接速度。 + +如果所有测试都通过并且构建成功,这说明你的实践练习已经开了一个好头。 + + +:cn: 中国用户: 点击这里了解如何使用本地 Maven 镜像。 + +如果你在中国,我们建议将 Maven 存储库配置为使用镜像。 可以通过在 [`build.gradle`](build.gradle) 文件中取消注释此部分来做到这一点: + +```groovy +repositories { +// for access from China, you may need to uncomment this line +maven { url 'https://maven.aliyun.com/repository/public/' } +mavenCentral() +maven { +url "https://repository.apache.org/content/repositories/snapshots/; +mavenContent { +snapshotsOnly() +} +} +} +``` + + + +启用 Scala (可选) +这个项目中的练习也可以使用 Scala ,但由于非 Scala 用户报告的一些问题,我们决定默认禁用 Scala。 +可以通过以下的方法修改 `gradle.properties` 文件以重新启用所有 Scala 练习和解决方案: + +[`gradle.properties`](gradle.properties) 文件如下: + +```properties +#... + +# Scala exercises
[flink-training] branch release-1.14 updated: fixup! [FLINK-26382]Add Chinese documents for flink-training exercises (#46)
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink-training.git The following commit(s) were added to refs/heads/release-1.14 by this push: new 18e6db2 fixup! [FLINK-26382]Add Chinese documents for flink-training exercises (#46) 18e6db2 is described below commit 18e6db2206ca4156e21276b14d35bebaf222c151 Author: Nico Kruber AuthorDate: Fri Apr 22 11:15:17 2022 +0200 fixup! [FLINK-26382]Add Chinese documents for flink-training exercises (#46) --- long-ride-alerts/README_zh.md | 2 +- rides-and-fares/README_zh.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/long-ride-alerts/README_zh.md b/long-ride-alerts/README_zh.md index 85912da..3b1d060 100644 --- a/long-ride-alerts/README_zh.md +++ b/long-ride-alerts/README_zh.md @@ -44,7 +44,7 @@ END 事件可能会丢失,但你可以假设没有重复的事件,也没有 ## 入门指南 > :information_source: 最好在 IDE 的 flink-training 项目中找到这些类,而不是使用本节中源文件的链接。 -> +> ### 练习相关类 - Java: [`org.apache.flink.training.exercises.longrides.LongRidesExercise`](src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java) diff --git a/rides-and-fares/README_zh.md b/rides-and-fares/README_zh.md index a1dff45..f54852f 100644 --- a/rides-and-fares/README_zh.md +++ b/rides-and-fares/README_zh.md @@ -27,7 +27,7 @@ under the License. 1. `TaxiRide` END 事件 1. 一个 `TaxiFare` 事件(其时间戳恰好与开始时间匹配) -最终的结果应该是 `DataStream`,每个不同的 `rideId` 都产生一个 `RideAndFare` 记录。 +最终的结果应该是 `DataStream`,每个不同的 `rideId` 都产生一个 `RideAndFare` 记录。 每个 `RideAndFare` 都应该将某个 `rideId` 的 `TaxiRide` START 事件与其匹配的 `TaxiFare` 配对。 ### 输入数据 @@ -37,7 +37,7 @@ under the License. ### 期望输出 -所希望的结果是一个 `RideAndFare` 记录的数据流,每个不同的 `rideId` 都有一条这样的记录。 +所希望的结果是一个 `RideAndFare` 记录的数据流,每个不同的 `rideId` 都有一条这样的记录。 本练习设置为忽略 END 事件,你应该连接每次乘车的 START 事件及其相应的车费事件。 一旦具有了相互关联的车程和车费事件,你可以使用 `new RideAndFare(ride, fare)` 方法为输出流创建所需的对象。 @@ -76,7 +76,7 @@ under the License. ## 讨论 出于练习的目的,可以假设 START 和 fare 事件完美配对。 -但是在现实世界的应用程序中,你应该担心每当一个事件丢失时,同一个 `rideId` 的另一个事件的状态将被永远保持。 +但是在现实世界的应用程序中,你应该担心每当一个事件丢失时,同一个 `rideId` 的另一个事件的状态将被永远保持。 在 [稍后的练习](../long-ride-alerts/README_zh.md) 中,我们将看到 `ProcessFunction` 和定时器,它们将有助于处理这样的情况。 ## 相关文档
[flink-training] branch release-1.14 updated: [FLINK-26382]Add Chinese documents for flink-training exercises (#46)
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink-training.git The following commit(s) were added to refs/heads/release-1.14 by this push: new 0132dd7 [FLINK-26382]Add Chinese documents for flink-training exercises (#46) 0132dd7 is described below commit 0132dd7be8c881607f9a374613309493ade8c6dd Author: T.C AuthorDate: Fri Apr 22 17:09:26 2022 +0800 [FLINK-26382]Add Chinese documents for flink-training exercises (#46) Co-authored-by: Victor Xu --- README.md | 2 + README_zh.md| 274 hourly-tips/DISCUSSION.md | 2 + hourly-tips/{DISCUSSION.md => DISCUSSION_zh.md} | 46 ++-- hourly-tips/README.md | 2 + hourly-tips/README_zh.md| 82 +++ long-ride-alerts/DISCUSSION.md | 2 + long-ride-alerts/DISCUSSION_zh.md | 50 + long-ride-alerts/README.md | 2 + long-ride-alerts/{README.md => README_zh.md}| 68 +++--- ride-cleansing/README.md| 2 + ride-cleansing/{README.md => README_zh.md} | 55 ++--- rides-and-fares/README.md | 2 + rides-and-fares/README_zh.md| 95 14 files changed, 600 insertions(+), 84 deletions(-) diff --git a/README.md b/README.md index 0fc84e1..b1a65cb 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ specific language governing permissions and limitations under the License. --> +[中文版](./README_zh.md) + # Apache Flink Training Exercises Exercises that accompany the training content in the documentation. diff --git a/README_zh.md b/README_zh.md new file mode 100644 index 000..2af9bba --- /dev/null +++ b/README_zh.md @@ -0,0 +1,274 @@ + + +# Apache Flink 实践练习 + +与文档中实践练习内容相关的练习。 + +## 目录 + +[**设置开发环境**](#set-up-your-development-environment) + +1. [软件要求](#software-requirements) +1. [克隆并构建 flink-training 项目](#clone-and-build-the-flink-training-project) +1. [将 flink-training 项目导入 IDE](#import-the-flink-training-project-into-your-ide) + +[**使用出租车数据流(taxi data stream)**](#using-the-taxi-data-streams) + +1. [出租车车程(taxi ride)事件结构](#schema-of-taxi-ride-events) +1. [出租车费用(taxi fare)事件结构](#schema-of-taxi-fare-events) + +[**如何做练习**](#how-to-do-the-lab-exercises) + +1. [了解数据](#learn-about-the-data) +2. [在 IDE 中运行和调试 Flink 程序](#run-and-debug-flink-programs-in-your-ide) +3. [练习、测试及解决方案](#exercises-tests-and-solutions) + +[**练习**](#lab-exercises) + +[**提交贡献**](#contributing) + +[**许可证**](#license) + + + +## 设置开发环境 + +你需要设置便于进行开发、调试并运行实践练习的示例和解决方案的环境。 + + + +### 软件要求 + +Linux、OS X 和 Windows 均可作为 Flink 程序和本地执行的开发环境。 Flink 开发设置需要以下软件,它们应该安装在系统上: + +- Git +- Java 8 或者 Java 11 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java) +- 支持 Gradle 的 Java (及/或 Scala) 开发IDE +- 推荐使用 [IntelliJ](https://www.jetbrains.com/idea/), 但 [Eclipse](https://www.eclipse.org/downloads/) 或 [Visual Studio Code](https://code.visualstudio.com/) (安装 [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial) 插件) 也可以用于Java环境 +- 为了使用 Scala, 需要使用 IntelliJ (及其 [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/) 插件) + +> **:information_source: Windows 用户须知:** 实践说明中提供的 shell 命令示例适用于 UNIX 环境。 +> 您可能会发现值得在 Windows 环境中设置 cygwin 或 WSL。对于开发 Flink 作业(jobs),Windows工作的相当好:可以在单机上运行 Flink 集群、提交作业、运行 webUI 并在IDE中执行作业。 + + + +### 克隆并构建 flink-training 项目 + +`flink-training` 仓库包含编程练习的习题、测试和参考解决方案。 + +> **:information_source: 仓库格局:** 本仓库有几个分支,分别指向不同的 Apache Flink 版本,类似于 [apache/flink](https://github.com/apache/flink) 仓库: +> - 每个 Apache Flink 次要版本的发布分支,例如 `release-1.10`,和 +> - 一个指向当前 Flink 版本的 `master` 分支(不是 `flink:master`!) 
+> +> 如果想在当前 Flink 版本以外的版本上工作,请务必签出相应的分支。 + +从 GitHub 克隆出 `flink-training` 仓库,导航到本地项目仓库并构建它: + +```bash +git clone https://github.com/apache/flink-training.git +cd flink-training +./gradlew test shadowJar +``` + +如果是第一次构建,将会下载此 Flink 练习项目的所有依赖项。这通常需要几分钟时间,但具体取决于互联网连接速度。 + +如果所有测试都通过并且构建成功,这说明你的实践练习已经开了一个好头。 + + +:cn: 中国用户: 点击这里了解如何使用本地 Maven 镜像。 + +如果你在中国,我们建议将 Maven 存储库配置为使用镜像。 可以通过在 [`build.gradle`](build.gradle) 文件中取消注释此部分来做到这一点: + +```groovy +repositories { +// for access from China, you may need to uncomment this line +maven { url 'https://maven.aliyun.com/repository/public/' } +mavenCentral() +maven { +url "https://repository.apache.org/content/repositories/snapshots/; +mavenContent { +snapshotsOnly() +} +} +} +``` + + + +启用 Scala (可选) +这个项目中的练习也可以使用 Scala ,但由于非 Scala 用户报告的一些问题,我们决定默认禁用 Scala。 +可以通过以下的方法修改 `gradle.properties` 文件以重新启用所有 Scala 练习和解决方案: + +[`gradle.properties`](gradle.properties) 文件如下: + +```properties +#... + +# Scala exercises can be enabled by setting this to true +org.gradle.project.enable_scala = true
[flink-kubernetes-operator] branch main updated (805fef5 -> f7ee710)
This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


    from 805fef5  [FLINK-27023] Unify flink and operator configuration
     add f7ee710  [FLINK-27161] Support to fetch user jar from different sources for session job

No new revisions were added by this update.

Summary of changes:
 docs/content/docs/operations/configuration.md      |   1 +
 .../flink/kubernetes/operator/FlinkOperator.java   |   5 +
 .../{JarResolver.java => ArtifactFetcher.java}     |  26 ++--
 .../operator/artifact/ArtifactManager.java         |  74 ++
 .../artifact/FileSystemBasedArtifactFetcher.java   |  52 +++
 .../operator/artifact/HttpArtifactFetcher.java     |  50 +++
 .../config/FlinkOperatorConfiguration.java         |   8 +-
 .../config/KubernetesOperatorConfigOptions.java    |   6 +
 .../kubernetes/operator/service/FlinkService.java  |  21 +--
 .../kubernetes/operator/TestingFlinkService.java   |   3 +-
 .../operator/artifact/ArtifactManagerTest.java     | 154 +
 .../operator/artifact/JarResolverTest.java         |  35 -
 12 files changed, 376 insertions(+), 59 deletions(-)
 rename flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/{JarResolver.java => ArtifactFetcher.java} (65%)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java
 delete mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
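The renamed ArtifactFetcher together with the new FileSystemBasedArtifactFetcher and HttpArtifactFetcher suggests a scheme-based dispatch: the session job's jar URI determines how the artifact is materialized locally before submission. A simplified sketch of that idea follows; the names deliberately differ from the operator's actual classes, and the real ArtifactManager additionally manages per-deployment target directories and configuration:

```java
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical interface; the operator's real ArtifactFetcher differs. */
interface JarFetcherSketch {
    File fetch(String jarUri, Path targetDir) throws Exception;
}

/** Downloads http(s) artifacts into the target directory. */
class HttpJarFetcherSketch implements JarFetcherSketch {
    @Override
    public File fetch(String jarUri, Path targetDir) throws Exception {
        URI uri = URI.create(jarUri);
        Path target = targetDir.resolve(Path.of(uri.getPath()).getFileName());
        try (InputStream in = uri.toURL().openStream()) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target.toFile();
    }
}

/** Picks a fetcher based on the URI scheme. */
class DispatchingJarFetcherSketch implements JarFetcherSketch {

    private final JarFetcherSketch httpFetcher = new HttpJarFetcherSketch();

    @Override
    public File fetch(String jarUri, Path targetDir) throws Exception {
        String scheme = URI.create(jarUri).getScheme();
        if ("http".equals(scheme) || "https".equals(scheme)) {
            return httpFetcher.fetch(jarUri, targetDir);
        }
        // Other schemes (e.g. local or distributed file systems) would be
        // served by a filesystem-based fetcher in the real operator.
        throw new UnsupportedOperationException("Unsupported scheme: " + scheme);
    }
}
```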