[flink] branch release-1.11 updated (c98122c -> ab6cf40)

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c98122c  [FLINK-16559][hive] Add test for avro table
 new 0d9ed90  [FLINK-18056][fs-connector] Hadoop path-based file writer 
adds UUID to in-progress file to avoid conflicts
 new ab6cf40  [FLINK-18130][hive][fs-connector] File name conflict for 
different jobs in filesystem/hive sink ()

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/connectors/hive/HiveTableSink.java   |   6 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java |  75 
 flink-formats/flink-hadoop-bulk/pom.xml|  37 ++
 .../bulk/DefaultHadoopFileCommitterFactory.java|  16 ++-
 .../formats/hadoop/bulk/HadoopFileCommitter.java   |   2 +-
 .../hadoop/bulk/HadoopFileCommitterFactory.java|  18 ++-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  59 ++---
 .../bulk/committer/HadoopRenameFileCommitter.java  |  41 +--
 ...terTest.java => AbstractFileCommitterTest.java} | 132 +
 .../bulk/HadoopPathBasedPartFileWriterTest.java|   6 +-
 .../committer/HadoopRenameCommitterHDFSTest.java   |  97 +++
 .../HadoopRenameCommitterLocalFSTest.java  |  69 +++
 .../bulk/committer/cluster/HDFSCluster.java}   |  40 ---
 .../planner/runtime/FileSystemITCaseBase.scala |  37 ++
 .../stream/sql/StreamFileSystemITCaseBase.scala|   6 +-
 .../table/filesystem/FileSystemTableSink.java  |   8 ++
 .../flink/table/filesystem/PartitionLoader.java|   9 +-
 .../table/filesystem/PartitionTempFileManager.java |   2 +-
 18 files changed, 546 insertions(+), 114 deletions(-)
 rename 
flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/{committer/HadoopRenameFileCommitterTest.java
 => AbstractFileCommitterTest.java} (66%)
 create mode 100644 
flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
 create mode 100644 
flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
 copy 
flink-formats/flink-hadoop-bulk/src/{main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
 => 
test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java}
 (56%)



[flink] 02/02: [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab6cf4083c84bf8c04564c74f6d9f9783adf5e21
Author: Jingsong Lee 
AuthorDate: Tue Jun 9 13:48:42 2020 +0800

[FLINK-18130][hive][fs-connector] File name conflict for different jobs in 
filesystem/hive sink ()


This closes #12485
---
 .../flink/connectors/hive/HiveTableSink.java   |  6 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 75 ++
 .../planner/runtime/FileSystemITCaseBase.scala | 37 +++
 .../stream/sql/StreamFileSystemITCaseBase.scala|  6 +-
 .../table/filesystem/FileSystemTableSink.java  |  8 +++
 .../flink/table/filesystem/PartitionLoader.java|  9 +--
 .../table/filesystem/PartitionTempFileManager.java |  2 +-
 7 files changed, 132 insertions(+), 11 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a4e3012..1cac9e0 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -77,6 +77,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public class HiveTableSink implements 
AppendStreamTableSink, PartitionableTableS
isCompressed);
String extension = Utilities.getFileExtension(jobConf, 
isCompressed,
(HiveOutputFormat) 
hiveOutputFormatClz.newInstance());
-   extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = 
OutputFileConfig.builder()
-   .withPartSuffix(extension).build();
+   .withPartPrefix("part-" + 
UUID.randomUUID().toString())
+   .withPartSuffix(extension == null ? "" 
: extension)
+   .build();
if (isBounded) {
FileSystemOutputFormat.Builder builder = 
new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new 
HiveRowPartitionComputer(
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index a73bce9..7592a33 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -228,6 +231,25 @@ public class HiveTableSinkITCase {
}
}
 
+   @Test
+   public void testBatchAppend() {
+   TableEnvironment tEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+   tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+   tEnv.useCatalog(hiveCatalog.getName());
+   tEnv.executeSql("create database db1");
+   tEnv.useDatabase("db1");
+   try {
+   tEnv.executeSql("create table append_table (i int, j 
int)");
+   TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert 
into append_table select 1, 1");
+   TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert 
into append_table select 2, 2");
+   ArrayList rows = 
Lists.newArrayList(tEnv.executeSql("select * from append_table").collect());
+   rows.sort(Comparator.comparingInt(o -> (int) 
o.getField(0)));
+   

[flink] 01/02: [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d9ed9012286ef2495b1b75277601743e8c9ff6f
Author: Yun Gao 
AuthorDate: Tue Jun 9 13:47:53 2020 +0800

[FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to 
in-progress file to avoid conflicts


This closes #12452
---
 flink-formats/flink-hadoop-bulk/pom.xml|  37 ++
 .../bulk/DefaultHadoopFileCommitterFactory.java|  16 ++-
 .../formats/hadoop/bulk/HadoopFileCommitter.java   |   2 +-
 .../hadoop/bulk/HadoopFileCommitterFactory.java|  18 ++-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  59 ++---
 .../bulk/committer/HadoopRenameFileCommitter.java  |  41 +--
 ...terTest.java => AbstractFileCommitterTest.java} | 132 +
 .../bulk/HadoopPathBasedPartFileWriterTest.java|   6 +-
 .../committer/HadoopRenameCommitterHDFSTest.java   |  97 +++
 .../HadoopRenameCommitterLocalFSTest.java  |  69 +++
 .../bulk/committer/cluster/HDFSCluster.java}   |  40 ---
 11 files changed, 414 insertions(+), 103 deletions(-)

diff --git a/flink-formats/flink-hadoop-bulk/pom.xml 
b/flink-formats/flink-hadoop-bulk/pom.xml
index 371c15a..051cc53 100644
--- a/flink-formats/flink-hadoop-bulk/pom.xml
+++ b/flink-formats/flink-hadoop-bulk/pom.xml
@@ -80,6 +80,43 @@ under the License.
${project.version}
test

+
+   
+   org.apache.hadoop
+   hadoop-hdfs
+   test
+   test-jar
+   ${hadoop.version}
+   
+   
+   log4j
+   log4j
+   
+   
+   
+
+   
+   org.apache.hadoop
+   hadoop-common
+   test
+   test-jar
+   ${hadoop.version}
+   
+   
+   log4j
+   log4j
+   
+   
+   org.slf4j
+   slf4j-log4j12
+   
+   
+   
+   jdk.tools
+   jdk.tools
+   
+   
+   

 

diff --git 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
 
b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
index 01ac88c..00f8f58 100644
--- 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
+++ 
b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+
 /**
  * The default hadoop file committer factory which always use {@link 
HadoopRenameFileCommitter}.
  */
@@ -31,7 +33,19 @@ public class DefaultHadoopFileCommitterFactory implements 
HadoopFileCommitterFac
private static final long serialVersionUID = 1L;
 
@Override
-   public HadoopFileCommitter create(Configuration configuration, Path 
targetFilePath) {
+   public HadoopFileCommitter create(
+   Configuration configuration,
+   Path targetFilePath) throws IOException {
+
return new HadoopRenameFileCommitter(configuration, 
targetFilePath);
}
+
+   @Override
+   public HadoopFileCommitter recoverForCommit(
+   Configuration configuration,
+   Path targetFilePath,
+   Path tempFilePath) throws IOException {
+
+   return new HadoopRenameFileCommitter(configuration, 
targetFilePath, tempFilePath);
+   }
 }
diff --git 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
 
b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
index 7ae0d56..0fdb4f8 100644
--- 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
+++ 

[flink] branch master updated: [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 603cd83  [FLINK-18130][hive][fs-connector] File name conflict for 
different jobs in filesystem/hive sink ()
603cd83 is described below

commit 603cd830073f95d1b48cf8fa6817100ac17e1eb3
Author: Jingsong Lee 
AuthorDate: Tue Jun 9 13:48:42 2020 +0800

[FLINK-18130][hive][fs-connector] File name conflict for different jobs in 
filesystem/hive sink ()


This closes #12485
---
 .../flink/connectors/hive/HiveTableSink.java   |  6 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 75 ++
 .../planner/runtime/FileSystemITCaseBase.scala | 37 +++
 .../stream/sql/StreamFileSystemITCaseBase.scala|  6 +-
 .../table/filesystem/FileSystemTableSink.java  |  8 +++
 .../flink/table/filesystem/PartitionLoader.java|  9 +--
 .../table/filesystem/PartitionTempFileManager.java |  2 +-
 7 files changed, 132 insertions(+), 11 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a4e3012..1cac9e0 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -77,6 +77,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public class HiveTableSink implements 
AppendStreamTableSink, PartitionableTableS
isCompressed);
String extension = Utilities.getFileExtension(jobConf, 
isCompressed,
(HiveOutputFormat) 
hiveOutputFormatClz.newInstance());
-   extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = 
OutputFileConfig.builder()
-   .withPartSuffix(extension).build();
+   .withPartPrefix("part-" + 
UUID.randomUUID().toString())
+   .withPartSuffix(extension == null ? "" 
: extension)
+   .build();
if (isBounded) {
FileSystemOutputFormat.Builder builder = 
new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new 
HiveRowPartitionComputer(
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index a73bce9..7592a33 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -228,6 +231,25 @@ public class HiveTableSinkITCase {
}
}
 
+   @Test
+   public void testBatchAppend() {
+   TableEnvironment tEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+   tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+   tEnv.useCatalog(hiveCatalog.getName());
+   tEnv.executeSql("create database db1");
+   tEnv.useDatabase("db1");
+   try {
+   tEnv.executeSql("create table append_table (i int, j 
int)");
+   TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert 
into append_table select 1, 1");
+   TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert 
into append_table select 2, 2");
+   

[flink] branch master updated (74231f7 -> 45f42f8)

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 74231f7  [FLINK-17902][python] Support the new interfaces about 
temporary functions in PyFlink
 add 45f42f8  [FLINK-18056][fs-connector] Hadoop path-based file writer 
adds UUID to in-progress file to avoid conflicts

No new revisions were added by this update.

Summary of changes:
 flink-formats/flink-hadoop-bulk/pom.xml|  37 ++
 .../bulk/DefaultHadoopFileCommitterFactory.java|  16 ++-
 .../formats/hadoop/bulk/HadoopFileCommitter.java   |   2 +-
 .../hadoop/bulk/HadoopFileCommitterFactory.java|  18 ++-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  59 ++---
 .../bulk/committer/HadoopRenameFileCommitter.java  |  41 +--
 ...terTest.java => AbstractFileCommitterTest.java} | 132 +
 .../bulk/HadoopPathBasedPartFileWriterTest.java|   6 +-
 .../committer/HadoopRenameCommitterHDFSTest.java   |  97 +++
 .../HadoopRenameCommitterLocalFSTest.java  |  69 +++
 .../bulk/committer/cluster/HDFSCluster.java}   |  40 ---
 11 files changed, 414 insertions(+), 103 deletions(-)
 rename 
flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/{committer/HadoopRenameFileCommitterTest.java
 => AbstractFileCommitterTest.java} (66%)
 create mode 100644 
flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
 create mode 100644 
flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
 copy 
flink-formats/flink-hadoop-bulk/src/{main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
 => 
test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java}
 (56%)



[flink] branch master updated (fd9214e -> 74231f7)

2020-06-08 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from fd9214e  [FLINK-16559][hive] Add test for avro table
 add 74231f7  [FLINK-17902][python] Support the new interfaces about 
temporary functions in PyFlink

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/table/table_environment.py| 235 +
 .../table/tests/test_table_environment_api.py  |  34 +++
 flink-python/pyflink/table/tests/test_udf.py   |  14 ++
 3 files changed, 283 insertions(+)



[flink] branch release-1.11 updated: [FLINK-16559][hive] Add test for avro table

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new c98122c  [FLINK-16559][hive] Add test for avro table
c98122c is described below

commit c98122c29ef875d7fbc2b85fc9496bcfa11e0363
Author: Rui Li 
AuthorDate: Tue Jun 9 12:15:55 2020 +0800

[FLINK-16559][hive] Add test for avro table


This closes #12408
---
 .../apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java| 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 2da7d10..367edb9 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -128,11 +128,14 @@ public class TableEnvHiveConnectorITCase {
 
@Test
public void testDifferentFormats() throws Exception {
-   String[] formats = new String[]{"orc", "parquet", 
"sequencefile", "csv"};
+   String[] formats = new String[]{"orc", "parquet", 
"sequencefile", "csv", "avro"};
for (String format : formats) {
if (format.equals("orc") && 
HiveShimLoader.getHiveVersion().startsWith("2.0")) {
// Ignore orc test for Hive version 2.0.x for 
now due to FLINK-13998
continue;
+   } else if (format.equals("avro") && 
!HiveVersionTestUtil.HIVE_110_OR_LATER) {
+   // timestamp is not supported for avro tables 
before 1.1.0
+   continue;
}
readWriteFormat(format);
}



[flink] branch master updated (2be8239 -> fd9214e)

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2be8239  [FLINK-16291][hive] Ban count from HiveModule
 add fd9214e  [FLINK-16559][hive] Add test for avro table

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java| 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)



[flink] branch release-1.11 updated: [FLINK-16291][hive] Ban count from HiveModule

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 513ac5a  [FLINK-16291][hive] Ban count from HiveModule
513ac5a is described below

commit 513ac5ae8d4fc1e679a7faf07a7d020f073a09fc
Author: Rui Li 
AuthorDate: Tue Jun 9 12:03:23 2020 +0800

[FLINK-16291][hive] Ban count from HiveModule


This closes #12382
---
 .../java/org/apache/flink/table/module/hive/HiveModule.java  |  2 +-
 .../org/apache/flink/table/module/hive/HiveModuleTest.java   | 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index e34965b..12fc5f1 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -44,7 +44,7 @@ public class HiveModule implements Module {
// a set of functions that shouldn't be overridden by HiveModule
@VisibleForTesting
static final Set BUILT_IN_FUNC_BLACKLIST = 
Collections.unmodifiableSet(new HashSet<>(
-   Arrays.asList("dense_rank", "first_value", "lag", 
"last_value", "lead", "rank", "row_number",
+   Arrays.asList("count", "dense_rank", "first_value", 
"lag", "last_value", "lead", "rank", "row_number",
"hop", "hop_end", "hop_proctime", 
"hop_rowtime", "hop_start",
"session", "session_end", 
"session_proctime", "session_rowtime", "session_start",
"tumble", "tumble_end", 
"tumble_proctime", "tumble_rowtime", "tumble_start")));
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 65cbda51..374984a 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -62,22 +62,22 @@ public class HiveModuleTest {
 
switch (hiveVersion) {
case HIVE_VERSION_V1_2_0:
-   assertEquals(232, 
hiveModule.listFunctions().size());
+   assertEquals(231, 
hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V2_0_0:
-   assertEquals(236, 
hiveModule.listFunctions().size());
+   assertEquals(235, 
hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V2_1_1:
-   assertEquals(246, 
hiveModule.listFunctions().size());
+   assertEquals(245, 
hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V2_2_0:
-   assertEquals(262, 
hiveModule.listFunctions().size());
+   assertEquals(261, 
hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V2_3_4:
-   assertEquals(280, 
hiveModule.listFunctions().size());
+   assertEquals(279, 
hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V3_1_1:
-   assertEquals(299, 
hiveModule.listFunctions().size());
+   assertEquals(298, 
hiveModule.listFunctions().size());
break;
}
}



[flink] branch master updated (02b915a -> 2be8239)

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 02b915a  [FLINK-17893][sql-client] SQL CLI should print the root cause 
if the statement is invalid
 add 2be8239  [FLINK-16291][hive] Ban count from HiveModule

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/table/module/hive/HiveModule.java  |  2 +-
 .../org/apache/flink/table/module/hive/HiveModuleTest.java   | 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)



[flink] branch release-1.11 updated (882fdda -> c211946)

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 882fdda  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page 
of "Hive Integration" into Chinese
 add c211946  [FLINK-17893][sql-client] SQL CLI should print the root cause 
if the statement is invalid

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/client/cli/CliClient.java   |  11 +-
 .../apache/flink/table/client/cli/CliStrings.java  |   6 -
 .../flink/table/client/cli/SqlCommandParser.java   |  42 +++---
 .../flink/table/client/cli/CliClientTest.java  |  23 
 .../table/client/cli/SqlCommandParserTest.java | 152 +++--
 5 files changed, 137 insertions(+), 97 deletions(-)



[flink] branch master updated (dee868d -> 02b915a)

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from dee868d  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page 
of "Hive Integration" into Chinese
 add 02b915a  [FLINK-17893][sql-client] SQL CLI should print the root cause 
if the statement is invalid

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/client/cli/CliClient.java   |  11 +-
 .../apache/flink/table/client/cli/CliStrings.java  |   6 -
 .../flink/table/client/cli/SqlCommandParser.java   |  42 +++---
 .../flink/table/client/cli/CliClientTest.java  |  23 
 .../table/client/cli/SqlCommandParserTest.java | 152 +++--
 5 files changed, 137 insertions(+), 97 deletions(-)



[flink] branch release-1.11 updated: [FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive Integration" into Chinese

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 882fdda  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page 
of "Hive Integration" into Chinese
882fdda is described below

commit 882fdda3a33dd4bf8fc749e559ba44da7f7df8fd
Author: 20010079 
AuthorDate: Wed Apr 8 10:19:04 2020 +0800

[FLINK-16101][docs-zh][table] Translate "Hive Functions" page of "Hive 
Integration" into Chinese

This closes #11664
---
 docs/dev/table/hive/hive_functions.zh.md | 46 +++-
 1 file changed, 22 insertions(+), 24 deletions(-)

diff --git a/docs/dev/table/hive/hive_functions.zh.md 
b/docs/dev/table/hive/hive_functions.zh.md
index 537d684..c2e46cf 100644
--- a/docs/dev/table/hive/hive_functions.zh.md
+++ b/docs/dev/table/hive/hive_functions.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "Hive functions"
+title: "Hive 函数"
 nav-parent_id: hive_tableapi
 nav-pos: 3
 ---
@@ -22,11 +22,11 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Use Hive Built-in Functions via HiveModule
+## 通过 HiveModule 使用 Hive 内置函数
 
-The `HiveModule` provides Hive built-in functions as Flink system (built-in) 
functions to Flink SQL and Table API users.
+在 Flink SQL 和 Table API 中,可以通过系统内置的 `HiveModule` 来使用 Hive 内置函数,
 
-For detailed information, please refer to [HiveModule]({{ site.baseurl 
}}/dev/table/modules.html#hivemodule).
+详细信息,请参考 [HiveModule]({{ site.baseurl }}/zh/dev/table/modules.html#hivemodule)。
 
 
 
@@ -58,14 +58,14 @@ modules:
 
 
 
-* NOTE that some Hive built-in functions in older versions have [thread safety 
issues](https://issues.apache.org/jira/browse/HIVE-16183).
-We recommend users patch their own Hive to fix them.
+* 请注意旧版本的部分 Hive 
内置函数存在[线程安全问题](https://issues.apache.org/jira/browse/HIVE-16183)。
+我们建议用户及时通过补丁修正 Hive 中的这些问题。
 
-## Hive User Defined Functions
+## Hive 用户自定义函数(User Defined Functions)
 
-Users can use their existing Hive User Defined Functions in Flink.
+在 Flink 中用户可以使用 Hive 里已经存在的 UDF 函数。
 
-Supported UDF types include:
+支持的 UDF 类型包括:
 
 - UDF
 - GenericUDF
@@ -73,24 +73,23 @@ Supported UDF types include:
 - UDAF
 - GenericUDAFResolver2
 
-Upon query planning and execution, Hive's UDF and GenericUDF are automatically 
translated into Flink's ScalarFunction,
-Hive's GenericUDTF is automatically translated into Flink's TableFunction,
-and Hive's UDAF and GenericUDAFResolver2 are translated into Flink's 
AggregateFunction.
+在进行查询规划和执行时,Hive UDF 和 GenericUDF 函数会自动转换成 Flink 中的 ScalarFunction,GenericUDTF 
会被自动转换成 Flink 中的
+ TableFunction,UDAF 和 GenericUDAFResolver2 则转换成 Flink 聚合函数(AggregateFunction).
 
-To use a Hive User Defined Function, user have to
+想要使用 Hive UDF 函数,需要如下几步:
 
-- set a HiveCatalog backed by Hive Metastore that contains that function as 
current catalog of the session
-- include a jar that contains that function in Flink's classpath
-- use Blink planner.
+- 通过 Hive Metastore 将带有 UDF 的 HiveCatalog 设置为当前会话的 catalog 后端。
+- 将带有 UDF 的 jar 包放入 Flink classpath 中,并在代码中引入。
+- 使用 Blink planner。
 
-## Using Hive User Defined Functions
+## 使用 Hive UDF
 
-Assuming we have the following Hive functions registered in Hive Metastore:
+假设我们在 Hive Metastore 中已经注册了下面的 UDF 函数:
 
 
 {% highlight java %}
 /**
- * Test simple udf. Registered under name 'myudf'
+ * 注册为 'myudf' 的简单 UDF 测试类. 
  */
 public class TestHiveSimpleUDF extends UDF {
 
@@ -104,7 +103,7 @@ public class TestHiveSimpleUDF extends UDF {
 }
 
 /**
- * Test generic udf. Registered under name 'mygenericudf'
+ * 注册为 'mygenericudf' 的普通 UDF 测试类
  */
 public class TestHiveGenericUDF extends GenericUDF {
 
@@ -137,7 +136,7 @@ public class TestHiveGenericUDF extends GenericUDF {
 }
 
 /**
- * Test split udtf. Registered under name 'mygenericudtf'
+ * 注册为 'mygenericudtf' 的字符串分割 UDF 测试类
  */
 public class TestHiveUDTF extends GenericUDTF {
 
@@ -172,7 +171,7 @@ public class TestHiveUDTF extends GenericUDTF {
 
 {% endhighlight %}
 
-From Hive CLI, we can see they are registered:
+在 Hive CLI 中,可以查询到已经注册的 UDF 函数:
 
 {% highlight bash %}
 hive> show functions;
@@ -184,8 +183,7 @@ myudtf
 
 {% endhighlight %}
 
-
-Then, users can use them in SQL as:
+此时,用户如果想使用这些 UDF,在 SQL 中就可以这样写:
 
 
 {% highlight bash %}



[flink] branch master updated (926523e -> dee868d)

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 926523e  [FLINK-17625][table-runtime-blink] Fix 
ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
 add dee868d  [FLINK-16101][docs-zh][table] Translate "Hive Functions" page 
of "Hive Integration" into Chinese

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/hive/hive_functions.zh.md | 46 +++-
 1 file changed, 22 insertions(+), 24 deletions(-)



[flink] branch release-1.11 updated: [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 11958b0  [FLINK-17625][table-runtime-blink] Fix 
ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
11958b0 is described below

commit 11958b0dd424c143184ed8fd5a2943049b27a4b4
Author: lsy 
AuthorDate: Tue Jun 9 11:04:59 2020 +0800

[FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in 
AppendOnlyTopNFunction

This closes #12303
---
 .../operators/rank/AppendOnlyTopNFunction.java | 16 +++--
 .../table/runtime/operators/rank/TopNBuffer.java   | 83 +++---
 2 files changed, 68 insertions(+), 31 deletions(-)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
index 6a3cf5a..82e2fae 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
@@ -211,16 +211,22 @@ public class AppendOnlyTopNFunction extends 
AbstractTopNFunction {
if (buffer.getCurrentTopNum() > rankEnd) {
Map.Entry> lastEntry = 
buffer.lastEntry();
RowData lastKey = lastEntry.getKey();
-   List lastList = (List) 
lastEntry.getValue();
+   Collection lastList = lastEntry.getValue();
+   RowData lastElement = buffer.lastElement();
+   int size = lastList.size();
// remove last one
-   RowData lastElement = lastList.remove(lastList.size() - 
1);
-   if (lastList.isEmpty()) {
+   if (size <= 1) {
buffer.removeAll(lastKey);
dataState.remove(lastKey);
} else {
-   dataState.put(lastKey, lastList);
+   buffer.removeLast();
+   // last element has been removed from lastList, 
we have to copy a new collection
+   // for lastList to avoid mutating state values, 
see CopyOnWriteStateMap,
+   // otherwise, the result might be corrupt.
+   // don't need to perform a deep copy, because 
RowData elements will not be updated
+   dataState.put(lastKey, new 
ArrayList<>(lastList));
}
-   if (input.equals(lastElement)) {
+   if (size == 0 || input.equals(lastElement)) {
return;
} else {
// lastElement shouldn't be null
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
index 5d3ec52..3c8f7f5 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.data.RowData;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -94,12 +94,12 @@ class TopNBuffer implements Serializable {
}
 
public void remove(RowData sortKey, RowData value) {
-   Collection list = treeMap.get(sortKey);
-   if (list != null) {
-   if (list.remove(value)) {
+   Collection collection = treeMap.get(sortKey);
+   if (collection != null) {
+   if (collection.remove(value)) {
currentTopNum -= 1;
}
-   if (list.size() == 0) {
+   if (collection.size() == 0) {
treeMap.remove(sortKey);
}
}
@@ -111,9 +111,9 @@ class TopNBuffer implements Serializable {
 * @param sortKey key to remove
 */
void removeAll(RowData sortKey) {
-   Collection list = treeMap.get(sortKey);
-   if (list != null) {
-   

[flink] branch master updated (87d6a76 -> 926523e)

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 87d6a76  [FLINK-18075][hotfix] Use DeserializationSchema instead of 
KeyedDeserializationSchema in DynamicTableSink
 add 926523e  [FLINK-17625][table-runtime-blink] Fix 
ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

No new revisions were added by this update.

Summary of changes:
 .../operators/rank/AppendOnlyTopNFunction.java | 16 +++--
 .../table/runtime/operators/rank/TopNBuffer.java   | 83 +++---
 2 files changed, 68 insertions(+), 31 deletions(-)



[flink] branch release-1.11 updated: [FLINK-17722][python][build system] (followups) Keeps all jars of plugins and bin directory in CachedFiles

2020-06-08 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new aeef207  [FLINK-17722][python][build system] (followups) Keeps all 
jars of plugins and bin directory in CachedFiles
aeef207 is described below

commit aeef20744dc1a867c447e489be8c009bfa7be246
Author: huangxingbo 
AuthorDate: Mon Jun 8 20:30:20 2020 +0800

[FLINK-17722][python][build system] (followups) Keeps all jars of plugins 
and bin directory in CachedFiles

This closes #12528.
---
 tools/azure_controller.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tools/azure_controller.sh b/tools/azure_controller.sh
index 35848c2..0d305b7 100755
--- a/tools/azure_controller.sh
+++ b/tools/azure_controller.sh
@@ -122,6 +122,8 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then
 ! -path 
"$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/lib/*.jar" \
 ! -path 
"$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-python*.jar" \
 ! -path 
"$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-sql-client_*.jar"
 \
+! -path 
"$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/plugins/*.jar" \
+! -path 
"$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/bin/*.jar" \
 ! -path 
"$CACHE_FLINK_DIR/flink-connectors/flink-connector-elasticsearch-base/target/flink-*.jar"
 \
 ! -path 
"$CACHE_FLINK_DIR/flink-connectors/flink-connector-kafka-base/target/flink-*.jar"
 \
 ! -path 
"$CACHE_FLINK_DIR/flink-table/flink-table-planner/target/flink-table-planner*tests.jar"
 | xargs rm -rf



[flink] branch release-1.11 updated (f323a79 -> aeef207)

2020-06-08 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f323a79  [FLINK-18075][hotfix] Use DeserializationSchema instead of 
KeyedDeserializationSchema in DynamicTableSink
 add aeef207  [FLINK-17722][python][build system] (followups) Keeps all 
jars of plugins and bin directory in CachedFiles

No new revisions were added by this update.

Summary of changes:
 tools/azure_controller.sh | 2 ++
 1 file changed, 2 insertions(+)



[flink] branch master updated: [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 87d6a76  [FLINK-18075][hotfix] Use DeserializationSchema instead of 
KeyedDeserializationSchema in DynamicTableSink
87d6a76 is described below

commit 87d6a76923511f6b47ea2d5d2bba365e21cd3b96
Author: Dawid Wysakowicz 
AuthorDate: Mon Jun 8 20:02:24 2020 +0200

[FLINK-18075][hotfix] Use DeserializationSchema instead of 
KeyedDeserializationSchema in DynamicTableSink
---
 .../org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java | 3 +--
 .../flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java| 3 +--
 .../org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java| 3 +--
 .../flink/streaming/connectors/kafka/table/KafkaDynamicSink.java   | 3 +--
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index cb42bd7..8abc2a2 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
@@ -57,7 +56,7 @@ public class Kafka011TableSink extends KafkaTableSinkBase {
Optional> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+   serializationSchema,
properties,
partitioner);
}
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index 110dd9a..d632928 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -60,7 +59,7 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase 
{
Optional> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+   serializationSchema,
properties,
partitioner);
}
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 50e4381..861e5d7 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 

[flink] branch release-1.11 updated: [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new f323a79  [FLINK-18075][hotfix] Use DeserializationSchema instead of 
KeyedDeserializationSchema in DynamicTableSink
f323a79 is described below

commit f323a793b9776c56178446ee08445aaf99f8231c
Author: Dawid Wysakowicz 
AuthorDate: Mon Jun 8 20:02:24 2020 +0200

[FLINK-18075][hotfix] Use DeserializationSchema instead of 
KeyedDeserializationSchema in DynamicTableSink
---
 .../org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java | 3 +--
 .../flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java| 3 +--
 .../org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java| 3 +--
 .../flink/streaming/connectors/kafka/table/KafkaDynamicSink.java   | 3 +--
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index cb42bd7..8abc2a2 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
@@ -57,7 +56,7 @@ public class Kafka011TableSink extends KafkaTableSinkBase {
Optional> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+   serializationSchema,
properties,
partitioner);
}
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index 110dd9a..d632928 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -60,7 +59,7 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase 
{
Optional> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+   serializationSchema,
properties,
partitioner);
}
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 50e4381..861e5d7 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 

[flink] branch master updated: [FLINK-17260] Embellish assert output of StreamingKafkaITCase to help debugging

2020-06-08 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 50b6d9d  [FLINK-17260] Embellish assert output of StreamingKafkaITCase 
to help debugging
50b6d9d is described below

commit 50b6d9d5ffbcfbcc20c843e8758a0d2326b44ee1
Author: Aljoscha Krettek 
AuthorDate: Mon Jun 8 19:03:42 2020 +0200

[FLINK-17260] Embellish assert output of StreamingKafkaITCase to help 
debugging
---
 .../org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java   | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index 5e64159..00d050b 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -163,10 +163,10 @@ public class StreamingKafkaITCase extends TestLogger {
final List bees = 
filterMessages(messages, "bee");
final List giraffes = 
filterMessages(messages, "giraffe");
 
-   
Assert.assertEquals(Arrays.asList("elephant,27,64213"), elephants);
-   
Assert.assertEquals(Arrays.asList("squirrel,52,66413"), squirrels);
-   
Assert.assertEquals(Arrays.asList("bee,18,65647"), bees);
-   
Assert.assertEquals(Arrays.asList("giraffe,9,6"), giraffes);
+   Assert.assertEquals(String.format("Messages 
from Kafka %s: %s", kafkaVersion, messages), 
Arrays.asList("elephant,27,64213"), elephants);
+   Assert.assertEquals(String.format("Messages 
from Kafka %s: %s", kafkaVersion, messages), 
Arrays.asList("squirrel,52,66413"), squirrels);
+   Assert.assertEquals(String.format("Messages 
from Kafka %s: %s", kafkaVersion, messages), Arrays.asList("bee,18,65647"), 
bees);
+   Assert.assertEquals(String.format("Messages 
from Kafka %s: %s", kafkaVersion, messages), Arrays.asList("giraffe,9,6"), 
giraffes);
}
}
}



[flink] branch master updated (74fd291 -> 0e8a53b)

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 74fd291  [FLINK-18075] Wrap the SerializationSchema in 
KafkaSerializationSchema in Kafka connector
 add 0e8a53b  [FLINK-13782][table-api] Implement type strategies for IF 
ELSE expression

No new revisions were added by this update.

Summary of changes:
 .../functions/BuiltInFunctionDefinitions.java  |  11 +-
 .../table/types/inference/InputTypeStrategies.java |  24 +-
 ...eStrategy.java => CommonInputTypeStrategy.java} |  73 --
 .../strategies/SubsequenceInputTypeStrategy.java   | 256 +
 .../types/inference/InputTypeStrategiesTest.java   |   3 +-
 .../SubsequenceInputTypeStrategyTest.java  | 143 
 .../expressions/PlannerExpressionConverter.scala   |   4 -
 .../flink/table/planner/expressions/logic.scala|  51 
 8 files changed, 487 insertions(+), 78 deletions(-)
 rename 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{ArrayInputTypeStrategy.java
 => CommonInputTypeStrategy.java} (52%)
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java
 create mode 100644 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/SubsequenceInputTypeStrategyTest.java
 delete mode 100644 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala



[flink] branch master updated (74fd291 -> 0e8a53b)

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 74fd291  [FLINK-18075] Wrap the SerializationSchema in 
KafkaSerializationSchema in Kafka connector
 add 0e8a53b  [FLINK-13782][table-api] Implement type strategies for IF 
ELSE expression

No new revisions were added by this update.

Summary of changes:
 .../functions/BuiltInFunctionDefinitions.java  |  11 +-
 .../table/types/inference/InputTypeStrategies.java |  24 +-
 ...eStrategy.java => CommonInputTypeStrategy.java} |  73 --
 .../strategies/SubsequenceInputTypeStrategy.java   | 256 +
 .../types/inference/InputTypeStrategiesTest.java   |   3 +-
 .../SubsequenceInputTypeStrategyTest.java  | 143 
 .../expressions/PlannerExpressionConverter.scala   |   4 -
 .../flink/table/planner/expressions/logic.scala|  51 
 8 files changed, 487 insertions(+), 78 deletions(-)
 rename 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{ArrayInputTypeStrategy.java
 => CommonInputTypeStrategy.java} (52%)
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java
 create mode 100644 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/SubsequenceInputTypeStrategyTest.java
 delete mode 100644 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala



[flink] branch release-1.11 updated (5399e03 -> 663e9f3)

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5399e03  [FLINK-17635][docs][table] Add documentation about view 
support
 new 8d9e3a0  [FLINK-18075] Remove deprecation of Kafka producer ctor that 
take SerializationSchema
 new 663e9f3  [FLINK-18075] Wrap the SerializationSchema in 
KafkaSerializationSchema in Kafka connector

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +
 .../connectors/kafka/KafkaConsumerTestBase.java|  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java|  11 +-
 .../kafka/KafkaShortRetentionTestBase.java |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java |  24 -
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java   | 108 +++
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 -
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 534 insertions(+), 83 deletions(-)
 create mode 100644 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
 create mode 100644 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java



[flink] 01/02: [FLINK-18075] Remove deprecation of Kafka producer ctor that take SerializationSchema

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d9e3a034963f00f657a0f6dc33b660c663f391f
Author: Dawid Wysakowicz 
AuthorDate: Thu Jun 4 13:22:56 2020 +0200

[FLINK-18075] Remove deprecation of Kafka producer ctor that take
SerializationSchema

SerializationSchema is an important interface that is widely spread and
used in other components such as Table API. It is also the most common
interface for reusable interfaces. Therefore we should support it long
term in our connectors. This commit removes the deprecation of ctors
that take this interface.

Moreover it adds the most general ctor that takes all producer
configuration options along with SerializationSchema. This makes it
feature equivalent with KafkaSerializationSchema in respect to
configuration of the producer.
---
 .../connectors/kafka/FlinkKafkaProducer.java   | 84 +++---
 1 file changed, 57 insertions(+), 27 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 25b359c..3bda1fe 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -288,16 +288,9 @@ public class FlinkKafkaProducer
 *  ID of the Kafka topic.
 * @param serializationSchema
 *  User defined (keyless) serialization schema.
-*
-* @deprecated use {@link #FlinkKafkaProducer(String, 
KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 */
-   @Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, 
SerializationSchema serializationSchema) {
-   this(
-   topicId,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
-   getPropertiesFromBrokerList(brokerList),
-   Optional.of(new FlinkFixedPartitioner()));
+   this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList));
}
 
/**
@@ -318,16 +311,41 @@ public class FlinkKafkaProducer
 *  User defined key-less serialization schema.
 * @param producerConfig
 *  Properties with the producer configuration.
-*
-* @deprecated use {@link #FlinkKafkaProducer(String, 
KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 */
-   @Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema 
serializationSchema, Properties producerConfig) {
+   this(topicId, serializationSchema, producerConfig, 
Optional.of(new FlinkFixedPartitioner<>()));
+   }
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
+* the topic. It accepts a key-less {@link SerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+*
+* Since a key-less {@link SerializationSchema} is used, all records 
sent to Kafka will not have an
+* attached key. Therefore, if a partitioner is also not provided, 
records will be distributed to Kafka
+* partitions in a round-robin fashion.
+*
+* @param topicId
+*  The topic to write data to
+* @param serializationSchema
+*  A key-less serializable serialization schema for turning 
user objects into a kafka-consumable byte[]
+* @param producerConfig
+*  Configuration properties for the KafkaProducer. 
'bootstrap.servers.' is the only required argument.
+* @param customPartitioner
+*  A serializable partitioner for assigning messages to Kafka 
partitions. If a partitioner is not
+*  provided, records will be distributed to Kafka partitions 
in a round-robin fashion.
+*/
+   public FlinkKafkaProducer(
+   String topicId,
+   SerializationSchema serializationSchema,
+   Properties producerConfig,
+   Optional> customPartitioner) {
this(
topicId,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+   serializationSchema,
producerConfig,
-   Optional.of(new FlinkFixedPartitioner()));
+   customPartitioner.orElse(null),
+  

[flink] 02/02: [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 663e9f3f0e903c9e20a9e786dd56364f6482b8f2
Author: Dawid Wysakowicz 
AuthorDate: Thu Jun 4 13:17:27 2020 +0200

[FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in 
Kafka connector
---
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +
 .../connectors/kafka/KafkaConsumerTestBase.java|  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java|  11 +-
 .../kafka/KafkaShortRetentionTestBase.java |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java |  24 -
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java   |  28 ++---
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 -
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 479 insertions(+), 58 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 000..2bd54f1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer010}.
+ */
+public class FlinkKafkaProducerTest {
+   @Test
+   public void testOpenProducer() throws Exception {
+
+   OpenTestingSerializationSchema schema = new 
OpenTestingSerializationSchema();
+   FlinkKafkaProducer010 kafkaProducer = new 
FlinkKafkaProducer010<>(
+   "localhost:9092",
+   "test-topic",
+   schema
+   );
+
+   OneInputStreamOperatorTestHarness testHarness 
= new OneInputStreamOperatorTestHarness<>(
+   new StreamSink<>(kafkaProducer),
+   1,
+   1,
+   0,
+   IntSerializer.INSTANCE,
+   new OperatorID(1, 1));
+
+   testHarness.open();
+
+   assertThat(schema.openCalled, equalTo(true));
+   }
+
+   private static class OpenTestingSerializationSchema implements 
SerializationSchema {
+   private boolean openCalled;
+
+   @Override
+   public void open(InitializationContext context) throws 
Exception {
+   openCalled = true;
+   }
+
+   @Override
+   public byte[] serialize(Integer element) {
+   return new byte[0];
+   }
+   }
+}
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9ae751b..33abb05 100644
--- 

[flink] 01/02: [FLINK-18075] Remove deprecation of Kafka producer ctor that take SerializationSchema

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bebe50346d9485fa2d962c5ed1d00da2778c8feb
Author: Dawid Wysakowicz 
AuthorDate: Thu Jun 4 13:22:56 2020 +0200

[FLINK-18075] Remove deprecation of Kafka producer ctor that take
SerializationSchema

SerializationSchema is an important interface that is widely spread and
used in other components such as Table API. It is also the most common
interface for reusable interfaces. Therefore we should support it long
term in our connectors. This commit removes the deprecation of ctors
that take this interface.

Moreover it adds the most general ctor that takes all producer
configuration options along with SerializationSchema. This makes it
feature equivalent with KafkaSerializationSchema in respect to
configuration of the producer.
---
 .../connectors/kafka/FlinkKafkaProducer.java   | 84 +++---
 1 file changed, 57 insertions(+), 27 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 25b359c..3bda1fe 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -288,16 +288,9 @@ public class FlinkKafkaProducer
 *  ID of the Kafka topic.
 * @param serializationSchema
 *  User defined (keyless) serialization schema.
-*
-* @deprecated use {@link #FlinkKafkaProducer(String, 
KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 */
-   @Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, 
SerializationSchema serializationSchema) {
-   this(
-   topicId,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
-   getPropertiesFromBrokerList(brokerList),
-   Optional.of(new FlinkFixedPartitioner()));
+   this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList));
}
 
/**
@@ -318,16 +311,41 @@ public class FlinkKafkaProducer
 *  User defined key-less serialization schema.
 * @param producerConfig
 *  Properties with the producer configuration.
-*
-* @deprecated use {@link #FlinkKafkaProducer(String, 
KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 */
-   @Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema 
serializationSchema, Properties producerConfig) {
+   this(topicId, serializationSchema, producerConfig, 
Optional.of(new FlinkFixedPartitioner<>()));
+   }
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
+* the topic. It accepts a key-less {@link SerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+*
+* Since a key-less {@link SerializationSchema} is used, all records 
sent to Kafka will not have an
+* attached key. Therefore, if a partitioner is also not provided, 
records will be distributed to Kafka
+* partitions in a round-robin fashion.
+*
+* @param topicId
+*  The topic to write data to
+* @param serializationSchema
+*  A key-less serializable serialization schema for turning 
user objects into a kafka-consumable byte[]
+* @param producerConfig
+*  Configuration properties for the KafkaProducer. 
'bootstrap.servers.' is the only required argument.
+* @param customPartitioner
+*  A serializable partitioner for assigning messages to Kafka 
partitions. If a partitioner is not
+*  provided, records will be distributed to Kafka partitions 
in a round-robin fashion.
+*/
+   public FlinkKafkaProducer(
+   String topicId,
+   SerializationSchema serializationSchema,
+   Properties producerConfig,
+   Optional> customPartitioner) {
this(
topicId,
-   new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+   serializationSchema,
producerConfig,
-   Optional.of(new FlinkFixedPartitioner()));
+   customPartitioner.orElse(null),
+

[flink] branch master updated (0358292 -> 74fd291)

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0358292  [FLINK-16572][e2e][pubsub] Acknowledge message in previous 
test
 new bebe503  [FLINK-18075] Remove deprecation of Kafka producer ctor that 
take SerializationSchema
 new 74fd291  [FLINK-18075] Wrap the SerializationSchema in 
KafkaSerializationSchema in Kafka connector

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +
 .../connectors/kafka/KafkaConsumerTestBase.java|  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java|  11 +-
 .../kafka/KafkaShortRetentionTestBase.java |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java |  24 -
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java   | 108 +++
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 -
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 534 insertions(+), 83 deletions(-)
 create mode 100644 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
 create mode 100644 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java



[flink] 02/02: [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector

2020-06-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74fd2912d146cc2db103825f0aee2282a99b5774
Author: Dawid Wysakowicz 
AuthorDate: Thu Jun 4 13:17:27 2020 +0200

[FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in 
Kafka connector
---
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +
 .../connectors/kafka/KafkaConsumerTestBase.java|  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java|  11 +-
 .../kafka/KafkaShortRetentionTestBase.java |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java |  24 -
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java   |  28 ++---
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 -
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 479 insertions(+), 58 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 000..2bd54f1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer010}.
+ */
+public class FlinkKafkaProducerTest {
+   @Test
+   public void testOpenProducer() throws Exception {
+
+   OpenTestingSerializationSchema schema = new 
OpenTestingSerializationSchema();
+   FlinkKafkaProducer010 kafkaProducer = new 
FlinkKafkaProducer010<>(
+   "localhost:9092",
+   "test-topic",
+   schema
+   );
+
+   OneInputStreamOperatorTestHarness testHarness 
= new OneInputStreamOperatorTestHarness<>(
+   new StreamSink<>(kafkaProducer),
+   1,
+   1,
+   0,
+   IntSerializer.INSTANCE,
+   new OperatorID(1, 1));
+
+   testHarness.open();
+
+   assertThat(schema.openCalled, equalTo(true));
+   }
+
+   private static class OpenTestingSerializationSchema implements 
SerializationSchema {
+   private boolean openCalled;
+
+   @Override
+   public void open(InitializationContext context) throws 
Exception {
+   openCalled = true;
+   }
+
+   @Override
+   public byte[] serialize(Integer element) {
+   return new byte[0];
+   }
+   }
+}
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9ae751b..33abb05 100644
--- 

[flink] branch master updated (3952bfa -> 0358292)

2020-06-08 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3952bfa  [FLINK-17635][docs][table] Add documentation about view 
support
 add a08061b  [hotfix][pubsub] Use TestLogger
 add 0358292  [FLINK-16572][e2e][pubsub] Acknowledge message in previous 
test

No new revisions were added by this update.

Summary of changes:
 .../pom.xml|  7 ++-
 .../gcp/pubsub/CheckPubSubEmulatorTest.java| 23 +-
 .../gcp/pubsub/emulator/GCloudUnitTestBase.java|  4 +++-
 3 files changed, 18 insertions(+), 16 deletions(-)



[flink] branch master updated (3952bfa -> 0358292)

2020-06-08 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3952bfa  [FLINK-17635][docs][table] Add documentation about view 
support
 add a08061b  [hotfix][pubsub] Use TestLogger
 add 0358292  [FLINK-16572][e2e][pubsub] Acknowledge message in previous 
test

No new revisions were added by this update.

Summary of changes:
 .../pom.xml|  7 ++-
 .../gcp/pubsub/CheckPubSubEmulatorTest.java| 23 +-
 .../gcp/pubsub/emulator/GCloudUnitTestBase.java|  4 +++-
 3 files changed, 18 insertions(+), 16 deletions(-)



[flink-training] branch master updated (22420e1 -> 163558d)

2020-06-08 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git.


from 22420e1  [hotfix] use correct main class references
 new b7fecd4  [hotfix][training] git-ignore eclipse build files
 new 163558d  [FLINK-18178][build] fix Eclipse import not using 
flinkShadowJar dependencies

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore   | 10 --
 build.gradle |  7 +++
 2 files changed, 15 insertions(+), 2 deletions(-)



[flink-training] 02/02: [FLINK-18178][build] fix Eclipse import not using flinkShadowJar dependencies

2020-06-08 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 163558d39bc2c15796d180dc5ae8a7943d608679
Author: Nico Kruber 
AuthorDate: Mon Jun 8 14:05:56 2020 +0200

[FLINK-18178][build] fix Eclipse import not using flinkShadowJar 
dependencies
---
 build.gradle | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/build.gradle b/build.gradle
index 5e54c95..5aae798 100644
--- a/build.gradle
+++ b/build.gradle
@@ -27,6 +27,7 @@ subprojects {
 }
 apply plugin: 'com.github.johnrengelman.shadow'
 apply plugin: 'checkstyle'
+apply plugin: 'eclipse'
 
 // artifact properties
 group = 'org.apache.flink'
@@ -113,6 +114,12 @@ subprojects {
 javadoc.classpath += configurations.flinkShadowJar
 }
 
+eclipse {
+classpath {
+plusConfigurations += [configurations.flinkShadowJar]
+}
+}
+
 if (plugins.findPlugin('application')) {
 applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
 run.classpath = sourceSets.main.runtimeClasspath



[flink-training] 01/02: [hotfix][training] git-ignore eclipse build files

2020-06-08 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit b7fecd43861cbb7ef15c55f5887cc121f4b18acc
Author: Nico Kruber 
AuthorDate: Mon Jun 8 12:33:37 2020 +0200

[hotfix][training] git-ignore eclipse build files
---
 .gitignore | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index ba34970..d47fbf8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,9 +10,15 @@
 # Debugger
 .attach_*
 
+# Eclipse
+.project
+.settings
+.classpath
+bin/
+
 # Gradle build process files
-**/.gradle/**/*
-**/build/**/*
+/.gradle/
+build/
 **/.gradletasknamecache
 
 # IntelliJ



[flink-training] branch master updated: [hotfix] use correct main class references

2020-06-08 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
 new 22420e1  [hotfix] use correct main class references
22420e1 is described below

commit 22420e10bc970eea0eb84379fbf68a7d0d0eac78
Author: David Anderson 
AuthorDate: Mon Jun 8 12:31:46 2020 +0200

[hotfix] use correct main class references
---
 hourly-tips/build.gradle  | 2 +-
 long-ride-alerts/build.gradle | 2 +-
 ride-cleansing/build.gradle   | 2 +-
 rides-and-fares/build.gradle  | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/hourly-tips/build.gradle b/hourly-tips/build.gradle
index e0b929e..11df511 100644
--- a/hourly-tips/build.gradle
+++ b/hourly-tips/build.gradle
@@ -1 +1 @@
-mainClassName = 
'org.apache.flink.training.exercises.windows.hourlytips.HourlyTipsExercise'
+mainClassName = 
'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise'
diff --git a/long-ride-alerts/build.gradle b/long-ride-alerts/build.gradle
index 61aa69b..62e4ffd 100644
--- a/long-ride-alerts/build.gradle
+++ b/long-ride-alerts/build.gradle
@@ -1 +1 @@
-mainClassName = 
'org.apache.flink.training.exercises.process.longrides.LongRidesExercise'
+mainClassName = 
'org.apache.flink.training.exercises.longrides.LongRidesExercise'
diff --git a/ride-cleansing/build.gradle b/ride-cleansing/build.gradle
index 2a9d0aa..106e3b9 100644
--- a/ride-cleansing/build.gradle
+++ b/ride-cleansing/build.gradle
@@ -1 +1 @@
-mainClassName = 
'org.apache.flink.training.exercises.basics.ridecleansing.RideCleansingExercise'
+mainClassName = 
'org.apache.flink.training.exercises.ridecleansing.RideCleansingExercise'
diff --git a/rides-and-fares/build.gradle b/rides-and-fares/build.gradle
index 1d013ef..18366af 100644
--- a/rides-and-fares/build.gradle
+++ b/rides-and-fares/build.gradle
@@ -1 +1 @@
-mainClassName = 
'org.apache.flink.training.exercises.state.ridesandfares.RidesAndFaresExercise'
+mainClassName = 
'org.apache.flink.training.exercises.ridesandfares.RidesAndFaresExercise'



[flink] branch release-1.11 updated: [FLINK-17635][docs][table] Add documentation about view support

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 5399e03  [FLINK-17635][docs][table] Add documentation about view 
support
5399e03 is described below

commit 5399e0315698778e579dcf6557f14e74e08dcc2e
Author: Leonard Xu 
AuthorDate: Mon Jun 8 19:52:21 2020 +0800

[FLINK-17635][docs][table] Add documentation about view support

This closes #12502
---
 docs/dev/table/sql/create.md | 20 
 docs/dev/table/sql/create.zh.md  | 20 
 docs/dev/table/sql/drop.md   | 20 
 docs/dev/table/sql/drop.zh.md| 20 
 docs/dev/table/sql/index.md  |  4 ++--
 docs/dev/table/sql/index.zh.md   |  4 ++--
 docs/dev/table/sql/queries.md| 22 +-
 docs/dev/table/sql/queries.zh.md | 22 +-
 8 files changed, 126 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md
index d3e454d..dc799bb 100644
--- a/docs/dev/table/sql/create.md
+++ b/docs/dev/table/sql/create.md
@@ -31,6 +31,7 @@ Flink SQL supports the following CREATE statements for now:
 
 - CREATE TABLE
 - CREATE DATABASE
+- CREATE VIEW
 - CREATE FUNCTION
 
 ## Run a CREATE statement
@@ -351,6 +352,25 @@ The key and value of expression `key1=val1` should both be 
string literal.
 
 {% top %}
 
+## CREATE VIEW
+{% highlight sql %}
+CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
+  [{columnName [, columnName ]* }] [COMMENT view_comment]
+  AS query_expression
+{% endhighlight %}
+
+Create a view with the given query expression. If a view with the same name 
already exists in the catalog, an exception is thrown.
+
+**TEMPORARY**
+
+Create temporary view that has catalog and database namespaces and overrides 
views.
+
+**IF NOT EXISTS**
+
+If the view already exists, nothing happens.
+
+{% top %}
+
 ## CREATE FUNCTION
 {% highlight sql%}
 CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md
index d4f9d79..8348d52 100644
--- a/docs/dev/table/sql/create.zh.md
+++ b/docs/dev/table/sql/create.zh.md
@@ -31,6 +31,7 @@ CREATE 语句用于向当前或指定的 [Catalog]({{ site.baseurl }}/zh/dev/tab
 
 - CREATE TABLE
 - CREATE DATABASE
+- CREATE VIEW
 - CREATE FUNCTION
 
 ## 执行 CREATE 语句
@@ -344,6 +345,25 @@ CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
 
 {% top %}
 
+## CREATE VIEW
+{% highlight sql %}
+CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
+  [{columnName [, columnName ]* }] [COMMENT view_comment]
+  AS query_expression
+{% endhighlight %}
+
+根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常.
+
+**TEMPORARY**
+
+创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。
+
+**IF NOT EXISTS**
+
+若该视图已经存在,则不会进行任何操作。
+
+{% top %}
+
 ## CREATE FUNCTION
 {% highlight sql%}
 CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
diff --git a/docs/dev/table/sql/drop.md b/docs/dev/table/sql/drop.md
index 6f340e8..bcaea0b 100644
--- a/docs/dev/table/sql/drop.md
+++ b/docs/dev/table/sql/drop.md
@@ -31,6 +31,7 @@ Flink SQL supports the following DROP statements for now:
 
 - DROP TABLE
 - DROP DATABASE
+- DROP VIEW
 - DROP FUNCTION
 
 ## Run a DROP statement
@@ -143,6 +144,25 @@ Dropping a non-empty database triggers an exception. 
Enabled by default.
 
 Dropping a non-empty database also drops all associated tables and functions.
 
+## DROP VIEW
+
+{% highlight sql %}
+DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name
+{% endhighlight %}
+
+Drop a view that has catalog and database namespaces. If the view to drop does 
not exist, an exception is thrown.
+
+**TEMPORARY**
+
+Drop temporary view that has catalog and database namespaces.
+
+**IF EXISTS**
+
+If the view does not exist, nothing happens.
+
+**MAINTAIN DEPENDENCIES**
+Flink does not maintain dependencies of view by CASCADE/RESTRICT keywords, the 
current way is producing postpone error message when user tries to use the view 
under the scenarios like the underlying table of view has been dropped.
+
 ## DROP FUNCTION
 
 {% highlight sql%}
diff --git a/docs/dev/table/sql/drop.zh.md b/docs/dev/table/sql/drop.zh.md
index 51ac456..8d1b76d 100644
--- a/docs/dev/table/sql/drop.zh.md
+++ b/docs/dev/table/sql/drop.zh.md
@@ -31,6 +31,7 @@ Flink SQL 目前支持以下 DROP 语句:
 
 - DROP TABLE
 - DROP DATABASE
+- DROP VIEW
 - DROP FUNCTION
 
 ## 执行 DROP 语句
@@ -143,6 +144,25 @@ DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ 
(RESTRICT | CASCADE) ]
 
 删除一个非空数据库时,把相关联的表与函数一并删除。
 
+## DROP VIEW
+
+{% highlight sql %}
+DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name
+{% endhighlight %}
+
+删除一个有 catalog 和数据库命名空间的视图。若需要删除的视图不存在,则会产生异常。
+
+**TEMPORARY**
+
+删除一个有 catalog 和数据库命名空间的临时视图。
+
+**IF EXISTS**
+
+若视图不存在,则不会进行任何操作。
+
+**依赖管理**
+Flink 没有使用 

[flink] branch master updated: [FLINK-17635][docs][table] Add documentation about view support

2020-06-08 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3952bfa  [FLINK-17635][docs][table] Add documentation about view 
support
3952bfa is described below

commit 3952bfa029df4d278a8f694d3e89db9de398006b
Author: Leonard Xu 
AuthorDate: Mon Jun 8 19:52:21 2020 +0800

[FLINK-17635][docs][table] Add documentation about view support

This closes #12502
---
 docs/dev/table/sql/create.md | 20 
 docs/dev/table/sql/create.zh.md  | 20 
 docs/dev/table/sql/drop.md   | 20 
 docs/dev/table/sql/drop.zh.md| 20 
 docs/dev/table/sql/index.md  |  4 ++--
 docs/dev/table/sql/index.zh.md   |  4 ++--
 docs/dev/table/sql/queries.md| 22 +-
 docs/dev/table/sql/queries.zh.md | 22 +-
 8 files changed, 126 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md
index d3e454d..dc799bb 100644
--- a/docs/dev/table/sql/create.md
+++ b/docs/dev/table/sql/create.md
@@ -31,6 +31,7 @@ Flink SQL supports the following CREATE statements for now:
 
 - CREATE TABLE
 - CREATE DATABASE
+- CREATE VIEW
 - CREATE FUNCTION
 
 ## Run a CREATE statement
@@ -351,6 +352,25 @@ The key and value of expression `key1=val1` should both be 
string literal.
 
 {% top %}
 
+## CREATE VIEW
+{% highlight sql %}
+CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
+  [{columnName [, columnName ]* }] [COMMENT view_comment]
+  AS query_expression
+{% endhighlight %}
+
+Create a view with the given query expression. If a view with the same name 
already exists in the catalog, an exception is thrown.
+
+**TEMPORARY**
+
+Create temporary view that has catalog and database namespaces and overrides 
views.
+
+**IF NOT EXISTS**
+
+If the view already exists, nothing happens.
+
+{% top %}
+
 ## CREATE FUNCTION
 {% highlight sql%}
 CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md
index d4f9d79..8348d52 100644
--- a/docs/dev/table/sql/create.zh.md
+++ b/docs/dev/table/sql/create.zh.md
@@ -31,6 +31,7 @@ CREATE 语句用于向当前或指定的 [Catalog]({{ site.baseurl }}/zh/dev/tab
 
 - CREATE TABLE
 - CREATE DATABASE
+- CREATE VIEW
 - CREATE FUNCTION
 
 ## 执行 CREATE 语句
@@ -344,6 +345,25 @@ CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
 
 {% top %}
 
+## CREATE VIEW
+{% highlight sql %}
+CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
+  [{columnName [, columnName ]* }] [COMMENT view_comment]
+  AS query_expression
+{% endhighlight %}
+
+根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常.
+
+**TEMPORARY**
+
+创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。
+
+**IF NOT EXISTS**
+
+若该视图已经存在,则不会进行任何操作。
+
+{% top %}
+
 ## CREATE FUNCTION
 {% highlight sql%}
 CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
diff --git a/docs/dev/table/sql/drop.md b/docs/dev/table/sql/drop.md
index 6f340e8..bcaea0b 100644
--- a/docs/dev/table/sql/drop.md
+++ b/docs/dev/table/sql/drop.md
@@ -31,6 +31,7 @@ Flink SQL supports the following DROP statements for now:
 
 - DROP TABLE
 - DROP DATABASE
+- DROP VIEW
 - DROP FUNCTION
 
 ## Run a DROP statement
@@ -143,6 +144,25 @@ Dropping a non-empty database triggers an exception. 
Enabled by default.
 
 Dropping a non-empty database also drops all associated tables and functions.
 
+## DROP VIEW
+
+{% highlight sql %}
+DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name
+{% endhighlight %}
+
+Drop a view that has catalog and database namespaces. If the view to drop does 
not exist, an exception is thrown.
+
+**TEMPORARY**
+
+Drop temporary view that has catalog and database namespaces.
+
+**IF EXISTS**
+
+If the view does not exist, nothing happens.
+
+**MAINTAIN DEPENDENCIES**
+Flink does not maintain dependencies of view by CASCADE/RESTRICT keywords, the 
current way is producing postpone error message when user tries to use the view 
under the scenarios like the underlying table of view has been dropped.
+
 ## DROP FUNCTION
 
 {% highlight sql%}
diff --git a/docs/dev/table/sql/drop.zh.md b/docs/dev/table/sql/drop.zh.md
index 51ac456..8d1b76d 100644
--- a/docs/dev/table/sql/drop.zh.md
+++ b/docs/dev/table/sql/drop.zh.md
@@ -31,6 +31,7 @@ Flink SQL 目前支持以下 DROP 语句:
 
 - DROP TABLE
 - DROP DATABASE
+- DROP VIEW
 - DROP FUNCTION
 
 ## 执行 DROP 语句
@@ -143,6 +144,25 @@ DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ 
(RESTRICT | CASCADE) ]
 
 删除一个非空数据库时,把相关联的表与函数一并删除。
 
+## DROP VIEW
+
+{% highlight sql %}
+DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name
+{% endhighlight %}
+
+删除一个有 catalog 和数据库命名空间的视图。若需要删除的视图不存在,则会产生异常。
+
+**TEMPORARY**
+
+删除一个有 catalog 和数据库命名空间的临时视图。
+
+**IF EXISTS**
+
+若视图不存在,则不会进行任何操作。
+
+**依赖管理**
+Flink 没有使用 CASCADE / 

[flink] branch master updated (88e416f -> b8c3033)

2020-06-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 88e416f  [FLINK-18046][hive] Decimal column stats not supported for 
Hive table
 add 4a04a71  [FLINK-17287][github] Disable merge commit button
 add b8c3033   [FLINK-17512] Add notification settings to .asf.yaml

No new revisions were added by this update.

Summary of changes:
 .asf.yaml| 10 ++
 tools/releasing/create_source_release.sh |  2 +-
 2 files changed, 11 insertions(+), 1 deletion(-)
 create mode 100644 .asf.yaml



[flink] branch master updated (88e416f -> b8c3033)

2020-06-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 88e416f  [FLINK-18046][hive] Decimal column stats not supported for 
Hive table
 add 4a04a71  [FLINK-17287][github] Disable merge commit button
 add b8c3033   [FLINK-17512] Add notification settings to .asf.yaml

No new revisions were added by this update.

Summary of changes:
 .asf.yaml| 10 ++
 tools/releasing/create_source_release.sh |  2 +-
 2 files changed, 11 insertions(+), 1 deletion(-)
 create mode 100644 .asf.yaml



[flink] branch release-1.11 updated: [FLINK-18046][hive] Decimal column stats not supported for Hive table

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 406b5d2  [FLINK-18046][hive] Decimal column stats not supported for 
Hive table
406b5d2 is described below

commit 406b5d24404c3f72ce84d8308f47aa76246000d3
Author: Rui Li 
AuthorDate: Mon Jun 8 18:41:11 2020 +0800

[FLINK-18046][hive] Decimal column stats not supported for Hive table


This closes #12424
---
 .../table/catalog/hive/util/HiveStatsUtil.java | 51 ++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 10 +++--
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 9fd4394..d4ae6d2 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -31,12 +31,15 @@ import 
org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -171,6 +177,20 @@ public class HiveStatsUtil {
stringStats.isSetAvgColLen() ? 
stringStats.getAvgColLen() : null,
stringStats.isSetNumDVs() ? 
stringStats.getNumDVs() : null,
stringStats.isSetNumDVs() ? 
stringStats.getNumNulls() : null);
+   } else if (stats.isSetDecimalStats()) {
+   DecimalColumnStatsData decimalStats = 
stats.getDecimalStats();
+   // for now, just return 
CatalogColumnStatisticsDataDouble for decimal columns
+   Double max = null;
+   if (decimalStats.isSetHighValue()) {
+   max = 
toHiveDecimal(decimalStats.getHighValue()).doubleValue();
+   }
+   Double min = null;
+   if (decimalStats.isSetLowValue()) {
+   min = 
toHiveDecimal(decimalStats.getLowValue()).doubleValue();
+   }
+   Long ndv = decimalStats.isSetNumDVs() ? 
decimalStats.getNumDVs() : null;
+   Long nullCount = decimalStats.isSetNumNulls() ? 
decimalStats.getNumNulls() : null;
+   return new CatalogColumnStatisticsDataDouble(min, max, 
ndv, nullCount);
} else {
LOG.warn("Flink does not support converting 
ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType);
return null;
@@ -288,11 +308,42 @@ public class HiveStatsUtil {
}
return 
ColumnStatisticsData.binaryStats(hiveBinaryColumnStats);
}
+   } else if (type.equals(LogicalTypeRoot.DECIMAL)) {
+   if (colStat instanceof 
CatalogColumnStatisticsDataDouble) {
+   CatalogColumnStatisticsDataDouble flinkStats = 
(CatalogColumnStatisticsDataDouble) colStat;
+   DecimalColumnStatsData hiveStats = new 
DecimalColumnStatsData();
+   if (flinkStats.getMax() != null) {
+   // in older versions we cannot create 
HiveDecimal from Double, so convert Double to BigDecimal first
+   

[flink] branch master updated: [FLINK-18046][hive] Decimal column stats not supported for Hive table

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 88e416f  [FLINK-18046][hive] Decimal column stats not supported for 
Hive table
88e416f is described below

commit 88e416f988bc286b7cc0df7bcf0679184e7bb805
Author: Rui Li 
AuthorDate: Mon Jun 8 18:41:11 2020 +0800

[FLINK-18046][hive] Decimal column stats not supported for Hive table


This closes #12424
---
 .../table/catalog/hive/util/HiveStatsUtil.java | 51 ++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 10 +++--
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 9fd4394..d4ae6d2 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -31,12 +31,15 @@ import 
org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -171,6 +177,20 @@ public class HiveStatsUtil {
stringStats.isSetAvgColLen() ? 
stringStats.getAvgColLen() : null,
stringStats.isSetNumDVs() ? 
stringStats.getNumDVs() : null,
stringStats.isSetNumDVs() ? 
stringStats.getNumNulls() : null);
+   } else if (stats.isSetDecimalStats()) {
+   DecimalColumnStatsData decimalStats = 
stats.getDecimalStats();
+   // for now, just return 
CatalogColumnStatisticsDataDouble for decimal columns
+   Double max = null;
+   if (decimalStats.isSetHighValue()) {
+   max = 
toHiveDecimal(decimalStats.getHighValue()).doubleValue();
+   }
+   Double min = null;
+   if (decimalStats.isSetLowValue()) {
+   min = 
toHiveDecimal(decimalStats.getLowValue()).doubleValue();
+   }
+   Long ndv = decimalStats.isSetNumDVs() ? 
decimalStats.getNumDVs() : null;
+   Long nullCount = decimalStats.isSetNumNulls() ? 
decimalStats.getNumNulls() : null;
+   return new CatalogColumnStatisticsDataDouble(min, max, 
ndv, nullCount);
} else {
LOG.warn("Flink does not support converting 
ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType);
return null;
@@ -288,11 +308,42 @@ public class HiveStatsUtil {
}
return 
ColumnStatisticsData.binaryStats(hiveBinaryColumnStats);
}
+   } else if (type.equals(LogicalTypeRoot.DECIMAL)) {
+   if (colStat instanceof 
CatalogColumnStatisticsDataDouble) {
+   CatalogColumnStatisticsDataDouble flinkStats = 
(CatalogColumnStatisticsDataDouble) colStat;
+   DecimalColumnStatsData hiveStats = new 
DecimalColumnStatsData();
+   if (flinkStats.getMax() != null) {
+   // in older versions we cannot create 
HiveDecimal from Double, so convert Double to BigDecimal first
+   

[flink] branch master updated (ca89635 -> 88e416f)

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ca89635  [FLINK-17406][doc] Add documentation about dynamic table 
options
 add 88e416f  [FLINK-18046][hive] Decimal column stats not supported for 
Hive table

No new revisions were added by this update.

Summary of changes:
 .../table/catalog/hive/util/HiveStatsUtil.java | 51 ++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 10 +++--
 2 files changed, 58 insertions(+), 3 deletions(-)



[flink] branch master updated (0b7c23e -> ca89635)

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0b7c23e  [FLINK-17776][hive][doc] Add documentation for DDL in 
hive dialect
 add ca89635  [FLINK-17406][doc] Add documentation about dynamic table 
options

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/config.md |  6 +++
 docs/dev/table/config.zh.md  |  6 +++
 docs/dev/table/sql/hints.md  | 88 
 docs/dev/table/sql/hints.zh.md   | 88 
 docs/dev/table/sql/index.md  |  1 +
 docs/dev/table/sql/index.zh.md   |  1 +
 docs/dev/table/sql/queries.md| 11 -
 docs/dev/table/sql/queries.zh.md | 11 -
 8 files changed, 210 insertions(+), 2 deletions(-)
 create mode 100644 docs/dev/table/sql/hints.md
 create mode 100644 docs/dev/table/sql/hints.zh.md



[flink] branch release-1.11 updated: [FLINK-17406][doc] Add documentation about dynamic table options

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 256bd06  [FLINK-17406][doc] Add documentation about dynamic table 
options
256bd06 is described below

commit 256bd06c12ca2cde9c3038f1a66b313313c00e92
Author: Danny Chan 
AuthorDate: Mon Jun 8 16:27:52 2020 +0800

[FLINK-17406][doc] Add documentation about dynamic table options


This closes #12319
---
 docs/dev/table/config.md |  6 +++
 docs/dev/table/config.zh.md  |  6 +++
 docs/dev/table/sql/hints.md  | 88 
 docs/dev/table/sql/hints.zh.md   | 88 
 docs/dev/table/sql/index.md  |  1 +
 docs/dev/table/sql/index.zh.md   |  1 +
 docs/dev/table/sql/queries.md| 11 -
 docs/dev/table/sql/queries.zh.md | 11 -
 8 files changed, 210 insertions(+), 2 deletions(-)

diff --git a/docs/dev/table/config.md b/docs/dev/table/config.md
index dd312d6..68a73ba 100644
--- a/docs/dev/table/config.md
+++ b/docs/dev/table/config.md
@@ -104,3 +104,9 @@ The following options can be used to tune the performance 
of the query execution
 The following options can be used to adjust the behavior of the query 
optimizer to get a better execution plan.
 
 {% include generated/optimizer_config_configuration.html %}
+
+### Table Options
+
+The following options can be used to adjust the behavior of the table planner.
+
+{% include generated/table_config_configuration.html %}
diff --git a/docs/dev/table/config.zh.md b/docs/dev/table/config.zh.md
index 9d325e6..3c41bdc 100644
--- a/docs/dev/table/config.zh.md
+++ b/docs/dev/table/config.zh.md
@@ -96,3 +96,9 @@ configuration.set_string("table.exec.mini-batch.size", 
"5000");
 以下配置可以用于调整查询优化器的行为以获得更好的执行计划。
 
 {% include generated/optimizer_config_configuration.html %}
+
+### Planner 配置
+
+以下配置可以用于调整 planner 的行为。
+
+{% include generated/table_config_configuration.html %}
diff --git a/docs/dev/table/sql/hints.md b/docs/dev/table/sql/hints.md
new file mode 100644
index 000..d8c4c90
--- /dev/null
+++ b/docs/dev/table/sql/hints.md
@@ -0,0 +1,88 @@
+---
+title: "SQL Hints"
+nav-parent_id: sql
+nav-pos: 6
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+SQL hints can be used with SQL statements to alter execution plans. This 
chapter explains how to use hints to force various approaches.
+
+Generally a hint can be used to:
+
+- Enforce planner: there's no perfect planner, so it makes sense to implement 
hints to
+allow user better control the execution;
+- Append meta data(or statistics): some statistics like “table index for scan” 
and
+“skew info of some shuffle keys” are somewhat dynamic for the query, it would 
be very
+convenient to config them with hints because our planning metadata from the 
planner is very
+often not that accurate;
+- Operator resource constraints: for many cases, we would give a default 
resource
+configuration for the execution operators, i.e. min parallelism or
+managed memory (resource consuming UDF) or special resource requirement (GPU 
or SSD disk)
+and so on, it would be very flexible to profile the resource with hints per 
query(instead of the Job).
+
+## Dynamic Table Options
+Dynamic table options allows to specify or override table options dynamically, 
different with static table options defined with SQL DDL or connect API, 
+these options can be specified flexibly in per-table scope within each query.
+
+Thus it is very suitable to use for the ad-hoc queries in interactive 
terminal, for example, in the SQL-CLI,
+you can specify to ignore the parse error for a CSV source just by adding a 
dynamic option `/*+ OPTIONS('csv.ignore-parse-errors'='true') */`.
+
+Note: Dynamic table options default is forbidden to use because it may 
change the semantics of the query.
+You need to set the config option `table.dynamic-table-options.enabled` to be 
`true` explicitly (default is false),
+See the Configuration 
for details on how to set up the config options.
+
+### Syntax
+In order to not break the SQL compatibility, we use the Oracle style SQL hint 
syntax:
+{% highlight sql %}
+table_path /*+ OPTIONS(key=val [, key=val]*) */
+
+key:
+stringLiteral
+val:
+stringLiteral
+
+{% endhighlight %}
+
+### Examples
+
+{% highlight sql %}
+
+CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
+CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
+
+-- override table options in query source
+select id, name from kafka_table1 /*+ 
OPTIONS('scan.startup.mode'='earliest-offset') */;
+
+-- override table options in join
+select * from
+kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
+join
+kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
+on t1.id = t2.id;
+
+-- override table 

[flink] branch release-1.11 updated: [FLINK-17776][hive][doc] Add documentation for DDL in hive dialect

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 11eb83f  [FLINK-17776][hive][doc] Add documentation for DDL in 
hive dialect
11eb83f is described below

commit 11eb83f13d49cf5db02cf0db011a0121912f9331
Author: Rui Li 
AuthorDate: Mon Jun 8 16:21:22 2020 +0800

[FLINK-17776][hive][doc] Add documentation for DDL in hive dialect


This closes #12439
---
 docs/dev/table/hive/hive_dialect.md| 347 +
 docs/dev/table/hive/hive_dialect.zh.md | 347 +
 docs/dev/table/hive/index.md   |   3 +-
 3 files changed, 696 insertions(+), 1 deletion(-)

diff --git a/docs/dev/table/hive/hive_dialect.md 
b/docs/dev/table/hive/hive_dialect.md
new file mode 100644
index 000..3ac1177
--- /dev/null
+++ b/docs/dev/table/hive/hive_dialect.md
@@ -0,0 +1,347 @@
+---
+title: "Hive Dialect"
+nav-parent_id: hive_tableapi
+nav-pos: 1
+---
+
+
+Starting from 1.11.0, Flink allows users to write SQL statements in Hive 
syntax when Hive dialect
+is used. By providing compatibility with Hive syntax, we aim to improve the 
interoperability with
+Hive and reduce the scenarios when users need to switch between Flink and Hive 
in order to execute
+different statements.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Use Hive Dialect
+
+Flink currently supports two SQL dialects: `default` and `hive`. You need to 
switch to Hive dialect
+before you can write in Hive syntax. The following describes how to set 
dialect with
+SQL Client and Table API. Also notice that you can dynamically switch dialect 
for each
+statement you execute. There's no need to restart a session to use a different 
dialect.
+
+### SQL Client
+
+SQL dialect can be specified via the `table.sql-dialect` property. Therefore 
you can set the initial dialect to use in
+the `configuration` section of the yaml file for your SQL Client.
+
+{% highlight yaml %}
+
+execution:
+  planner: blink
+  type: batch
+  result-mode: table
+
+configuration:
+  table.sql-dialect: hive
+  
+{% endhighlight %}
+
+You can also set the dialect after the SQL Client has launched.
+
+{% highlight bash %}
+
+Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
+[INFO] Session property has been set.
+
+Flink SQL> set table.sql-dialect=default; -- to use default dialect
+[INFO] Session property has been set.
+
+{% endhighlight %}
+
+### Table API
+
+You can set dialect for your TableEnvironment with Table API.
+
+{% highlight java %}
+
+EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner()...build();
+TableEnvironment tableEnv = TableEnvironment.create(settings);
+// to use hive dialect
+tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+// to use default dialect
+tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+
+{% endhighlight %}
+
+## DDL
+
+This section lists the supported DDLs with the Hive dialect. We'll mainly 
focus on the syntax
+here. You can refer to [Hive 
doc](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL)
+for the semantics of each DDL statement.
+
+### DATABASE
+
+ Show
+
+{% highlight sql %}
+SHOW DATABASES;
+{% endhighlight %}
+
+ Create
+
+{% highlight sql %}
+CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
+  [COMMENT database_comment]
+  [LOCATION fs_path]
+  [WITH DBPROPERTIES (property_name=property_value, ...)];
+{% endhighlight %}
+
+ Alter
+
+# Update Properties
+
+{% highlight sql %}
+ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES 
(property_name=property_value, ...);
+{% endhighlight %}
+
+# Update Owner
+
+{% highlight sql %}
+ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role;
+{% endhighlight %}
+
+# Update Location
+
+{% highlight sql %}
+ALTER (DATABASE|SCHEMA) database_name SET LOCATION fs_path;
+{% endhighlight %}
+
+ Drop
+
+{% highlight sql %}
+DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];
+{% endhighlight %}
+
+ Use
+
+{% highlight sql %}
+USE database_name;
+{% endhighlight %}
+
+### TABLE
+
+ Show
+
+{% highlight sql %}
+SHOW TABLES;
+{% endhighlight %}
+
+ Create
+
+{% highlight sql %}
+CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
+  [(col_name data_type [column_constraint] [COMMENT col_comment], ... 
[table_constraint])]
+  [COMMENT table_comment]
+  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
+  [
+[ROW FORMAT row_format]
+[STORED AS file_format]
+  ]
+  [LOCATION fs_path]
+  [TBLPROPERTIES (property_name=property_value, ...)]
+  
+row_format:
+  : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS 
TERMINATED BY char]
+  [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
+  [NULL DEFINED AS char]
+  | SERDE 

[flink] branch master updated (f2dd4b8 -> 0b7c23e)

2020-06-08 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f2dd4b8  [FLINK-18050][task][checkpointing] Simplify 
ChannelStateCheckpointWriter interface
 add 0b7c23e  [FLINK-17776][hive][doc] Add documentation for DDL in 
hive dialect

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/hive/hive_dialect.md| 347 +
 docs/dev/table/hive/hive_dialect.zh.md | 347 +
 docs/dev/table/hive/index.md   |   3 +-
 3 files changed, 696 insertions(+), 1 deletion(-)
 create mode 100644 docs/dev/table/hive/hive_dialect.md
 create mode 100644 docs/dev/table/hive/hive_dialect.zh.md



[flink] branch master updated (44af789 -> f2dd4b8)

2020-06-08 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 44af789  [FLINK-16350] Support Zookeeper 3.5 in 
test_ha_per_job_cluster_datastream.sh
 add ed7b0b1  [FLINK-18050][task][checkpointing] Use CloseableIterator to 
write ResultSubpartition state
 add f2dd4b8  [FLINK-18050][task][checkpointing] Simplify 
ChannelStateCheckpointWriter interface

No new revisions were added by this update.

Summary of changes:
 .../channel/ChannelStateCheckpointWriter.java  | 16 +++--
 .../channel/ChannelStateWriteRequest.java  | 22 +--
 .../checkpoint/channel/ChannelStateWriterImpl.java | 23 +---
 ...ChannelStateWriteRequestDispatcherImplTest.java | 69 ++
 .../ChannelStateWriteRequestDispatcherTest.java|  2 +-
 5 files changed, 94 insertions(+), 38 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java



[flink] branch release-1.11 updated: [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh

2020-06-08 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new e604784  [FLINK-16350] Support Zookeeper 3.5 in 
test_ha_per_job_cluster_datastream.sh
e604784 is described below

commit e60478429fe09df9126022eca0a756203a3dc17b
Author: Till Rohrmann 
AuthorDate: Fri Jun 5 16:29:31 2020 +0200

[FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh

This closes #12504.
---
 flink-end-to-end-tests/test-scripts/test_ha_datastream.sh| 2 +-
 .../test-scripts/test_ha_per_job_cluster_datastream.sh   | 9 +
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh 
b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
index dd9b51e..bbe0ac8 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
@@ -57,7 +57,7 @@ function run_ha_test() {
 start_local_zk
 start_cluster
 
-echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, 
asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, 
asyncSnapshots=${ASYNC}, incremSnapshots=${INCREM} and zk=${ZOOKEEPER_VERSION}."
 
 # submit a job in detached mode and let it run
 local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
diff --git 
a/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh 
b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
index 13a1522..58b3e6f 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
@@ -99,6 +99,7 @@ function run_ha_test() {
 local BACKEND=$2
 local ASYNC=$3
 local INCREM=$4
+local ZOOKEEPER_VERSION=$5
 
 local JM_KILLS=3
 
@@ -114,9 +115,10 @@ function run_ha_test() {
 # jm killing loop
 set_config_key "env.pid.dir" "${TEST_DATA_DIR}"
 
+setup_flink_shaded_zookeeper ${ZOOKEEPER_VERSION}
 start_local_zk
 
-echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, 
asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, 
asyncSnapshots=${ASYNC}, incremSnapshots=${INCREM} and zk=${ZOOKEEPER_VERSION}."
 
 # submit a job in detached mode and let it run
 run_job ${PARALLELISM} ${BACKEND} ${ASYNC} ${INCREM}
@@ -152,6 +154,7 @@ function run_ha_test() {
 STATE_BACKEND_TYPE=${1:-file}
 STATE_BACKEND_FILE_ASYNC=${2:-true}
 STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
+ZOOKEEPER_VERSION=${4:-3.4}
 
 function kill_test_watchdog() {
 local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
@@ -169,7 +172,5 @@ on_exit kill_test_watchdog
 kill "$cmdpid") & watchdog_pid=$!
 echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
 
-run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} 
${STATE_BACKEND_ROCKS_INCREMENTAL}
+run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} 
${STATE_BACKEND_ROCKS_INCREMENTAL} ${ZOOKEEPER_VERSION}
 )
-
-



[flink] branch master updated (a4a99ba -> 44af789)

2020-06-08 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a4a99ba  [FLINK-18149][k8s] Do not add 
DeploymentOptionsInternal#CONF_DIR to config map
 add 44af789  [FLINK-16350] Support Zookeeper 3.5 in 
test_ha_per_job_cluster_datastream.sh

No new revisions were added by this update.

Summary of changes:
 flink-end-to-end-tests/test-scripts/test_ha_datastream.sh  | 2 +-
 .../test-scripts/test_ha_per_job_cluster_datastream.sh | 7 +--
 2 files changed, 6 insertions(+), 3 deletions(-)



[flink] branch release-1.11 updated: [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map

2020-06-08 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 0965b47  [FLINK-18149][k8s] Do not add 
DeploymentOptionsInternal#CONF_DIR to config map
0965b47 is described below

commit 0965b47f5b55ce64c5ed5687c37d4b740aceec36
Author: wangyang0918 
AuthorDate: Fri Jun 5 16:33:37 2020 +0800

[FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config 
map

DeploymentOptionsInternal#CONF_DIR is an internal option and stores the 
client config path. It should not be added to config map and used by JobManager 
pod. Instead, KubernetesConfigOptions#FLINK_CONF_DIR will be used.

This closes #12501.
---
 .../decorators/FlinkConfMountDecorator.java| 11 -
 .../parameters/AbstractKubernetesParameters.java   |  4 +++-
 .../flink/kubernetes/KubernetesTestUtils.java  | 15 +
 .../decorators/FlinkConfMountDecoratorTest.java| 26 +++---
 .../AbstractKubernetesParametersTest.java  | 16 +
 5 files changed, 58 insertions(+), 14 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..ef24fa3 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -144,11 +145,11 @@ public class FlinkConfMountDecorator extends 
AbstractKubernetesStepDecorator {
 * Get properties map for the cluster-side after removal of some keys.
 */
private Map getClusterSidePropertiesMap(Configuration 
flinkConfig) {
-   final Map propertiesMap = flinkConfig.toMap();
-
-   // remove kubernetes.config.file
-   
propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
-   return propertiesMap;
+   final Configuration clusterSideConfig = flinkConfig.clone();
+   // Remove some configuration options that should not be taken 
to cluster side.
+   
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+   
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+   return clusterSideConfig.toMap();
}
 
@VisibleForTesting
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..85a33d6 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -56,7 +56,9 @@ public abstract class AbstractKubernetesParameters implements 
KubernetesParamete
 
@Override
public String getConfigDirectory() {
-   final String configDir = 
flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+   final String configDir = 
flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
+   
flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));
+
checkNotNull(configDir);
return configDir;
}
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
index f9dea84..6d1f8e7 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.kubernetes;
 
+import org.apache.flink.configuration.Configuration;
+
 import org.apache.flink.shaded.guava18.com.google.common.io.Files;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -32,4 +36,15 @@ public class 

[flink] branch master updated: [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map

2020-06-08 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a4a99ba  [FLINK-18149][k8s] Do not add 
DeploymentOptionsInternal#CONF_DIR to config map
a4a99ba is described below

commit a4a99bac919d57387d54a2db80a249becf3ba680
Author: wangyang0918 
AuthorDate: Fri Jun 5 16:33:37 2020 +0800

[FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config 
map

DeploymentOptionsInternal#CONF_DIR is an internal option and stores the 
client config path. It should not be added to config map and used by JobManager 
pod. Instead, KubernetesConfigOptions#FLINK_CONF_DIR will be used.

This closes #12501.
---
 .../decorators/FlinkConfMountDecorator.java| 11 -
 .../parameters/AbstractKubernetesParameters.java   |  4 +++-
 .../flink/kubernetes/KubernetesTestUtils.java  | 15 +
 .../decorators/FlinkConfMountDecoratorTest.java| 26 +++---
 .../AbstractKubernetesParametersTest.java  | 16 +
 5 files changed, 58 insertions(+), 14 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..ef24fa3 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -144,11 +145,11 @@ public class FlinkConfMountDecorator extends 
AbstractKubernetesStepDecorator {
 * Get properties map for the cluster-side after removal of some keys.
 */
private Map getClusterSidePropertiesMap(Configuration 
flinkConfig) {
-   final Map propertiesMap = flinkConfig.toMap();
-
-   // remove kubernetes.config.file
-   
propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
-   return propertiesMap;
+   final Configuration clusterSideConfig = flinkConfig.clone();
+   // Remove some configuration options that should not be taken 
to cluster side.
+   
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+   
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+   return clusterSideConfig.toMap();
}
 
@VisibleForTesting
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..85a33d6 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -56,7 +56,9 @@ public abstract class AbstractKubernetesParameters implements 
KubernetesParamete
 
@Override
public String getConfigDirectory() {
-   final String configDir = 
flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+   final String configDir = 
flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
+   
flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));
+
checkNotNull(configDir);
return configDir;
}
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
index f9dea84..6d1f8e7 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.kubernetes;
 
+import org.apache.flink.configuration.Configuration;
+
 import org.apache.flink.shaded.guava18.com.google.common.io.Files;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -32,4 +36,15 @@ public class