[flink] branch release-1.11 updated (c98122c -> ab6cf40)
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from c98122c  [FLINK-16559][hive] Add test for avro table
     new 0d9ed90  [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts
     new ab6cf40  [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../flink/connectors/hive/HiveTableSink.java       |   6 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java |  75
 flink-formats/flink-hadoop-bulk/pom.xml            |  37 ++
 .../bulk/DefaultHadoopFileCommitterFactory.java    |  16 ++-
 .../formats/hadoop/bulk/HadoopFileCommitter.java   |   2 +-
 .../hadoop/bulk/HadoopFileCommitterFactory.java    |  18 ++-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  59 ++---
 .../bulk/committer/HadoopRenameFileCommitter.java  |  41 +--
 ...terTest.java => AbstractFileCommitterTest.java} | 132 +
 .../bulk/HadoopPathBasedPartFileWriterTest.java    |   6 +-
 .../committer/HadoopRenameCommitterHDFSTest.java   |  97 +++
 .../HadoopRenameCommitterLocalFSTest.java          |  69 +++
 .../bulk/committer/cluster/HDFSCluster.java}       |  40 ---
 .../planner/runtime/FileSystemITCaseBase.scala     |  37 ++
 .../stream/sql/StreamFileSystemITCaseBase.scala    |   6 +-
 .../table/filesystem/FileSystemTableSink.java      |   8 ++
 .../flink/table/filesystem/PartitionLoader.java    |   9 +-
 .../table/filesystem/PartitionTempFileManager.java |   2 +-
 18 files changed, 546 insertions(+), 114 deletions(-)
 rename flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/{committer/HadoopRenameFileCommitterTest.java => AbstractFileCommitterTest.java} (66%)
 create mode 100644 flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
 create mode 100644 flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
 copy flink-formats/flink-hadoop-bulk/src/{main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java => test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java} (56%)
[flink] 02/02: [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab6cf4083c84bf8c04564c74f6d9f9783adf5e21
Author: Jingsong Lee
AuthorDate: Tue Jun 9 13:48:42 2020 +0800

    [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()

    This closes #12485
---
 .../flink/connectors/hive/HiveTableSink.java       |  6 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 75 ++
 .../planner/runtime/FileSystemITCaseBase.scala     | 37 +++
 .../stream/sql/StreamFileSystemITCaseBase.scala    |  6 +-
 .../table/filesystem/FileSystemTableSink.java      |  8 +++
 .../flink/table/filesystem/PartitionLoader.java    |  9 +--
 .../table/filesystem/PartitionTempFileManager.java |  2 +-
 7 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a4e3012..1cac9e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -77,6 +77,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;

 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
                     isCompressed);
             String extension = Utilities.getFileExtension(jobConf, isCompressed,
                     (HiveOutputFormat) hiveOutputFormatClz.newInstance());
-            extension = extension == null ? "" : extension;
             OutputFileConfig outputFileConfig = OutputFileConfig.builder()
-                    .withPartSuffix(extension).build();
+                    .withPartPrefix("part-" + UUID.randomUUID().toString())
+                    .withPartSuffix(extension == null ? "" : extension)
+                    .build();
             if (isBounded) {
                 FileSystemOutputFormat.Builder builder = new FileSystemOutputFormat.Builder<>();
                 builder.setPartitionComputer(new HiveRowPartitionComputer(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index a73bce9..7592a33 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;

+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -228,6 +231,25 @@ public class HiveTableSinkITCase {
         }
     }

+    @Test
+    public void testBatchAppend() {
+        TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.executeSql("create database db1");
+        tEnv.useDatabase("db1");
+        try {
+            tEnv.executeSql("create table append_table (i int, j int)");
+            TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 1, 1");
+            TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 2, 2");
+            ArrayList rows = Lists.newArrayList(tEnv.executeSql("select * from append_table").collect());
+            rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+
[flink] 01/02: [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d9ed9012286ef2495b1b75277601743e8c9ff6f
Author: Yun Gao
AuthorDate: Tue Jun 9 13:47:53 2020 +0800

    [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts

    This closes #12452
---
 flink-formats/flink-hadoop-bulk/pom.xml            |  37 ++
 .../bulk/DefaultHadoopFileCommitterFactory.java    |  16 ++-
 .../formats/hadoop/bulk/HadoopFileCommitter.java   |   2 +-
 .../hadoop/bulk/HadoopFileCommitterFactory.java    |  18 ++-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  59 ++---
 .../bulk/committer/HadoopRenameFileCommitter.java  |  41 +--
 ...terTest.java => AbstractFileCommitterTest.java} | 132 +
 .../bulk/HadoopPathBasedPartFileWriterTest.java    |   6 +-
 .../committer/HadoopRenameCommitterHDFSTest.java   |  97 +++
 .../HadoopRenameCommitterLocalFSTest.java          |  69 +++
 .../bulk/committer/cluster/HDFSCluster.java}       |  40 ---
 11 files changed, 414 insertions(+), 103 deletions(-)

diff --git a/flink-formats/flink-hadoop-bulk/pom.xml b/flink-formats/flink-hadoop-bulk/pom.xml
index 371c15a..051cc53 100644
--- a/flink-formats/flink-hadoop-bulk/pom.xml
+++ b/flink-formats/flink-hadoop-bulk/pom.xml
@@ -80,6 +80,43 @@ under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
index 01ac88c..00f8f58 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

+import java.io.IOException;
+
 /**
  * The default hadoop file committer factory which always use {@link HadoopRenameFileCommitter}.
  */
@@ -31,7 +33,19 @@ public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFac

     private static final long serialVersionUID = 1L;

     @Override
-    public HadoopFileCommitter create(Configuration configuration, Path targetFilePath) {
+    public HadoopFileCommitter create(
+            Configuration configuration,
+            Path targetFilePath) throws IOException {
+
         return new HadoopRenameFileCommitter(configuration, targetFilePath);
     }
+
+    @Override
+    public HadoopFileCommitter recoverForCommit(
+            Configuration configuration,
+            Path targetFilePath,
+            Path tempFilePath) throws IOException {
+
+        return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
+    }
 }
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
index 7ae0d56..0fdb4f8 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
+++
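The idea behind this commit: each writer stages its output into a UUID-tagged in-progress file and only the commit (rename) step publishes the stable target name, so concurrent jobs pointed at the same directory never share a temp file. A hedged, self-contained sketch of that naming scheme (the helper name and the ".inprogress" pattern are illustrative, not the exact strings Flink uses):

    import java.util.UUID;

    public class InProgressNaming {

        // Hypothetical illustration: the final file keeps its stable name,
        // while each writer stages into a UUID-tagged hidden file.
        static String inProgressNameFor(String targetName) {
            return "." + targetName + "." + UUID.randomUUID() + ".inprogress";
        }

        public static void main(String[] args) {
            // Two jobs writing "part-0-0" stage into distinct temp files;
            // only the rename (commit) publishes the stable target name.
            System.out.println(inProgressNameFor("part-0-0"));
            System.out.println(inProgressNameFor("part-0-0"));
        }
    }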
[flink] branch master updated: [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 603cd83  [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()
603cd83 is described below

commit 603cd830073f95d1b48cf8fa6817100ac17e1eb3
Author: Jingsong Lee
AuthorDate: Tue Jun 9 13:48:42 2020 +0800

    [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()

    This closes #12485
---
 .../flink/connectors/hive/HiveTableSink.java       |  6 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 75 ++
 .../planner/runtime/FileSystemITCaseBase.scala     | 37 +++
 .../stream/sql/StreamFileSystemITCaseBase.scala    |  6 +-
 .../table/filesystem/FileSystemTableSink.java      |  8 +++
 .../flink/table/filesystem/PartitionLoader.java    |  9 +--
 .../table/filesystem/PartitionTempFileManager.java |  2 +-
 7 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a4e3012..1cac9e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -77,6 +77,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;

 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
                     isCompressed);
             String extension = Utilities.getFileExtension(jobConf, isCompressed,
                     (HiveOutputFormat) hiveOutputFormatClz.newInstance());
-            extension = extension == null ? "" : extension;
             OutputFileConfig outputFileConfig = OutputFileConfig.builder()
-                    .withPartSuffix(extension).build();
+                    .withPartPrefix("part-" + UUID.randomUUID().toString())
+                    .withPartSuffix(extension == null ? "" : extension)
+                    .build();
             if (isBounded) {
                 FileSystemOutputFormat.Builder builder = new FileSystemOutputFormat.Builder<>();
                 builder.setPartitionComputer(new HiveRowPartitionComputer(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index a73bce9..7592a33 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;

+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -228,6 +231,25 @@ public class HiveTableSinkITCase {
         }
     }

+    @Test
+    public void testBatchAppend() {
+        TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.executeSql("create database db1");
+        tEnv.useDatabase("db1");
+        try {
+            tEnv.executeSql("create table append_table (i int, j int)");
+            TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 1, 1");
+            TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 2, 2");
+
[flink] branch master updated (74231f7 -> 45f42f8)
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 74231f7  [FLINK-17902][python] Support the new interfaces about temporary functions in PyFlink
     add 45f42f8  [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts

No new revisions were added by this update.

Summary of changes:
 flink-formats/flink-hadoop-bulk/pom.xml            |  37 ++
 .../bulk/DefaultHadoopFileCommitterFactory.java    |  16 ++-
 .../formats/hadoop/bulk/HadoopFileCommitter.java   |   2 +-
 .../hadoop/bulk/HadoopFileCommitterFactory.java    |  18 ++-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  59 ++---
 .../bulk/committer/HadoopRenameFileCommitter.java  |  41 +--
 ...terTest.java => AbstractFileCommitterTest.java} | 132 +
 .../bulk/HadoopPathBasedPartFileWriterTest.java    |   6 +-
 .../committer/HadoopRenameCommitterHDFSTest.java   |  97 +++
 .../HadoopRenameCommitterLocalFSTest.java          |  69 +++
 .../bulk/committer/cluster/HDFSCluster.java}       |  40 ---
 11 files changed, 414 insertions(+), 103 deletions(-)
 rename flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/{committer/HadoopRenameFileCommitterTest.java => AbstractFileCommitterTest.java} (66%)
 create mode 100644 flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
 create mode 100644 flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
 copy flink-formats/flink-hadoop-bulk/src/{main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java => test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java} (56%)
[flink] branch master updated (fd9214e -> 74231f7)
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from fd9214e  [FLINK-16559][hive] Add test for avro table
     add 74231f7  [FLINK-17902][python] Support the new interfaces about temporary functions in PyFlink

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/table/table_environment.py    | 235 +
 .../table/tests/test_table_environment_api.py      |  34 +++
 flink-python/pyflink/table/tests/test_udf.py       |  14 ++
 3 files changed, 283 insertions(+)
[flink] branch release-1.11 updated: [FLINK-16559][hive] Add test for avro table
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new c98122c  [FLINK-16559][hive] Add test for avro table
c98122c is described below

commit c98122c29ef875d7fbc2b85fc9496bcfa11e0363
Author: Rui Li
AuthorDate: Tue Jun 9 12:15:55 2020 +0800

    [FLINK-16559][hive] Add test for avro table

    This closes #12408
---
 .../apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 2da7d10..367edb9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -128,11 +128,14 @@ public class TableEnvHiveConnectorITCase {

     @Test
     public void testDifferentFormats() throws Exception {
-        String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv"};
+        String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv", "avro"};
         for (String format : formats) {
             if (format.equals("orc") && HiveShimLoader.getHiveVersion().startsWith("2.0")) {
                 // Ignore orc test for Hive version 2.0.x for now due to FLINK-13998
                 continue;
+            } else if (format.equals("avro") && !HiveVersionTestUtil.HIVE_110_OR_LATER) {
+                // timestamp is not supported for avro tables before 1.1.0
+                continue;
             }
             readWriteFormat(format);
         }
[flink] branch master updated (2be8239 -> fd9214e)
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 2be8239  [FLINK-16291][hive] Ban count from HiveModule
     add fd9214e  [FLINK-16559][hive] Add test for avro table

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
[flink] branch release-1.11 updated: [FLINK-16291][hive] Ban count from HiveModule
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 513ac5a  [FLINK-16291][hive] Ban count from HiveModule
513ac5a is described below

commit 513ac5ae8d4fc1e679a7faf07a7d020f073a09fc
Author: Rui Li
AuthorDate: Tue Jun 9 12:03:23 2020 +0800

    [FLINK-16291][hive] Ban count from HiveModule

    This closes #12382
---
 .../java/org/apache/flink/table/module/hive/HiveModule.java |  2 +-
 .../org/apache/flink/table/module/hive/HiveModuleTest.java  | 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index e34965b..12fc5f1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -44,7 +44,7 @@ public class HiveModule implements Module {

     // a set of functions that shouldn't be overridden by HiveModule
     @VisibleForTesting
     static final Set BUILT_IN_FUNC_BLACKLIST = Collections.unmodifiableSet(new HashSet<>(
-            Arrays.asList("dense_rank", "first_value", "lag", "last_value", "lead", "rank", "row_number",
+            Arrays.asList("count", "dense_rank", "first_value", "lag", "last_value", "lead", "rank", "row_number",
                     "hop", "hop_end", "hop_proctime", "hop_rowtime", "hop_start",
                     "session", "session_end", "session_proctime", "session_rowtime", "session_start",
                     "tumble", "tumble_end", "tumble_proctime", "tumble_rowtime", "tumble_start")));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 65cbda51..374984a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -62,22 +62,22 @@ public class HiveModuleTest {

         switch (hiveVersion) {
             case HIVE_VERSION_V1_2_0:
-                assertEquals(232, hiveModule.listFunctions().size());
+                assertEquals(231, hiveModule.listFunctions().size());
                 break;
             case HIVE_VERSION_V2_0_0:
-                assertEquals(236, hiveModule.listFunctions().size());
+                assertEquals(235, hiveModule.listFunctions().size());
                 break;
             case HIVE_VERSION_V2_1_1:
-                assertEquals(246, hiveModule.listFunctions().size());
+                assertEquals(245, hiveModule.listFunctions().size());
                 break;
             case HIVE_VERSION_V2_2_0:
-                assertEquals(262, hiveModule.listFunctions().size());
+                assertEquals(261, hiveModule.listFunctions().size());
                 break;
             case HIVE_VERSION_V2_3_4:
-                assertEquals(280, hiveModule.listFunctions().size());
+                assertEquals(279, hiveModule.listFunctions().size());
                 break;
             case HIVE_VERSION_V3_1_1:
-                assertEquals(299, hiveModule.listFunctions().size());
+                assertEquals(298, hiveModule.listFunctions().size());
                 break;
         }
     }
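For context, the blacklist works by making the Hive module decline to resolve the listed names, so Flink's own built-in implementation (here: count) stays in effect. A minimal, hypothetical sketch of that lookup pattern (class and return types are illustrative stand-ins, not HiveModule's real signatures):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Optional;
    import java.util.Set;

    public class BlacklistingModule {

        // Names that must never shadow Flink's core functions.
        private static final Set<String> BUILT_IN_FUNC_BLACKLIST =
                new HashSet<>(Arrays.asList("count", "dense_rank", "first_value"));

        public Optional<String> getFunctionDefinition(String name) {
            if (BUILT_IN_FUNC_BLACKLIST.contains(name.toLowerCase())) {
                return Optional.empty(); // fall through to Flink core functions
            }
            return Optional.of("hive:" + name); // stand-in for the Hive function
        }

        public static void main(String[] args) {
            BlacklistingModule m = new BlacklistingModule();
            System.out.println(m.getFunctionDefinition("count"));  // Optional.empty
            System.out.println(m.getFunctionDefinition("concat")); // Optional[hive:concat]
        }
    }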
[flink] branch master updated (02b915a -> 2be8239)
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 02b915a  [FLINK-17893][sql-client] SQL CLI should print the root cause if the statement is invalid
     add 2be8239  [FLINK-16291][hive] Ban count from HiveModule

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/table/module/hive/HiveModule.java |  2 +-
 .../org/apache/flink/table/module/hive/HiveModuleTest.java  | 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)
[flink] branch release-1.11 updated (882fdda -> c211946)
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 882fdda  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive Integration" into Chinese
     add c211946  [FLINK-17893][sql-client] SQL CLI should print the root cause if the statement is invalid

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/client/cli/CliClient.java   |  11 +-
 .../apache/flink/table/client/cli/CliStrings.java  |   6 -
 .../flink/table/client/cli/SqlCommandParser.java   |  42 +++---
 .../flink/table/client/cli/CliClientTest.java      |  23
 .../table/client/cli/SqlCommandParserTest.java     | 152 +++--
 5 files changed, 137 insertions(+), 97 deletions(-)
[flink] branch master updated (dee868d -> 02b915a)
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from dee868d  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive Integration" into Chinese
     add 02b915a  [FLINK-17893][sql-client] SQL CLI should print the root cause if the statement is invalid

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/client/cli/CliClient.java   |  11 +-
 .../apache/flink/table/client/cli/CliStrings.java  |   6 -
 .../flink/table/client/cli/SqlCommandParser.java   |  42 +++---
 .../flink/table/client/cli/CliClientTest.java      |  23
 .../table/client/cli/SqlCommandParserTest.java     | 152 +++--
 5 files changed, 137 insertions(+), 97 deletions(-)
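The essence of FLINK-17893: when a statement fails to parse, the CLI should surface the innermost cause of the exception chain (the actual parser error) rather than a generic wrapper message. A minimal sketch of that unwrapping, assuming plain Java exception chaining (names here are illustrative, not the CLI's own):

    public class RootCause {

        // Walk the cause chain to the innermost throwable.
        static Throwable rootCauseOf(Throwable t) {
            while (t.getCause() != null && t.getCause() != t) {
                t = t.getCause();
            }
            return t;
        }

        public static void main(String[] args) {
            Exception invalid = new RuntimeException("Invalid SQL statement.",
                    new IllegalArgumentException("Encountered \"SELCT\" at line 1, column 1"));
            // The user sees the actionable parser error, not just the wrapper:
            System.out.println(rootCauseOf(invalid).getMessage());
        }
    }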
[flink] branch release-1.11 updated: [FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive Integration" into Chinese
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 882fdda  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive Integration" into Chinese
882fdda is described below

commit 882fdda3a33dd4bf8fc749e559ba44da7f7df8fd
Author: 20010079
AuthorDate: Wed Apr 8 10:19:04 2020 +0800

    [FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive Integration" into Chinese

    This closes #11664
---
 docs/dev/table/hive/hive_functions.zh.md | 46 +++-
 1 file changed, 22 insertions(+), 24 deletions(-)

diff --git a/docs/dev/table/hive/hive_functions.zh.md b/docs/dev/table/hive/hive_functions.zh.md
index 537d684..c2e46cf 100644
--- a/docs/dev/table/hive/hive_functions.zh.md
+++ b/docs/dev/table/hive/hive_functions.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "Hive functions"
+title: "Hive 函数"
 nav-parent_id: hive_tableapi
 nav-pos: 3
 ---
@@ -22,11 +22,11 @@ specific language governing permissions and limitations
 under the License.
 -->

-## Use Hive Built-in Functions via HiveModule
+## 通过 HiveModule 使用 Hive 内置函数

-The `HiveModule` provides Hive built-in functions as Flink system (built-in) functions to Flink SQL and Table API users.
+在 Flink SQL 和 Table API 中，可以通过系统内置的 `HiveModule` 来使用 Hive 内置函数，

-For detailed information, please refer to [HiveModule]({{ site.baseurl }}/dev/table/modules.html#hivemodule).
+详细信息，请参考 [HiveModule]({{ site.baseurl }}/zh/dev/table/modules.html#hivemodule)。

@@ -58,14 +58,14 @@ modules:

-* NOTE that some Hive built-in functions in older versions have [thread safety issues](https://issues.apache.org/jira/browse/HIVE-16183).
-We recommend users patch their own Hive to fix them.
+* 请注意旧版本的部分 Hive 内置函数存在[线程安全问题](https://issues.apache.org/jira/browse/HIVE-16183)。
+我们建议用户及时通过补丁修正 Hive 中的这些问题。

-## Hive User Defined Functions
+## Hive 用户自定义函数（User Defined Functions）

-Users can use their existing Hive User Defined Functions in Flink.
+在 Flink 中用户可以使用 Hive 里已经存在的 UDF 函数。

-Supported UDF types include:
+支持的 UDF 类型包括:

 - UDF
 - GenericUDF
@@ -73,24 +73,23 @@ Supported UDF types include:
 - UDAF
 - GenericUDAFResolver2

-Upon query planning and execution, Hive's UDF and GenericUDF are automatically translated into Flink's ScalarFunction,
-Hive's GenericUDTF is automatically translated into Flink's TableFunction,
-and Hive's UDAF and GenericUDAFResolver2 are translated into Flink's AggregateFunction.
+在进行查询规划和执行时，Hive UDF 和 GenericUDF 函数会自动转换成 Flink 中的 ScalarFunction，GenericUDTF 会被自动转换成 Flink 中的
+ TableFunction，UDAF 和 GenericUDAFResolver2 则转换成 Flink 聚合函数（AggregateFunction）.

-To use a Hive User Defined Function, user have to
+想要使用 Hive UDF 函数，需要如下几步:

-- set a HiveCatalog backed by Hive Metastore that contains that function as current catalog of the session
-- include a jar that contains that function in Flink's classpath
-- use Blink planner.
+- 通过 Hive Metastore 将带有 UDF 的 HiveCatalog 设置为当前会话的 catalog 后端。
+- 将带有 UDF 的 jar 包放入 Flink classpath 中，并在代码中引入。
+- 使用 Blink planner。

-## Using Hive User Defined Functions
+## 使用 Hive UDF

-Assuming we have the following Hive functions registered in Hive Metastore:
+假设我们在 Hive Metastore 中已经注册了下面的 UDF 函数:

 {% highlight java %}
 /**
- * Test simple udf. Registered under name 'myudf'
+ * 注册为 'myudf' 的简单 UDF 测试类.
  */
 public class TestHiveSimpleUDF extends UDF {

@@ -104,7 +103,7 @@ public class TestHiveSimpleUDF extends UDF {
 }

 /**
- * Test generic udf. Registered under name 'mygenericudf'
+ * 注册为 'mygenericudf' 的普通 UDF 测试类
  */
 public class TestHiveGenericUDF extends GenericUDF {

@@ -137,7 +136,7 @@ public class TestHiveGenericUDF extends GenericUDF {
 }

 /**
- * Test split udtf. Registered under name 'mygenericudtf'
+ * 注册为 'mygenericudtf' 的字符串分割 UDF 测试类
  */
 public class TestHiveUDTF extends GenericUDTF {

@@ -172,7 +171,7 @@ public class TestHiveUDTF extends GenericUDTF {

 {% endhighlight %}

-From Hive CLI, we can see they are registered:
+在 Hive CLI 中，可以查询到已经注册的 UDF 函数:

 {% highlight bash %}
 hive> show functions;
@@ -184,8 +183,7 @@ myudtf
 {% endhighlight %}

-
-Then, users can use them in SQL as:
+此时，用户如果想使用这些 UDF，在 SQL 中就可以这样写:

 {% highlight bash %}
[flink] branch master updated (926523e -> dee868d)
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 926523e  [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
     add dee868d  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive Integration" into Chinese

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/hive/hive_functions.zh.md | 46 +++-
 1 file changed, 22 insertions(+), 24 deletions(-)
[flink] branch release-1.11 updated: [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 11958b0  [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
11958b0 is described below

commit 11958b0dd424c143184ed8fd5a2943049b27a4b4
Author: lsy
AuthorDate: Tue Jun 9 11:04:59 2020 +0800

    [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

    This closes #12303
---
 .../operators/rank/AppendOnlyTopNFunction.java     | 16 +++--
 .../table/runtime/operators/rank/TopNBuffer.java   | 83 +++---
 2 files changed, 68 insertions(+), 31 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
index 6a3cf5a..82e2fae 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
@@ -211,16 +211,22 @@ public class AppendOnlyTopNFunction extends AbstractTopNFunction {
         if (buffer.getCurrentTopNum() > rankEnd) {
             Map.Entry> lastEntry = buffer.lastEntry();
             RowData lastKey = lastEntry.getKey();
-            List lastList = (List) lastEntry.getValue();
+            Collection lastList = lastEntry.getValue();
+            RowData lastElement = buffer.lastElement();
+            int size = lastList.size();
             // remove last one
-            RowData lastElement = lastList.remove(lastList.size() - 1);
-            if (lastList.isEmpty()) {
+            if (size <= 1) {
                 buffer.removeAll(lastKey);
                 dataState.remove(lastKey);
             } else {
-                dataState.put(lastKey, lastList);
+                buffer.removeLast();
+                // last element has been removed from lastList, we have to copy a new collection
+                // for lastList to avoid mutating state values, see CopyOnWriteStateMap,
+                // otherwise, the result might be corrupt.
+                // don't need to perform a deep copy, because RowData elements will not be updated
+                dataState.put(lastKey, new ArrayList<>(lastList));
             }
-            if (input.equals(lastElement)) {
+            if (size == 0 || input.equals(lastElement)) {
                 return;
             } else {
                 // lastElement shouldn't be null
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
index 5d3ec52..3c8f7f5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.data.RowData;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -94,12 +94,12 @@ class TopNBuffer implements Serializable {
     }

     public void remove(RowData sortKey, RowData value) {
-        Collection list = treeMap.get(sortKey);
-        if (list != null) {
-            if (list.remove(value)) {
+        Collection collection = treeMap.get(sortKey);
+        if (collection != null) {
+            if (collection.remove(value)) {
                 currentTopNum -= 1;
             }
-            if (list.size() == 0) {
+            if (collection.size() == 0) {
                 treeMap.remove(sortKey);
             }
         }
@@ -111,9 +111,9 @@ class TopNBuffer implements Serializable {
      *
      * @param sortKey key to remove
      */
     void removeAll(RowData sortKey) {
-        Collection list = treeMap.get(sortKey);
-
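The hazard this patch fixes is worth spelling out: with a copy-on-write state backend, mutating a collection read from state can corrupt an in-flight snapshot, so the fix reads, then writes back a fresh copy instead of the mutated original. A self-contained sketch of the safe pattern, with a plain Map standing in for Flink's MapState (the stand-in types are assumptions, not Flink APIs):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CopyOnWriteSafeUpdate {

        // Stand-in for MapState<String, List<Integer>>.
        static final Map<String, List<Integer>> dataState = new HashMap<>();

        static void removeLastSafely(String key) {
            List<Integer> stored = dataState.get(key);
            if (stored == null || stored.size() <= 1) {
                dataState.remove(key);
                return;
            }
            // Copy before mutating; a shallow copy suffices because the
            // elements themselves are never updated (as the patch comment notes).
            List<Integer> copy = new ArrayList<>(stored);
            copy.remove(copy.size() - 1);
            dataState.put(key, copy);
        }

        public static void main(String[] args) {
            dataState.put("k", new ArrayList<>(Arrays.asList(1, 2, 3)));
            removeLastSafely("k");
            System.out.println(dataState.get("k")); // [1, 2]
        }
    }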
[flink] branch master updated (87d6a76 -> 926523e)
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 87d6a76  [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
     add 926523e  [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

No new revisions were added by this update.

Summary of changes:
 .../operators/rank/AppendOnlyTopNFunction.java     | 16 +++--
 .../table/runtime/operators/rank/TopNBuffer.java   | 83 +++---
 2 files changed, 68 insertions(+), 31 deletions(-)
[flink] branch release-1.11 updated: [FLINK-17722][python][build system] (followups) Keeps all jars of plugins and bin directory in CachedFiles
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new aeef207  [FLINK-17722][python][build system] (followups) Keeps all jars of plugins and bin directory in CachedFiles
aeef207 is described below

commit aeef20744dc1a867c447e489be8c009bfa7be246
Author: huangxingbo
AuthorDate: Mon Jun 8 20:30:20 2020 +0800

    [FLINK-17722][python][build system] (followups) Keeps all jars of plugins and bin directory in CachedFiles

    This closes #12528.
---
 tools/azure_controller.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tools/azure_controller.sh b/tools/azure_controller.sh
index 35848c2..0d305b7 100755
--- a/tools/azure_controller.sh
+++ b/tools/azure_controller.sh
@@ -122,6 +122,8 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then
         ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/lib/*.jar" \
         ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-python*.jar" \
         ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-sql-client_*.jar" \
+        ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/plugins/*.jar" \
+        ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/bin/*.jar" \
         ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-elasticsearch-base/target/flink-*.jar" \
         ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-kafka-base/target/flink-*.jar" \
         ! -path "$CACHE_FLINK_DIR/flink-table/flink-table-planner/target/flink-table-planner*tests.jar" | xargs rm -rf
[flink] branch release-1.11 updated (f323a79 -> aeef207)
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from f323a79  [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
     add aeef207  [FLINK-17722][python][build system] (followups) Keeps all jars of plugins and bin directory in CachedFiles

No new revisions were added by this update.

Summary of changes:
 tools/azure_controller.sh | 2 ++
 1 file changed, 2 insertions(+)
[flink] branch master updated: [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 87d6a76  [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
87d6a76 is described below

commit 87d6a76923511f6b47ea2d5d2bba365e21cd3b96
Author: Dawid Wysakowicz
AuthorDate: Mon Jun 8 20:02:24 2020 +0200

    [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
---
 .../flink/streaming/connectors/kafka/Kafka011TableSink.java         | 3 +--
 .../flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java | 3 +--
 .../flink/streaming/connectors/kafka/KafkaTableSink.java            | 3 +--
 .../flink/streaming/connectors/kafka/table/KafkaDynamicSink.java    | 3 +--
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index cb42bd7..8abc2a2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
@@ -57,7 +56,7 @@ public class Kafka011TableSink extends KafkaTableSinkBase {
             Optional> partitioner) {
         return new FlinkKafkaProducer011<>(
             topic,
-            new KeyedSerializationSchemaWrapper<>(serializationSchema),
+            serializationSchema,
             properties,
             partitioner);
     }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index 110dd9a..d632928 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;

 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -60,7 +59,7 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
             Optional> partitioner) {
         return new FlinkKafkaProducer011<>(
             topic,
-            new KeyedSerializationSchemaWrapper<>(serializationSchema),
+            serializationSchema,
             properties,
             partitioner);
     }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 50e4381..861e5d7 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import
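The net effect of this hotfix: the table sinks hand a plain SerializationSchema straight to the producer instead of wrapping it in KeyedSerializationSchemaWrapper, so records stay key-less and the interface hierarchy stays simple. A minimal sketch of such a schema, assuming only the real SerializationSchema interface from flink-core (the class name and CSV framing are illustrative):

    import org.apache.flink.api.common.serialization.SerializationSchema;

    import java.nio.charset.StandardCharsets;

    public class CsvRowSchema implements SerializationSchema<String> {

        // Produces only the value bytes of the Kafka record; no key is attached.
        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        public static void main(String[] args) {
            byte[] bytes = new CsvRowSchema().serialize("a,b,c");
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        }
    }

After this change, an instance like this can be passed to the createKafkaProducer factory methods shown in the diff without any wrapper.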
[flink] branch release-1.11 updated: [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f323a79  [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
f323a79 is described below

commit f323a793b9776c56178446ee08445aaf99f8231c
Author: Dawid Wysakowicz
AuthorDate: Mon Jun 8 20:02:24 2020 +0200

    [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
---
 .../flink/streaming/connectors/kafka/Kafka011TableSink.java         | 3 +--
 .../flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java | 3 +--
 .../flink/streaming/connectors/kafka/KafkaTableSink.java            | 3 +--
 .../flink/streaming/connectors/kafka/table/KafkaDynamicSink.java    | 3 +--
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index cb42bd7..8abc2a2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
@@ -57,7 +56,7 @@ public class Kafka011TableSink extends KafkaTableSinkBase {
             Optional> partitioner) {
         return new FlinkKafkaProducer011<>(
             topic,
-            new KeyedSerializationSchemaWrapper<>(serializationSchema),
+            serializationSchema,
             properties,
             partitioner);
     }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index 110dd9a..d632928 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;

 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -60,7 +59,7 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
             Optional> partitioner) {
         return new FlinkKafkaProducer011<>(
             topic,
-            new KeyedSerializationSchemaWrapper<>(serializationSchema),
+            serializationSchema,
             properties,
             partitioner);
     }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 50e4381..861e5d7 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import
[flink] branch master updated: [FLINK-17260] Embellish assert output of StreamingKafkaITCase to help debugging
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 50b6d9d  [FLINK-17260] Embellish assert output of StreamingKafkaITCase to help debugging
50b6d9d is described below

commit 50b6d9d5ffbcfbcc20c843e8758a0d2326b44ee1
Author: Aljoscha Krettek
AuthorDate: Mon Jun 8 19:03:42 2020 +0200

    [FLINK-17260] Embellish assert output of StreamingKafkaITCase to help debugging
---
 .../org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index 5e64159..00d050b 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -163,10 +163,10 @@ public class StreamingKafkaITCase extends TestLogger {
             final List bees = filterMessages(messages, "bee");
             final List giraffes = filterMessages(messages, "giraffe");

-            Assert.assertEquals(Arrays.asList("elephant,27,64213"), elephants);
-            Assert.assertEquals(Arrays.asList("squirrel,52,66413"), squirrels);
-            Assert.assertEquals(Arrays.asList("bee,18,65647"), bees);
-            Assert.assertEquals(Arrays.asList("giraffe,9,6"), giraffes);
+            Assert.assertEquals(String.format("Messages from Kafka %s: %s", kafkaVersion, messages), Arrays.asList("elephant,27,64213"), elephants);
+            Assert.assertEquals(String.format("Messages from Kafka %s: %s", kafkaVersion, messages), Arrays.asList("squirrel,52,66413"), squirrels);
+            Assert.assertEquals(String.format("Messages from Kafka %s: %s", kafkaVersion, messages), Arrays.asList("bee,18,65647"), bees);
+            Assert.assertEquals(String.format("Messages from Kafka %s: %s", kafkaVersion, messages), Arrays.asList("giraffe,9,6"), giraffes);
         }
     }
 }
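The pattern here is JUnit's message-first assertEquals overload: the failure text then carries the full set of messages Kafka actually delivered, not just the one mismatched element. A small self-contained example of the same pattern (the version string and message values are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import org.junit.Assert;
    import org.junit.Test;

    public class DescriptiveAssertExample {

        @Test
        public void messagesCarryContext() {
            String kafkaVersion = "2.4.1"; // illustrative value
            List<String> messages = Arrays.asList("elephant,27,64213", "bee,18,65647");
            List<String> elephants = Arrays.asList("elephant,27,64213");

            // On failure, the formatted message prints every received record,
            // which is exactly the context a CI log needs for debugging.
            Assert.assertEquals(
                String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
                Arrays.asList("elephant,27,64213"),
                elephants);
        }
    }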
[flink] branch master updated (74fd291 -> 0e8a53b)
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 74fd291  [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector
     add 0e8a53b  [FLINK-13782][table-api] Implement type strategies for IF ELSE expression

No new revisions were added by this update.

Summary of changes:
 .../functions/BuiltInFunctionDefinitions.java      |  11 +-
 .../table/types/inference/InputTypeStrategies.java |  24 +-
 ...eStrategy.java => CommonInputTypeStrategy.java} |  73 --
 .../strategies/SubsequenceInputTypeStrategy.java   | 256 +
 .../types/inference/InputTypeStrategiesTest.java   |   3 +-
 .../SubsequenceInputTypeStrategyTest.java          | 143 
 .../expressions/PlannerExpressionConverter.scala   |   4 -
 .../flink/table/planner/expressions/logic.scala    |  51
 8 files changed, 487 insertions(+), 78 deletions(-)
 rename flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{ArrayInputTypeStrategy.java => CommonInputTypeStrategy.java} (52%)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/SubsequenceInputTypeStrategyTest.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
[flink] branch master updated (74fd291 -> 0e8a53b)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 74fd291 [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector add 0e8a53b [FLINK-13782][table-api] Implement type strategies for IF ELSE expression No new revisions were added by this update. Summary of changes: .../functions/BuiltInFunctionDefinitions.java | 11 +- .../table/types/inference/InputTypeStrategies.java | 24 +- ...eStrategy.java => CommonInputTypeStrategy.java} | 73 -- .../strategies/SubsequenceInputTypeStrategy.java | 256 + .../types/inference/InputTypeStrategiesTest.java | 3 +- .../SubsequenceInputTypeStrategyTest.java | 143 .../expressions/PlannerExpressionConverter.scala | 4 - .../flink/table/planner/expressions/logic.scala| 51 8 files changed, 487 insertions(+), 78 deletions(-) rename flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{ArrayInputTypeStrategy.java => CommonInputTypeStrategy.java} (52%) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/SubsequenceInputTypeStrategyTest.java delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
[flink] branch release-1.11 updated (5399e03 -> 663e9f3)
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 5399e03  [FLINK-17635][docs][table] Add documentation about view support
     new 8d9e3a0  [FLINK-18075] Remove deprecation of Kafka producer ctor that take SerializationSchema
     new 663e9f3  [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +
 .../connectors/kafka/KafkaConsumerTestBase.java    |  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java    |  11 +-
 .../kafka/KafkaShortRetentionTestBase.java         |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java     |  24 -
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java       | 108 +++
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 -
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 534 insertions(+), 83 deletions(-)
 create mode 100644 flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
[flink] 01/02: [FLINK-18075] Remove deprecation of Kafka producer ctor that take SerializationSchema
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 8d9e3a034963f00f657a0f6dc33b660c663f391f Author: Dawid Wysakowicz AuthorDate: Thu Jun 4 13:22:56 2020 +0200 [FLINK-18075] Remove deprecation of Kafka producer ctor that take SerializationSchema SerializationSchema is an important interface that is widely spread and used in other components such as Table API. It is also the most common interface for reusable interfaces. Therefore we should support it long term in our connectors. This commit removes the deprecation of ctors that take this interface. Moreover it adds the most general ctor that takes all producer configuration options along with SerializationSchema. This makes it feature equivalent with KafkaSerializationSchema in respect to configuration of the producer. --- .../connectors/kafka/FlinkKafkaProducer.java | 84 +++--- 1 file changed, 57 insertions(+), 27 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index 25b359c..3bda1fe 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -288,16 +288,9 @@ public class FlinkKafkaProducer * ID of the Kafka topic. * @param serializationSchema * User defined (keyless) serialization schema. -* -* @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)} */ - @Deprecated public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema serializationSchema) { - this( - topicId, - new KeyedSerializationSchemaWrapper<>(serializationSchema), - getPropertiesFromBrokerList(brokerList), - Optional.of(new FlinkFixedPartitioner())); + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList)); } /** @@ -318,16 +311,41 @@ public class FlinkKafkaProducer * User defined key-less serialization schema. * @param producerConfig * Properties with the producer configuration. -* -* @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)} */ - @Deprecated public FlinkKafkaProducer(String topicId, SerializationSchema serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<>())); + } + + /** +* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to +* the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. +* +* Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an +* attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka +* partitions in a round-robin fashion. +* +* @param topicId +* The topic to write data to +* @param serializationSchema +* A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[] +* @param producerConfig +* Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
+* @param customPartitioner +* A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not +* provided, records will be distributed to Kafka partitions in a round-robin fashion. +*/ + public FlinkKafkaProducer( + String topicId, + SerializationSchema serializationSchema, + Properties producerConfig, + Optional> customPartitioner) { this( topicId, - new KeyedSerializationSchemaWrapper<>(serializationSchema), + serializationSchema, producerConfig, - Optional.of(new FlinkFixedPartitioner())); + customPartitioner.orElse(null), +
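For readers following the change, a minimal sketch of how the un-deprecated constructors can be used from a job; the broker address, topic name, and SimpleStringSchema here are illustrative placeholders, not part of the commit:

    import java.util.Optional;
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

    public class KafkaSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");

            Properties producerConfig = new Properties();
            producerConfig.setProperty("bootstrap.servers", "localhost:9092");

            // (topic, schema, config): no longer deprecated; per this commit it
            // delegates to the new general constructor with a FlinkFixedPartitioner.
            stream.addSink(new FlinkKafkaProducer<>(
                    "my-topic", new SimpleStringSchema(), producerConfig));

            // The new most-general SerializationSchema constructor: an empty Optional
            // would fall back to round-robin distribution of the key-less records.
            stream.addSink(new FlinkKafkaProducer<>(
                    "my-topic",
                    new SimpleStringSchema(),
                    producerConfig,
                    Optional.of(new FlinkFixedPartitioner<>())));

            env.execute("kafka-sink-example");
        }
    }

Since the three-argument overload now delegates to the most-general constructor with a FlinkFixedPartitioner, existing call sites keep their previous partitioning behavior.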
[flink] 02/02: [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 663e9f3f0e903c9e20a9e786dd56364f6482b8f2 Author: Dawid Wysakowicz AuthorDate: Thu Jun 4 13:17:27 2020 +0200 [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector --- .../connectors/kafka/FlinkKafkaProducerTest.java | 72 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 19 +++- .../connectors/kafka/FlinkKafkaProducerTest.java | 72 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 33 +- .../internals/KafkaSerializationSchemaWrapper.java | 95 + .../connectors/kafka/KafkaConsumerTestBase.java| 23 ++-- .../connectors/kafka/KafkaProducerTestBase.java| 11 +- .../kafka/KafkaShortRetentionTestBase.java | 3 +- .../connectors/kafka/KafkaTestEnvironment.java | 24 - .../connectors/kafka/testutils/DataGenerators.java | 7 +- .../connectors/kafka/FlinkKafkaProducer.java | 28 ++--- .../connectors/kafka/FlinkKafkaProducerTest.java | 118 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 27 - .../connectors/kafka/table/KafkaTableITCase.java | 5 +- 14 files changed, 479 insertions(+), 58 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java new file mode 100644 index 000..2bd54f1 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link FlinkKafkaProducer010}. 
+ */ +public class FlinkKafkaProducerTest { + @Test + public void testOpenProducer() throws Exception { + + OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema(); + FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010<>( + "localhost:9092", + "test-topic", + schema + ); + + OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + 1, + 1, + 0, + IntSerializer.INSTANCE, + new OperatorID(1, 1)); + + testHarness.open(); + + assertThat(schema.openCalled, equalTo(true)); + } + + private static class OpenTestingSerializationSchema implements SerializationSchema { + private boolean openCalled; + + @Override + public void open(InitializationContext context) throws Exception { + openCalled = true; + } + + @Override + public byte[] serialize(Integer element) { + return new byte[0]; + } + } +} diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 9ae751b..33abb05 100644 ---
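Conceptually, the wrapper this commit introduces adapts the generic schema to the Kafka-specific one. A rough sketch of the idea, assuming the open(...) hook that KafkaSerializationSchema exposes in 1.11; this is not the committed KafkaSerializationSchemaWrapper, which also deals with partitioners and keyed records:

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

    import org.apache.kafka.clients.producer.ProducerRecord;

    // Illustrative adapter: exposes a key-less SerializationSchema as a
    // KafkaSerializationSchema that emits records without a key.
    class SimpleSchemaAdapter<T> implements KafkaSerializationSchema<T> {

        private final String topic;
        private final SerializationSchema<T> schema;

        SimpleSchemaAdapter(String topic, SerializationSchema<T> schema) {
            this.topic = topic;
            this.schema = schema;
        }

        @Override
        public void open(SerializationSchema.InitializationContext context) throws Exception {
            // Forward the lifecycle hook, which is what the new FlinkKafkaProducerTest
            // above asserts (schema.openCalled == true after the harness opens).
            schema.open(context);
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
            // No key attached: partitioning falls to the configured partitioner,
            // or round-robin if none is set.
            return new ProducerRecord<>(topic, schema.serialize(element));
        }
    }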
[flink] 01/02: [FLINK-18075] Remove deprecation of Kafka producer ctors that take SerializationSchema
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit bebe50346d9485fa2d962c5ed1d00da2778c8feb Author: Dawid Wysakowicz AuthorDate: Thu Jun 4 13:22:56 2020 +0200 [FLINK-18075] Remove deprecation of Kafka producer ctors that take SerializationSchema SerializationSchema is an important interface that is widely used in other components such as the Table API. It is also the most common interface to implement for reusable serialization logic. Therefore we should support it long-term in our connectors. This commit removes the deprecation of ctors that take this interface. Moreover, it adds the most general ctor, which takes all producer configuration options along with a SerializationSchema. This makes it feature-equivalent to KafkaSerializationSchema with respect to producer configuration. --- .../connectors/kafka/FlinkKafkaProducer.java | 84 +++--- 1 file changed, 57 insertions(+), 27 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index 25b359c..3bda1fe 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -288,16 +288,9 @@ public class FlinkKafkaProducer * ID of the Kafka topic. * @param serializationSchema * User defined (keyless) serialization schema. -* -* @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)} */ - @Deprecated public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema serializationSchema) { - this( - topicId, - new KeyedSerializationSchemaWrapper<>(serializationSchema), - getPropertiesFromBrokerList(brokerList), - Optional.of(new FlinkFixedPartitioner())); + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList)); } /** @@ -318,16 +311,41 @@ public class FlinkKafkaProducer * User defined key-less serialization schema. * @param producerConfig * Properties with the producer configuration. -* -* @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)} */ - @Deprecated public FlinkKafkaProducer(String topicId, SerializationSchema serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<>())); + } + + /** +* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to +* the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. +* +* Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an +* attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka +* partitions in a round-robin fashion. +* +* @param topicId +* The topic to write data to +* @param serializationSchema +* A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[] +* @param producerConfig +* Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
+* @param customPartitioner +* A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not +* provided, records will be distributed to Kafka partitions in a round-robin fashion. +*/ + public FlinkKafkaProducer( + String topicId, + SerializationSchema serializationSchema, + Properties producerConfig, + Optional> customPartitioner) { this( topicId, - new KeyedSerializationSchemaWrapper<>(serializationSchema), + serializationSchema, producerConfig, - Optional.of(new FlinkFixedPartitioner())); + customPartitioner.orElse(null), +
[flink] branch master updated (0358292 -> 74fd291)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0358292 [FLINK-16572][e2e][pubsub] Acknowledge message in previous test new bebe503 [FLINK-18075] Remove deprecation of Kafka producer ctors that take SerializationSchema new 74fd291 [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../connectors/kafka/FlinkKafkaProducerTest.java | 72 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 19 +++- .../connectors/kafka/FlinkKafkaProducerTest.java | 72 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 33 +- .../internals/KafkaSerializationSchemaWrapper.java | 95 + .../connectors/kafka/KafkaConsumerTestBase.java| 23 ++-- .../connectors/kafka/KafkaProducerTestBase.java| 11 +- .../kafka/KafkaShortRetentionTestBase.java | 3 +- .../connectors/kafka/KafkaTestEnvironment.java | 24 - .../connectors/kafka/testutils/DataGenerators.java | 7 +- .../connectors/kafka/FlinkKafkaProducer.java | 108 +++ .../connectors/kafka/FlinkKafkaProducerTest.java | 118 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 27 - .../connectors/kafka/table/KafkaTableITCase.java | 5 +- 14 files changed, 534 insertions(+), 83 deletions(-) create mode 100644 flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java create mode 100644 flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
[flink] 02/02: [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 74fd2912d146cc2db103825f0aee2282a99b5774 Author: Dawid Wysakowicz AuthorDate: Thu Jun 4 13:17:27 2020 +0200 [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector --- .../connectors/kafka/FlinkKafkaProducerTest.java | 72 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 19 +++- .../connectors/kafka/FlinkKafkaProducerTest.java | 72 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 33 +- .../internals/KafkaSerializationSchemaWrapper.java | 95 + .../connectors/kafka/KafkaConsumerTestBase.java| 23 ++-- .../connectors/kafka/KafkaProducerTestBase.java| 11 +- .../kafka/KafkaShortRetentionTestBase.java | 3 +- .../connectors/kafka/KafkaTestEnvironment.java | 24 - .../connectors/kafka/testutils/DataGenerators.java | 7 +- .../connectors/kafka/FlinkKafkaProducer.java | 28 ++--- .../connectors/kafka/FlinkKafkaProducerTest.java | 118 + .../connectors/kafka/KafkaTestEnvironmentImpl.java | 27 - .../connectors/kafka/table/KafkaTableITCase.java | 5 +- 14 files changed, 479 insertions(+), 58 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java new file mode 100644 index 000..2bd54f1 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link FlinkKafkaProducer010}. 
+ */ +public class FlinkKafkaProducerTest { + @Test + public void testOpenProducer() throws Exception { + + OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema(); + FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010<>( + "localhost:9092", + "test-topic", + schema + ); + + OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + 1, + 1, + 0, + IntSerializer.INSTANCE, + new OperatorID(1, 1)); + + testHarness.open(); + + assertThat(schema.openCalled, equalTo(true)); + } + + private static class OpenTestingSerializationSchema implements SerializationSchema { + private boolean openCalled; + + @Override + public void open(InitializationContext context) throws Exception { + openCalled = true; + } + + @Override + public byte[] serialize(Integer element) { + return new byte[0]; + } + } +} diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 9ae751b..33abb05 100644 ---
[flink] branch master updated (3952bfa -> 0358292)
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3952bfa [FLINK-17635][docs][table] Add documentation about view support add a08061b [hotfix][pubsub] Use TestLogger add 0358292 [FLINK-16572][e2e][pubsub] Acknowledge message in previous test No new revisions were added by this update. Summary of changes: .../pom.xml| 7 ++- .../gcp/pubsub/CheckPubSubEmulatorTest.java| 23 +- .../gcp/pubsub/emulator/GCloudUnitTestBase.java| 4 +++- 3 files changed, 18 insertions(+), 16 deletions(-)
[flink-training] branch master updated (22420e1 -> 163558d)
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git. from 22420e1 [hotfix] use correct main class references new b7fecd4 [hotfix][training] git-ignore eclipse build files new 163558d [FLINK-18178][build] fix Eclipse import not using flinkShadowJar dependencies The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .gitignore | 10 -- build.gradle | 7 +++ 2 files changed, 15 insertions(+), 2 deletions(-)
[flink-training] 02/02: [FLINK-18178][build] fix Eclipse import not using flinkShadowJar dependencies
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git commit 163558d39bc2c15796d180dc5ae8a7943d608679 Author: Nico Kruber AuthorDate: Mon Jun 8 14:05:56 2020 +0200 [FLINK-18178][build] fix Eclipse import not using flinkShadowJar dependencies --- build.gradle | 7 +++ 1 file changed, 7 insertions(+) diff --git a/build.gradle b/build.gradle index 5e54c95..5aae798 100644 --- a/build.gradle +++ b/build.gradle @@ -27,6 +27,7 @@ subprojects { } apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'checkstyle' +apply plugin: 'eclipse' // artifact properties group = 'org.apache.flink' @@ -113,6 +114,12 @@ subprojects { javadoc.classpath += configurations.flinkShadowJar } +eclipse { +classpath { +plusConfigurations += [configurations.flinkShadowJar] +} +} + if (plugins.findPlugin('application')) { applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"] run.classpath = sourceSets.main.runtimeClasspath
[flink-training] 01/02: [hotfix][training] git-ignore eclipse build files
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git commit b7fecd43861cbb7ef15c55f5887cc121f4b18acc Author: Nico Kruber AuthorDate: Mon Jun 8 12:33:37 2020 +0200 [hotfix][training] git-ignore eclipse build files --- .gitignore | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index ba34970..d47fbf8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,9 +10,15 @@ # Debugger .attach_* +# Eclipse +.project +.settings +.classpath +bin/ + # Gradle build process files -**/.gradle/**/* -**/build/**/* +/.gradle/ +build/ **/.gradletasknamecache # IntelliJ
[flink-training] branch master updated: [hotfix] use correct main class references
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git The following commit(s) were added to refs/heads/master by this push: new 22420e1 [hotfix] use correct main class references 22420e1 is described below commit 22420e10bc970eea0eb84379fbf68a7d0d0eac78 Author: David Anderson AuthorDate: Mon Jun 8 12:31:46 2020 +0200 [hotfix] use correct main class references --- hourly-tips/build.gradle | 2 +- long-ride-alerts/build.gradle | 2 +- ride-cleansing/build.gradle | 2 +- rides-and-fares/build.gradle | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hourly-tips/build.gradle b/hourly-tips/build.gradle index e0b929e..11df511 100644 --- a/hourly-tips/build.gradle +++ b/hourly-tips/build.gradle @@ -1 +1 @@ -mainClassName = 'org.apache.flink.training.exercises.windows.hourlytips.HourlyTipsExercise' +mainClassName = 'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise' diff --git a/long-ride-alerts/build.gradle b/long-ride-alerts/build.gradle index 61aa69b..62e4ffd 100644 --- a/long-ride-alerts/build.gradle +++ b/long-ride-alerts/build.gradle @@ -1 +1 @@ -mainClassName = 'org.apache.flink.training.exercises.process.longrides.LongRidesExercise' +mainClassName = 'org.apache.flink.training.exercises.longrides.LongRidesExercise' diff --git a/ride-cleansing/build.gradle b/ride-cleansing/build.gradle index 2a9d0aa..106e3b9 100644 --- a/ride-cleansing/build.gradle +++ b/ride-cleansing/build.gradle @@ -1 +1 @@ -mainClassName = 'org.apache.flink.training.exercises.basics.ridecleansing.RideCleansingExercise' +mainClassName = 'org.apache.flink.training.exercises.ridecleansing.RideCleansingExercise' diff --git a/rides-and-fares/build.gradle b/rides-and-fares/build.gradle index 1d013ef..18366af 100644 --- a/rides-and-fares/build.gradle +++ b/rides-and-fares/build.gradle @@ -1 +1 @@ -mainClassName = 'org.apache.flink.training.exercises.state.ridesandfares.RidesAndFaresExercise' +mainClassName = 'org.apache.flink.training.exercises.ridesandfares.RidesAndFaresExercise'
[flink] branch release-1.11 updated: [FLINK-17635][docs][table] Add documentation about view support
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 5399e03 [FLINK-17635][docs][table] Add documentation about view support 5399e03 is described below commit 5399e0315698778e579dcf6557f14e74e08dcc2e Author: Leonard Xu AuthorDate: Mon Jun 8 19:52:21 2020 +0800 [FLINK-17635][docs][table] Add documentation about view support This closes #12502 --- docs/dev/table/sql/create.md | 20 docs/dev/table/sql/create.zh.md | 20 docs/dev/table/sql/drop.md | 20 docs/dev/table/sql/drop.zh.md| 20 docs/dev/table/sql/index.md | 4 ++-- docs/dev/table/sql/index.zh.md | 4 ++-- docs/dev/table/sql/queries.md| 22 +- docs/dev/table/sql/queries.zh.md | 22 +- 8 files changed, 126 insertions(+), 6 deletions(-) diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md index d3e454d..dc799bb 100644 --- a/docs/dev/table/sql/create.md +++ b/docs/dev/table/sql/create.md @@ -31,6 +31,7 @@ Flink SQL supports the following CREATE statements for now: - CREATE TABLE - CREATE DATABASE +- CREATE VIEW - CREATE FUNCTION ## Run a CREATE statement @@ -351,6 +352,25 @@ The key and value of expression `key1=val1` should both be string literal. {% top %} +## CREATE VIEW +{% highlight sql %} +CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name + [{columnName [, columnName ]* }] [COMMENT view_comment] + AS query_expression +{% endhighlight %} + +Create a view with the given query expression. If a view with the same name already exists in the catalog, an exception is thrown. + +**TEMPORARY** + +Create temporary view that has catalog and database namespaces and overrides views. + +**IF NOT EXISTS** + +If the view already exists, nothing happens. + +{% top %} + ## CREATE FUNCTION {% highlight sql%} CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md index d4f9d79..8348d52 100644 --- a/docs/dev/table/sql/create.zh.md +++ b/docs/dev/table/sql/create.zh.md @@ -31,6 +31,7 @@ CREATE 语句用于向当前或指定的 [Catalog]({{ site.baseurl }}/zh/dev/tab - CREATE TABLE - CREATE DATABASE +- CREATE VIEW - CREATE FUNCTION ## 执行 CREATE 语句 @@ -344,6 +345,25 @@ CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name {% top %} +## CREATE VIEW +{% highlight sql %} +CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name + [{columnName [, columnName ]* }] [COMMENT view_comment] + AS query_expression +{% endhighlight %} + +根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常. + +**TEMPORARY** + +创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。 + +**IF NOT EXISTS** + +若该视图已经存在,则不会进行任何操作。 + +{% top %} + ## CREATE FUNCTION {% highlight sql%} CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION diff --git a/docs/dev/table/sql/drop.md b/docs/dev/table/sql/drop.md index 6f340e8..bcaea0b 100644 --- a/docs/dev/table/sql/drop.md +++ b/docs/dev/table/sql/drop.md @@ -31,6 +31,7 @@ Flink SQL supports the following DROP statements for now: - DROP TABLE - DROP DATABASE +- DROP VIEW - DROP FUNCTION ## Run a DROP statement @@ -143,6 +144,25 @@ Dropping a non-empty database triggers an exception. Enabled by default. Dropping a non-empty database also drops all associated tables and functions. +## DROP VIEW + +{% highlight sql %} +DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name +{% endhighlight %} + +Drop a view that has catalog and database namespaces. 
If the view to drop does not exist, an exception is thrown. + +**TEMPORARY** + +Drop temporary view that has catalog and database namespaces. + +**IF EXISTS** + +If the view does not exist, nothing happens. + +**MAINTAIN DEPENDENCIES** +Flink does not maintain dependencies of view by CASCADE/RESTRICT keywords, the current way is producing postpone error message when user tries to use the view under the scenarios like the underlying table of view has been dropped. + ## DROP FUNCTION {% highlight sql%} diff --git a/docs/dev/table/sql/drop.zh.md b/docs/dev/table/sql/drop.zh.md index 51ac456..8d1b76d 100644 --- a/docs/dev/table/sql/drop.zh.md +++ b/docs/dev/table/sql/drop.zh.md @@ -31,6 +31,7 @@ Flink SQL 目前支持以下 DROP 语句: - DROP TABLE - DROP DATABASE +- DROP VIEW - DROP FUNCTION ## 执行 DROP 语句 @@ -143,6 +144,25 @@ DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ] 删除一个非空数据库时,把相关联的表与函数一并删除。 +## DROP VIEW + +{% highlight sql %} +DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name +{% endhighlight %} + +删除一个有 catalog 和数据库命名空间的视图。若需要删除的视图不存在,则会产生异常。 + +**TEMPORARY** + +删除一个有 catalog 和数据库命名空间的临时视图。 + +**IF EXISTS** + +若视图不存在,则不会进行任何操作。 + +**依赖管理** +Flink 没有使用
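The same statements can also be issued programmatically. A small sketch using the Table API, where the table and view names and the datagen connector are illustrative choices rather than part of this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ViewDdlExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

            tableEnv.executeSql(
                    "CREATE TABLE orders (id BIGINT, amount DOUBLE) WITH ('connector' = 'datagen')");

            // IF NOT EXISTS: nothing happens if the view already exists.
            tableEnv.executeSql(
                    "CREATE VIEW IF NOT EXISTS big_orders AS SELECT * FROM orders WHERE amount > 100");

            // TEMPORARY views live in the session but keep catalog/database namespaces.
            tableEnv.executeSql("CREATE TEMPORARY VIEW tmp_orders AS SELECT id FROM orders");

            // IF EXISTS: nothing happens if the view does not exist.
            tableEnv.executeSql("DROP VIEW IF EXISTS big_orders");
            tableEnv.executeSql("DROP TEMPORARY VIEW IF EXISTS tmp_orders");
        }
    }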
[flink] branch master updated: [FLINK-17635][docs][table] Add documentation about view support
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3952bfa [FLINK-17635][docs][table] Add documentation about view support 3952bfa is described below commit 3952bfa029df4d278a8f694d3e89db9de398006b Author: Leonard Xu AuthorDate: Mon Jun 8 19:52:21 2020 +0800 [FLINK-17635][docs][table] Add documentation about view support This closes #12502 --- docs/dev/table/sql/create.md | 20 docs/dev/table/sql/create.zh.md | 20 docs/dev/table/sql/drop.md | 20 docs/dev/table/sql/drop.zh.md| 20 docs/dev/table/sql/index.md | 4 ++-- docs/dev/table/sql/index.zh.md | 4 ++-- docs/dev/table/sql/queries.md| 22 +- docs/dev/table/sql/queries.zh.md | 22 +- 8 files changed, 126 insertions(+), 6 deletions(-) diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md index d3e454d..dc799bb 100644 --- a/docs/dev/table/sql/create.md +++ b/docs/dev/table/sql/create.md @@ -31,6 +31,7 @@ Flink SQL supports the following CREATE statements for now: - CREATE TABLE - CREATE DATABASE +- CREATE VIEW - CREATE FUNCTION ## Run a CREATE statement @@ -351,6 +352,25 @@ The key and value of expression `key1=val1` should both be string literal. {% top %} +## CREATE VIEW +{% highlight sql %} +CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name + [{columnName [, columnName ]* }] [COMMENT view_comment] + AS query_expression +{% endhighlight %} + +Create a view with the given query expression. If a view with the same name already exists in the catalog, an exception is thrown. + +**TEMPORARY** + +Create temporary view that has catalog and database namespaces and overrides views. + +**IF NOT EXISTS** + +If the view already exists, nothing happens. + +{% top %} + ## CREATE FUNCTION {% highlight sql%} CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md index d4f9d79..8348d52 100644 --- a/docs/dev/table/sql/create.zh.md +++ b/docs/dev/table/sql/create.zh.md @@ -31,6 +31,7 @@ CREATE 语句用于向当前或指定的 [Catalog]({{ site.baseurl }}/zh/dev/tab - CREATE TABLE - CREATE DATABASE +- CREATE VIEW - CREATE FUNCTION ## 执行 CREATE 语句 @@ -344,6 +345,25 @@ CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name {% top %} +## CREATE VIEW +{% highlight sql %} +CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name + [{columnName [, columnName ]* }] [COMMENT view_comment] + AS query_expression +{% endhighlight %} + +根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常. + +**TEMPORARY** + +创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。 + +**IF NOT EXISTS** + +若该视图已经存在,则不会进行任何操作。 + +{% top %} + ## CREATE FUNCTION {% highlight sql%} CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION diff --git a/docs/dev/table/sql/drop.md b/docs/dev/table/sql/drop.md index 6f340e8..bcaea0b 100644 --- a/docs/dev/table/sql/drop.md +++ b/docs/dev/table/sql/drop.md @@ -31,6 +31,7 @@ Flink SQL supports the following DROP statements for now: - DROP TABLE - DROP DATABASE +- DROP VIEW - DROP FUNCTION ## Run a DROP statement @@ -143,6 +144,25 @@ Dropping a non-empty database triggers an exception. Enabled by default. Dropping a non-empty database also drops all associated tables and functions. +## DROP VIEW + +{% highlight sql %} +DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name +{% endhighlight %} + +Drop a view that has catalog and database namespaces. 
If the view to drop does not exist, an exception is thrown. + +**TEMPORARY** + +Drop temporary view that has catalog and database namespaces. + +**IF EXISTS** + +If the view does not exist, nothing happens. + +**MAINTAIN DEPENDENCIES** +Flink does not maintain dependencies of view by CASCADE/RESTRICT keywords, the current way is producing postpone error message when user tries to use the view under the scenarios like the underlying table of view has been dropped. + ## DROP FUNCTION {% highlight sql%} diff --git a/docs/dev/table/sql/drop.zh.md b/docs/dev/table/sql/drop.zh.md index 51ac456..8d1b76d 100644 --- a/docs/dev/table/sql/drop.zh.md +++ b/docs/dev/table/sql/drop.zh.md @@ -31,6 +31,7 @@ Flink SQL 目前支持以下 DROP 语句: - DROP TABLE - DROP DATABASE +- DROP VIEW - DROP FUNCTION ## 执行 DROP 语句 @@ -143,6 +144,25 @@ DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ] 删除一个非空数据库时,把相关联的表与函数一并删除。 +## DROP VIEW + +{% highlight sql %} +DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name +{% endhighlight %} + +删除一个有 catalog 和数据库命名空间的视图。若需要删除的视图不存在,则会产生异常。 + +**TEMPORARY** + +删除一个有 catalog 和数据库命名空间的临时视图。 + +**IF EXISTS** + +若视图不存在,则不会进行任何操作。 + +**依赖管理** +Flink 没有使用 CASCADE /
[flink] branch master updated (88e416f -> b8c3033)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 88e416f [FLINK-18046][hive] Decimal column stats not supported for Hive table add 4a04a71 [FLINK-17287][github] Disable merge commit button add b8c3033 [FLINK-17512] Add notification settings to .asf.yaml No new revisions were added by this update. Summary of changes: .asf.yaml| 10 ++ tools/releasing/create_source_release.sh | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 .asf.yaml
[flink] branch release-1.11 updated: [FLINK-18046][hive] Decimal column stats not supported for Hive table
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 406b5d2 [FLINK-18046][hive] Decimal column stats not supported for Hive table 406b5d2 is described below commit 406b5d24404c3f72ce84d8308f47aa76246000d3 Author: Rui Li AuthorDate: Mon Jun 8 18:41:11 2020 +0800 [FLINK-18046][hive] Decimal column stats not supported for Hive table This closes #12424 --- .../table/catalog/hive/util/HiveStatsUtil.java | 51 ++ .../catalog/hive/HiveCatalogHiveMetadataTest.java | 10 +++-- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java index 9fd4394..d4ae6d2 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java @@ -31,12 +31,15 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Decimal; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; @@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -171,6 +177,20 @@ public class HiveStatsUtil { stringStats.isSetAvgColLen() ? stringStats.getAvgColLen() : null, stringStats.isSetNumDVs() ? stringStats.getNumDVs() : null, stringStats.isSetNumDVs() ? stringStats.getNumNulls() : null); + } else if (stats.isSetDecimalStats()) { + DecimalColumnStatsData decimalStats = stats.getDecimalStats(); + // for now, just return CatalogColumnStatisticsDataDouble for decimal columns + Double max = null; + if (decimalStats.isSetHighValue()) { + max = toHiveDecimal(decimalStats.getHighValue()).doubleValue(); + } + Double min = null; + if (decimalStats.isSetLowValue()) { + min = toHiveDecimal(decimalStats.getLowValue()).doubleValue(); + } + Long ndv = decimalStats.isSetNumDVs() ? decimalStats.getNumDVs() : null; + Long nullCount = decimalStats.isSetNumNulls() ? 
decimalStats.getNumNulls() : null; + return new CatalogColumnStatisticsDataDouble(min, max, ndv, nullCount); } else { LOG.warn("Flink does not support converting ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType); return null; @@ -288,11 +308,42 @@ public class HiveStatsUtil { } return ColumnStatisticsData.binaryStats(hiveBinaryColumnStats); } + } else if (type.equals(LogicalTypeRoot.DECIMAL)) { + if (colStat instanceof CatalogColumnStatisticsDataDouble) { + CatalogColumnStatisticsDataDouble flinkStats = (CatalogColumnStatisticsDataDouble) colStat; + DecimalColumnStatsData hiveStats = new DecimalColumnStatsData(); + if (flinkStats.getMax() != null) { + // in older versions we cannot create HiveDecimal from Double, so convert Double to BigDecimal first +
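The conversion in the hunk above goes through BigDecimal in both directions. A hedged sketch of the round trip; the toHiveDecimal helper below is a guess at the patch's private helper, reconstructed from the imports the patch adds:

    import java.math.BigDecimal;
    import java.math.BigInteger;

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.metastore.api.Decimal;

    class DecimalStatsSketch {

        // Thrift Decimal -> HiveDecimal: unscaled bytes plus a scale.
        static HiveDecimal toHiveDecimal(Decimal decimal) {
            BigInteger unscaled = new BigInteger(decimal.getUnscaled());
            return HiveDecimal.create(new BigDecimal(unscaled, decimal.getScale()));
        }

        // Double -> HiveDecimal: older Hive versions cannot build a HiveDecimal from
        // a double directly, hence the detour through BigDecimal noted in the patch.
        static HiveDecimal fromDouble(Double value) {
            return HiveDecimal.create(BigDecimal.valueOf(value));
        }
    }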
[flink] branch master updated: [FLINK-18046][hive] Decimal column stats not supported for Hive table
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 88e416f [FLINK-18046][hive] Decimal column stats not supported for Hive table 88e416f is described below commit 88e416f988bc286b7cc0df7bcf0679184e7bb805 Author: Rui Li AuthorDate: Mon Jun 8 18:41:11 2020 +0800 [FLINK-18046][hive] Decimal column stats not supported for Hive table This closes #12424 --- .../table/catalog/hive/util/HiveStatsUtil.java | 51 ++ .../catalog/hive/HiveCatalogHiveMetadataTest.java | 10 +++-- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java index 9fd4394..d4ae6d2 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java @@ -31,12 +31,15 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Decimal; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; @@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -171,6 +177,20 @@ public class HiveStatsUtil { stringStats.isSetAvgColLen() ? stringStats.getAvgColLen() : null, stringStats.isSetNumDVs() ? stringStats.getNumDVs() : null, stringStats.isSetNumDVs() ? stringStats.getNumNulls() : null); + } else if (stats.isSetDecimalStats()) { + DecimalColumnStatsData decimalStats = stats.getDecimalStats(); + // for now, just return CatalogColumnStatisticsDataDouble for decimal columns + Double max = null; + if (decimalStats.isSetHighValue()) { + max = toHiveDecimal(decimalStats.getHighValue()).doubleValue(); + } + Double min = null; + if (decimalStats.isSetLowValue()) { + min = toHiveDecimal(decimalStats.getLowValue()).doubleValue(); + } + Long ndv = decimalStats.isSetNumDVs() ? decimalStats.getNumDVs() : null; + Long nullCount = decimalStats.isSetNumNulls() ? 
decimalStats.getNumNulls() : null; + return new CatalogColumnStatisticsDataDouble(min, max, ndv, nullCount); } else { LOG.warn("Flink does not support converting ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType); return null; @@ -288,11 +308,42 @@ public class HiveStatsUtil { } return ColumnStatisticsData.binaryStats(hiveBinaryColumnStats); } + } else if (type.equals(LogicalTypeRoot.DECIMAL)) { + if (colStat instanceof CatalogColumnStatisticsDataDouble) { + CatalogColumnStatisticsDataDouble flinkStats = (CatalogColumnStatisticsDataDouble) colStat; + DecimalColumnStatsData hiveStats = new DecimalColumnStatsData(); + if (flinkStats.getMax() != null) { + // in older versions we cannot create HiveDecimal from Double, so convert Double to BigDecimal first +
[flink] branch master updated (ca89635 -> 88e416f)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ca89635 [FLINK-17406][doc] Add documentation about dynamic table options add 88e416f [FLINK-18046][hive] Decimal column stats not supported for Hive table No new revisions were added by this update. Summary of changes: .../table/catalog/hive/util/HiveStatsUtil.java | 51 ++ .../catalog/hive/HiveCatalogHiveMetadataTest.java | 10 +++-- 2 files changed, 58 insertions(+), 3 deletions(-)
[flink] branch master updated (0b7c23e -> ca89635)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0b7c23e [FLINK-17776][hive][doc] Add documentation for DDL in hive dialect add ca89635 [FLINK-17406][doc] Add documentation about dynamic table options No new revisions were added by this update. Summary of changes: docs/dev/table/config.md | 6 +++ docs/dev/table/config.zh.md | 6 +++ docs/dev/table/sql/hints.md | 88 docs/dev/table/sql/hints.zh.md | 88 docs/dev/table/sql/index.md | 1 + docs/dev/table/sql/index.zh.md | 1 + docs/dev/table/sql/queries.md| 11 - docs/dev/table/sql/queries.zh.md | 11 - 8 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 docs/dev/table/sql/hints.md create mode 100644 docs/dev/table/sql/hints.zh.md
[flink] branch release-1.11 updated: [FLINK-17406][doc] Add documentation about dynamic table options
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 256bd06 [FLINK-17406][doc] Add documentation about dynamic table options 256bd06 is described below commit 256bd06c12ca2cde9c3038f1a66b313313c00e92 Author: Danny Chan AuthorDate: Mon Jun 8 16:27:52 2020 +0800 [FLINK-17406][doc] Add documentation about dynamic table options This closes #12319 --- docs/dev/table/config.md | 6 +++ docs/dev/table/config.zh.md | 6 +++ docs/dev/table/sql/hints.md | 88 docs/dev/table/sql/hints.zh.md | 88 docs/dev/table/sql/index.md | 1 + docs/dev/table/sql/index.zh.md | 1 + docs/dev/table/sql/queries.md| 11 - docs/dev/table/sql/queries.zh.md | 11 - 8 files changed, 210 insertions(+), 2 deletions(-) diff --git a/docs/dev/table/config.md b/docs/dev/table/config.md index dd312d6..68a73ba 100644 --- a/docs/dev/table/config.md +++ b/docs/dev/table/config.md @@ -104,3 +104,9 @@ The following options can be used to tune the performance of the query execution The following options can be used to adjust the behavior of the query optimizer to get a better execution plan. {% include generated/optimizer_config_configuration.html %} + +### Table Options + +The following options can be used to adjust the behavior of the table planner. + +{% include generated/table_config_configuration.html %} diff --git a/docs/dev/table/config.zh.md b/docs/dev/table/config.zh.md index 9d325e6..3c41bdc 100644 --- a/docs/dev/table/config.zh.md +++ b/docs/dev/table/config.zh.md @@ -96,3 +96,9 @@ configuration.set_string("table.exec.mini-batch.size", "5000"); 以下配置可以用于调整查询优化器的行为以获得更好的执行计划。 {% include generated/optimizer_config_configuration.html %} + +### Planner 配置 + +以下配置可以用于调整 planner 的行为。 + +{% include generated/table_config_configuration.html %} diff --git a/docs/dev/table/sql/hints.md b/docs/dev/table/sql/hints.md new file mode 100644 index 000..d8c4c90 --- /dev/null +++ b/docs/dev/table/sql/hints.md @@ -0,0 +1,88 @@ +--- +title: "SQL Hints" +nav-parent_id: sql +nav-pos: 6 +--- + + +* This will be replaced by the TOC +{:toc} + +SQL hints can be used with SQL statements to alter execution plans. This chapter explains how to use hints to force various approaches. + +Generally a hint can be used to: + +- Enforce planner: there's no perfect planner, so it makes sense to implement hints to +allow user better control the execution; +- Append meta data(or statistics): some statistics like “table index for scan” and +“skew info of some shuffle keys” are somewhat dynamic for the query, it would be very +convenient to config them with hints because our planning metadata from the planner is very +often not that accurate; +- Operator resource constraints: for many cases, we would give a default resource +configuration for the execution operators, i.e. min parallelism or +managed memory (resource consuming UDF) or special resource requirement (GPU or SSD disk) +and so on, it would be very flexible to profile the resource with hints per query(instead of the Job). + +## Dynamic Table Options +Dynamic table options allows to specify or override table options dynamically, different with static table options defined with SQL DDL or connect API, +these options can be specified flexibly in per-table scope within each query. 
+ +Thus it is very suitable to use for the ad-hoc queries in interactive terminal, for example, in the SQL-CLI, +you can specify to ignore the parse error for a CSV source just by adding a dynamic option `/*+ OPTIONS('csv.ignore-parse-errors'='true') */`. + +Note: Dynamic table options default is forbidden to use because it may change the semantics of the query. +You need to set the config option `table.dynamic-table-options.enabled` to be `true` explicitly (default is false), +See the Configuration for details on how to set up the config options. + +### Syntax +In order to not break the SQL compatibility, we use the Oracle style SQL hint syntax: +{% highlight sql %} +table_path /*+ OPTIONS(key=val [, key=val]*) */ + +key: +stringLiteral +val: +stringLiteral + +{% endhighlight %} + +### Examples + +{% highlight sql %} + +CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...); +CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...); + +-- override table options in query source +select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; + +-- override table options in join +select * from +kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1 +join +kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2 +on t1.id = t2.id; + +-- override table
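To try the hint from a program rather than the SQL CLI, the option has to be enabled on the table configuration first. A self-contained sketch using the datagen connector; the documentation's Kafka example would use 'scan.startup.mode' the same way:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DynamicOptionsExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // Disabled by default because hints may change query semantics.
            tableEnv.getConfig().getConfiguration()
                    .setBoolean("table.dynamic-table-options.enabled", true);

            tableEnv.executeSql(
                    "CREATE TABLE src (id BIGINT) WITH "
                            + "('connector' = 'datagen', 'rows-per-second' = '1')");

            // Override a connector option for this one query via the OPTIONS hint.
            tableEnv.executeSql("SELECT id FROM src /*+ OPTIONS('rows-per-second' = '5') */");
        }
    }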
[flink] branch release-1.11 updated: [FLINK-17776][hive][doc] Add documentation for DDL in hive dialect
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 11eb83f [FLINK-17776][hive][doc] Add documentation for DDL in hive dialect 11eb83f is described below commit 11eb83f13d49cf5db02cf0db011a0121912f9331 Author: Rui Li AuthorDate: Mon Jun 8 16:21:22 2020 +0800 [FLINK-17776][hive][doc] Add documentation for DDL in hive dialect This closes #12439 --- docs/dev/table/hive/hive_dialect.md| 347 + docs/dev/table/hive/hive_dialect.zh.md | 347 + docs/dev/table/hive/index.md | 3 +- 3 files changed, 696 insertions(+), 1 deletion(-) diff --git a/docs/dev/table/hive/hive_dialect.md b/docs/dev/table/hive/hive_dialect.md new file mode 100644 index 000..3ac1177 --- /dev/null +++ b/docs/dev/table/hive/hive_dialect.md @@ -0,0 +1,347 @@ +--- +title: "Hive Dialect" +nav-parent_id: hive_tableapi +nav-pos: 1 +--- + + +Starting from 1.11.0, Flink allows users to write SQL statements in Hive syntax when Hive dialect +is used. By providing compatibility with Hive syntax, we aim to improve the interoperability with +Hive and reduce the scenarios when users need to switch between Flink and Hive in order to execute +different statements. + +* This will be replaced by the TOC +{:toc} + +## Use Hive Dialect + +Flink currently supports two SQL dialects: `default` and `hive`. You need to switch to Hive dialect +before you can write in Hive syntax. The following describes how to set dialect with +SQL Client and Table API. Also notice that you can dynamically switch dialect for each +statement you execute. There's no need to restart a session to use a different dialect. + +### SQL Client + +SQL dialect can be specified via the `table.sql-dialect` property. Therefore you can set the initial dialect to use in +the `configuration` section of the yaml file for your SQL Client. + +{% highlight yaml %} + +execution: + planner: blink + type: batch + result-mode: table + +configuration: + table.sql-dialect: hive + +{% endhighlight %} + +You can also set the dialect after the SQL Client has launched. + +{% highlight bash %} + +Flink SQL> set table.sql-dialect=hive; -- to use hive dialect +[INFO] Session property has been set. + +Flink SQL> set table.sql-dialect=default; -- to use default dialect +[INFO] Session property has been set. + +{% endhighlight %} + +### Table API + +You can set dialect for your TableEnvironment with Table API. + +{% highlight java %} + +EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build(); +TableEnvironment tableEnv = TableEnvironment.create(settings); +// to use hive dialect +tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); +// to use default dialect +tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + +{% endhighlight %} + +## DDL + +This section lists the supported DDLs with the Hive dialect. We'll mainly focus on the syntax +here. You can refer to [Hive doc](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL) +for the semantics of each DDL statement. 
+ +### DATABASE + + Show + +{% highlight sql %} +SHOW DATABASES; +{% endhighlight %} + + Create + +{% highlight sql %} +CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name + [COMMENT database_comment] + [LOCATION fs_path] + [WITH DBPROPERTIES (property_name=property_value, ...)]; +{% endhighlight %} + + Alter + +# Update Properties + +{% highlight sql %} +ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...); +{% endhighlight %} + +# Update Owner + +{% highlight sql %} +ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role; +{% endhighlight %} + +# Update Location + +{% highlight sql %} +ALTER (DATABASE|SCHEMA) database_name SET LOCATION fs_path; +{% endhighlight %} + + Drop + +{% highlight sql %} +DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE]; +{% endhighlight %} + + Use + +{% highlight sql %} +USE database_name; +{% endhighlight %} + +### TABLE + + Show + +{% highlight sql %} +SHOW TABLES; +{% endhighlight %} + + Create + +{% highlight sql %} +CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name + [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])] + [COMMENT table_comment] + [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] + [ +[ROW FORMAT row_format] +[STORED AS file_format] + ] + [LOCATION fs_path] + [TBLPROPERTIES (property_name=property_value, ...)] + +row_format: + : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] + [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] + [NULL DEFINED AS char] + | SERDE
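Putting the dialect switch and a Hive-syntax DDL together, a hedged end-to-end sketch; the catalog name, default database, and hive-conf path are placeholders:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class HiveDialectExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

            // Placeholder catalog settings; point the third argument at a directory
            // containing a real hive-site.xml.
            HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf");
            tableEnv.registerCatalog("myhive", hive);
            tableEnv.useCatalog("myhive");

            // Switch to Hive syntax for the following DDL.
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tableEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS logs (line STRING) "
                            + "PARTITIONED BY (dt STRING) STORED AS ORC");

            // Switch back for statements written in Flink's default dialect.
            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        }
    }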
[flink] branch master updated (f2dd4b8 -> 0b7c23e)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f2dd4b8 [FLINK-18050][task][checkpointing] Simplify ChannelStateCheckpointWriter interface add 0b7c23e [FLINK-17776][hive][doc] Add documentation for DDL in hive dialect No new revisions were added by this update. Summary of changes: docs/dev/table/hive/hive_dialect.md| 347 + docs/dev/table/hive/hive_dialect.zh.md | 347 + docs/dev/table/hive/index.md | 3 +- 3 files changed, 696 insertions(+), 1 deletion(-) create mode 100644 docs/dev/table/hive/hive_dialect.md create mode 100644 docs/dev/table/hive/hive_dialect.zh.md
[flink] branch master updated (44af789 -> f2dd4b8)
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 44af789 [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh add ed7b0b1 [FLINK-18050][task][checkpointing] Use CloseableIterator to write ResultSubpartition state add f2dd4b8 [FLINK-18050][task][checkpointing] Simplify ChannelStateCheckpointWriter interface No new revisions were added by this update. Summary of changes: .../channel/ChannelStateCheckpointWriter.java | 16 +++-- .../channel/ChannelStateWriteRequest.java | 22 +-- .../checkpoint/channel/ChannelStateWriterImpl.java | 23 +--- ...ChannelStateWriteRequestDispatcherImplTest.java | 69 ++ .../ChannelStateWriteRequestDispatcherTest.java| 2 +- 5 files changed, 94 insertions(+), 38 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java
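For readers unfamiliar with the utility named in the first commit: a CloseableIterator couples Iterator with AutoCloseable so that unconsumed elements, for example reference-counted network buffers, can be released once the writer is done. A minimal illustration of the concept, not Flink's actual org.apache.flink.util.CloseableIterator, which offers more factory methods:

    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Consumer;

    // Concept sketch: an iterator whose remaining elements are cleaned up on close().
    interface SimpleCloseableIterator<T> extends Iterator<T>, AutoCloseable {

        static <T> SimpleCloseableIterator<T> fromList(List<T> list, Consumer<T> recycler) {
            Iterator<T> it = list.iterator();
            return new SimpleCloseableIterator<T>() {
                @Override public boolean hasNext() { return it.hasNext(); }
                @Override public T next() { return it.next(); }
                @Override public void close() {
                    // Release whatever was not consumed, e.g. reference-counted buffers.
                    it.forEachRemaining(recycler);
                }
            };
        }
    }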
[flink] branch release-1.11 updated: [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new e604784 [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh e604784 is described below commit e60478429fe09df9126022eca0a756203a3dc17b Author: Till Rohrmann AuthorDate: Fri Jun 5 16:29:31 2020 +0200 [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh This closes #12504. --- flink-end-to-end-tests/test-scripts/test_ha_datastream.sh| 2 +- .../test-scripts/test_ha_per_job_cluster_datastream.sh | 9 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh index dd9b51e..bbe0ac8 100755 --- a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh +++ b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh @@ -57,7 +57,7 @@ function run_ha_test() { start_local_zk start_cluster -echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}." +echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, incremSnapshots=${INCREM} and zk=${ZOOKEEPER_VERSION}." # submit a job in detached mode and let it run local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \ diff --git a/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh index 13a1522..58b3e6f 100755 --- a/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh +++ b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh @@ -99,6 +99,7 @@ function run_ha_test() { local BACKEND=$2 local ASYNC=$3 local INCREM=$4 +local ZOOKEEPER_VERSION=$5 local JM_KILLS=3 @@ -114,9 +115,10 @@ function run_ha_test() { # jm killing loop set_config_key "env.pid.dir" "${TEST_DATA_DIR}" +setup_flink_shaded_zookeeper ${ZOOKEEPER_VERSION} start_local_zk -echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}." +echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, incremSnapshots=${INCREM} and zk=${ZOOKEEPER_VERSION}." # submit a job in detached mode and let it run run_job ${PARALLELISM} ${BACKEND} ${ASYNC} ${INCREM} @@ -152,6 +154,7 @@ function run_ha_test() { STATE_BACKEND_TYPE=${1:-file} STATE_BACKEND_FILE_ASYNC=${2:-true} STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false} +ZOOKEEPER_VERSION=${4:-3.4} function kill_test_watchdog() { local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid` @@ -169,7 +172,5 @@ on_exit kill_test_watchdog kill "$cmdpid") & watchdog_pid=$! echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid -run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL} +run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL} ${ZOOKEEPER_VERSION} ) - -
[flink] branch master updated (a4a99ba -> 44af789)
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from a4a99ba  [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
     add 44af789  [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh

No new revisions were added by this update.

Summary of changes:
 flink-end-to-end-tests/test-scripts/test_ha_datastream.sh | 2 +-
 .../test-scripts/test_ha_per_job_cluster_datastream.sh    | 7 +--
 2 files changed, 6 insertions(+), 3 deletions(-)
[flink] branch release-1.11 updated: [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 0965b47  [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
0965b47 is described below

commit 0965b47f5b55ce64c5ed5687c37d4b740aceec36
Author: wangyang0918
AuthorDate: Fri Jun 5 16:33:37 2020 +0800

    [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map

    DeploymentOptionsInternal#CONF_DIR is an internal option that stores the
    client-side config path. It should not be added to the config map or used by
    the JobManager pod; KubernetesConfigOptions#FLINK_CONF_DIR will be used instead.

    This closes #12501.
---
 .../decorators/FlinkConfMountDecorator.java        | 11 -
 .../parameters/AbstractKubernetesParameters.java   |  4 +++-
 .../flink/kubernetes/KubernetesTestUtils.java      | 15 +
 .../decorators/FlinkConfMountDecoratorTest.java    | 26 +++---
 .../AbstractKubernetesParametersTest.java          | 16 +
 5 files changed, 58 insertions(+), 14 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..ef24fa3 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -144,11 +145,11 @@ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
 	 * Get properties map for the cluster-side after removal of some keys.
 	 */
 	private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfig) {
-		final Map<String, String> propertiesMap = flinkConfig.toMap();
-
-		// remove kubernetes.config.file
-		propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
-		return propertiesMap;
+		final Configuration clusterSideConfig = flinkConfig.clone();
+		// Remove some configuration options that should not be taken to cluster side.
+		clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+		clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+		return clusterSideConfig.toMap();
 	}
 
 	@VisibleForTesting
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..85a33d6 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -56,7 +56,9 @@ public abstract class AbstractKubernetesParameters implements KubernetesParameters {
 
 	@Override
 	public String getConfigDirectory() {
-		final String configDir = flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+		final String configDir = flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
+			flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));
+
 		checkNotNull(configDir);
 		return configDir;
 	}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
index f9dea84..6d1f8e7 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.kubernetes;
 
+import org.apache.flink.configuration.Configuration;
+
 import org.apache.flink.shaded.guava18.com.google.common.io.Files;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -32,4 +36,15 @@ public class
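Read together, the two hunks above implement one policy: the configuration shipped to the JobManager pod is a filtered copy of the client configuration, and the config directory is resolved with an explicit fallback to KubernetesConfigOptions#FLINK_CONF_DIR. The following minimal, self-contained sketch shows the same policy with a plain Map standing in for Flink's Configuration class; the ClusterSideConfigSketch class, the literal keys, and the /opt/flink/conf default are illustrative assumptions, not the commit's exact code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Sketch: strip client-only keys from a copy of the client configuration before
    // shipping it to the cluster, and resolve the conf directory with a fallback.
    public class ClusterSideConfigSketch {

        // Illustrative stand-ins for KubernetesConfigOptions.KUBE_CONFIG_FILE,
        // DeploymentOptionsInternal.CONF_DIR and KubernetesConfigOptions.FLINK_CONF_DIR.
        static final String KUBE_CONFIG_FILE = "kubernetes.config.file";
        static final String INTERNAL_CONF_DIR = "$internal.deployment.config-dir";
        static final String FLINK_CONF_DIR = "kubernetes.flink.conf.dir";

        // Work on a copy so the client keeps its own, unfiltered view.
        static Map<String, String> getClusterSideProperties(Map<String, String> clientConfig) {
            Map<String, String> clusterSide = new HashMap<>(clientConfig);
            clusterSide.remove(KUBE_CONFIG_FILE);   // client-only: kubeconfig path
            clusterSide.remove(INTERNAL_CONF_DIR);  // client-only: local conf directory
            return clusterSide;
        }

        // Prefer the internal client-side path; otherwise fall back to the
        // cluster-facing option, mirroring getOptional(...).orElse(...) in the diff.
        static String getConfigDirectory(Map<String, String> clientConfig) {
            return Optional.ofNullable(clientConfig.get(INTERNAL_CONF_DIR))
                    .orElse(clientConfig.getOrDefault(FLINK_CONF_DIR, "/opt/flink/conf"));
        }

        public static void main(String[] args) {
            Map<String, String> client = new HashMap<>();
            client.put(KUBE_CONFIG_FILE, "/home/user/.kube/config");
            client.put(INTERNAL_CONF_DIR, "/home/user/flink-1.11/conf");
            client.put("parallelism.default", "4");

            System.out.println(getClusterSideProperties(client)); // only parallelism.default survives
            System.out.println(getConfigDirectory(client));       // /home/user/flink-1.11/conf
        }
    }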
[flink] branch master updated: [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new a4a99ba  [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
a4a99ba is described below

commit a4a99bac919d57387d54a2db80a249becf3ba680
Author: wangyang0918
AuthorDate: Fri Jun 5 16:33:37 2020 +0800

    [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map

    DeploymentOptionsInternal#CONF_DIR is an internal option that stores the
    client-side config path. It should not be added to the config map or used by
    the JobManager pod; KubernetesConfigOptions#FLINK_CONF_DIR will be used instead.

    This closes #12501.
---
 .../decorators/FlinkConfMountDecorator.java        | 11 -
 .../parameters/AbstractKubernetesParameters.java   |  4 +++-
 .../flink/kubernetes/KubernetesTestUtils.java      | 15 +
 .../decorators/FlinkConfMountDecoratorTest.java    | 26 +++---
 .../AbstractKubernetesParametersTest.java          | 16 +
 5 files changed, 58 insertions(+), 14 deletions(-)

(The diff is identical to the release-1.11 commit above.)