[flink-kubernetes-operator] branch main updated: [hotfix] Remove strange configs from helm defaults

2022-09-20 Thread mbalassi
This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new c84f923f [hotfix] Remove strange configs from helm defaults
c84f923f is described below

commit c84f923ff5da82fa7120298e2b2c5fafcfa31ed2
Author: Gyula Fora 
AuthorDate: Wed Sep 21 06:07:10 2022 +0200

[hotfix] Remove strange configs from helm defaults
---
 helm/flink-kubernetes-operator/conf/flink-conf.yaml | 6 --
 1 file changed, 6 deletions(-)

diff --git a/helm/flink-kubernetes-operator/conf/flink-conf.yaml b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
index be6a85c9..b310cd6e 100644
--- a/helm/flink-kubernetes-operator/conf/flink-conf.yaml
+++ b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
@@ -18,12 +18,6 @@
 
 # Flink job/cluster related configs
 taskmanager.numberOfTaskSlots: 2
-blob.server.port: 6124
-jobmanager.rpc.port: 6123
-taskmanager.rpc.port: 6122
-queryable-state.proxy.ports: 6125
-jobmanager.memory.process.size: 1600m
-taskmanager.memory.process.size: 1728m
 parallelism.default: 2
 
 # Flink operator related configs



[flink] branch release-1.16 updated: [hotfix] Make ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized scenarios

2022-09-20 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
 new c963ba6589b [hotfix] Make ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized scenarios
c963ba6589b is described below

commit c963ba6589bf8fb547dd97c5f47d3d881d5cf46b
Author: Jin 
AuthorDate: Tue Aug 30 18:34:30 2022 -0700

[hotfix] Make ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized scenarios

This closes #20786.
---
 .../apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
index 022b2a6e63e..698b32b14ea 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
@@ -45,12 +45,12 @@ public class ParquetProtoWriters {
 // 
 
 /** The builder for Protobuf {@link ParquetWriter}. */
-private static class ParquetProtoWriterBuilder<T extends Message>
+public static class ParquetProtoWriterBuilder<T extends Message>
 extends ParquetWriter.Builder<T, ParquetProtoWriterBuilder<T>> {
 
 private final Class<T> clazz;
 
-protected ParquetProtoWriterBuilder(OutputFile outputFile, Class<T> clazz) {
+public ParquetProtoWriterBuilder(OutputFile outputFile, Class<T> clazz) {
 super(outputFile);
 this.clazz = clazz;
 }
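
With the builder public, downstream code can extend it to customize the resulting writer. A minimal sketch of the kind of subclass this change enables (the subclass name and codec choice are illustrative assumptions, not part of the commit):

```java
import com.google.protobuf.Message;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

// Hypothetical customization: a proto writer builder that pins the codec.
public class SnappyProtoWriterBuilder<T extends Message>
        extends ParquetProtoWriters.ParquetProtoWriterBuilder<T> {

    public SnappyProtoWriterBuilder(OutputFile outputFile, Class<T> clazz) {
        super(outputFile, clazz);
        withCompressionCodec(CompressionCodecName.SNAPPY);
    }
}
```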



[flink] branch master updated (05600f844a9 -> aab13977bea)

2022-09-20 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 05600f844a9 [FLINK-29325][docs] Fix documentation bug on how to enable 
batch mode for streaming examples
 add aab13977bea [hotfix] Make 
ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized 
scenarios

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[flink-table-store] branch master updated: [FLINK-29345] Create reusing reader/writer config in orc format

2022-09-20 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 835632c6 [FLINK-29345] Create reusing reader/writer config in orc format
835632c6 is described below

commit 835632c6e4758ad7d11ccbdb3a8ebb8dfa6aa709
Author: shammon 
AuthorDate: Wed Sep 21 11:37:28 2022 +0800

[FLINK-29345] Create reusing reader/writer config in orc format

This closes #296
---
 .../store/format/orc/OrcBulkWriterFactory.java | 116 +
 .../table/store/format/orc/OrcFileFormat.java  |  35 ---
 .../store/format/orc/OrcBulkWriterFactoryTest.java |  83 +++
 .../table/store/format/orc/OrcFileFormatTest.java  |   9 +-
 4 files changed, 225 insertions(+), 18 deletions(-)

diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
new file mode 100644
index 00000000..b6670392
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.orc.vector.Vectorizer;
+import org.apache.flink.orc.writer.PhysicalWriterImpl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Writer;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Orc {@link BulkWriter.Factory}. The main code is copied from Flink {@code OrcBulkWriterFactory}.
+ */
+public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
+
+private final Vectorizer<T> vectorizer;
+private final OrcFile.WriterOptions writerOptions;
+
+/**
+ * Creates a new OrcBulkWriterFactory using the provided Vectorizer and ORC WriterOptions.
+ *
+ * @param vectorizer The vectorizer implementation to convert input record to a
+ * VectorizedRowBatch.
+ * @param writerOptions ORC WriterOptions.
+ */
+public OrcBulkWriterFactory(Vectorizer<T> vectorizer, OrcFile.WriterOptions writerOptions) {
+this.vectorizer = checkNotNull(vectorizer);
+this.writerOptions = checkNotNull(writerOptions);
+}
+
+@Override
+public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+OrcFile.WriterOptions opts = getWriterOptions();
+opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+
+// The path of the Writer is not used to indicate the destination file
+// in this case since we have used a dedicated physical writer to write
+// to the given output stream directly. However, the path would be used as
+// the key of writer in the ORC memory manager, thus we need to make it unique.
+Path unusedPath = new Path(UUID.randomUUID().toString());
+return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
+}
+}
+
+@VisibleForTesting
+protected OrcFile.WriterOptions getWriterOptions() {
+return writerOptions;
+}
+
+/** Orc {@link BulkWriter}. The main code is copied from Flink {@code OrcBulkWriter}. */
+private static class OrcBulkWriter<T> implements BulkWriter<T> {
+
+private final Writer writer;
+private final Vectorizer<T> vectorizer;
+private final VectorizedRowBatch rowBatch;
+
+OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
+this.vectorizer = checkNotNull(vectorizer);
+this.writer = checkNotNull(writer);
+this.rowBatch = vectorizer.getSchema().createRowBatch();
+
+// Configure 
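
For orientation, a short sketch of how a BulkWriter.Factory like this is typically driven (the vectorizer, stream, and rows are assumed inputs; any Vectorizer<RowData>, such as flink-orc's RowDataVectorizer, would do):

```java
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcFile;

// Illustrative usage only; not part of the commit above.
final class OrcWriteExample {
    static void writeAll(
            Vectorizer<RowData> vectorizer,
            FSDataOutputStream out,
            Iterable<RowData> rows) throws java.io.IOException {
        OrcFile.WriterOptions options = OrcFile.writerOptions(new Configuration());
        BulkWriter<RowData> writer =
                new OrcBulkWriterFactory<>(vectorizer, options).create(out);
        for (RowData row : rows) {
            writer.addElement(row); // buffered into the current VectorizedRowBatch
        }
        writer.flush();  // write out any partially filled batch
        writer.finish(); // close the ORC writer; the stream itself stays open
    }
}
```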

[flink] branch release-1.15 updated: [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples

2022-09-20 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new 9ee1589c425 [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples
9ee1589c425 is described below

commit 9ee1589c42565f47fdae8b82d488e6610bdb7fc6
Author: Jun He 
AuthorDate: Sat Sep 17 10:56:07 2022 +0800

[FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples

This closes #20849.
---
 docs/content.zh/docs/dev/datastream/execution_mode.md | 2 +-
 docs/content/docs/dev/datastream/execution_mode.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/execution_mode.md b/docs/content.zh/docs/dev/datastream/execution_mode.md
index 2cf146a0c53..d243517cbc8 100644
--- a/docs/content.zh/docs/dev/datastream/execution_mode.md
+++ b/docs/content.zh/docs/dev/datastream/execution_mode.md
@@ -60,7 +60,7 @@ Apache Flink 对流处理和批处理统一方法,意味着无论配置何种
 下面是如何通过命令行配置执行模式:
 
 ```bash
-$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
+$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
 ```
 
 这个例子展示了如何在代码中配置执行模式:
diff --git a/docs/content/docs/dev/datastream/execution_mode.md b/docs/content/docs/dev/datastream/execution_mode.md
index 415a82d7f6a..3f903907cdc 100644
--- a/docs/content/docs/dev/datastream/execution_mode.md
+++ b/docs/content/docs/dev/datastream/execution_mode.md
@@ -96,7 +96,7 @@ programmatically when creating/configuring the `StreamExecutionEnvironment`.
 Here's how you can configure the execution mode via the command line:
 
 ```bash
-$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
+$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
 ```
 
 This example shows how you can configure the execution mode in code:
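
For context, the programmatic counterpart of the command above is the standard DataStream API call (hard-coding the mode in the program overrides the command line, which the docs advise against for reusable jobs):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```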



[flink] branch release-1.16 updated: [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples

2022-09-20 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
 new de4aa4b7fee [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples
de4aa4b7fee is described below

commit de4aa4b7fee0f112fa3cfe66d0ad620841e18d74
Author: Jun He 
AuthorDate: Sat Sep 17 10:56:07 2022 +0800

[FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples

This closes #20849.
---
 docs/content.zh/docs/dev/datastream/execution_mode.md | 2 +-
 docs/content/docs/dev/datastream/execution_mode.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/execution_mode.md b/docs/content.zh/docs/dev/datastream/execution_mode.md
index 2cf146a0c53..d243517cbc8 100644
--- a/docs/content.zh/docs/dev/datastream/execution_mode.md
+++ b/docs/content.zh/docs/dev/datastream/execution_mode.md
@@ -60,7 +60,7 @@ Apache Flink 对流处理和批处理统一方法,意味着无论配置何种
 下面是如何通过命令行配置执行模式:
 
 ```bash
-$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
+$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
 ```
 
 这个例子展示了如何在代码中配置执行模式:
diff --git a/docs/content/docs/dev/datastream/execution_mode.md b/docs/content/docs/dev/datastream/execution_mode.md
index 415a82d7f6a..3f903907cdc 100644
--- a/docs/content/docs/dev/datastream/execution_mode.md
+++ b/docs/content/docs/dev/datastream/execution_mode.md
@@ -96,7 +96,7 @@ programmatically when creating/configuring the `StreamExecutionEnvironment`.
 Here's how you can configure the execution mode via the command line:
 
 ```bash
-$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
+$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
 ```
 
 This example shows how you can configure the execution mode in code:



[flink] branch master updated (64c550c67c2 -> 05600f844a9)

2022-09-20 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


 from 64c550c67c2 [FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command
 add 05600f844a9 [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/datastream/execution_mode.md | 2 +-
 docs/content/docs/dev/datastream/execution_mode.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)



[flink-table-store] branch master updated: [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter

2022-09-20 Thread czweng
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 736f936a [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter
736f936a is described below

commit 736f936aa1c8b1ad4f3acae8d52821ed6597f751
Author: tsreaper 
AuthorDate: Wed Sep 21 10:25:22 2022 +0800

[FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter

This closes #295
---
 .../source/FileStoreSourceSplitReaderTest.java |   2 +-
 .../source/TestChangelogDataReadWrite.java |   2 +-
 .../table/store/file/data/AppendOnlyWriter.java| 135 ++--
 .../table/store/file/data/DataFileReader.java  |  56 +--
 .../table/store/file/data/DataFileWriter.java  | 179 -
 .../store/file/{writer => io}/FileWriter.java  |  54 ---
 .../KeyValueDataFileRecordReader.java} |  33 ++--
 .../store/file/io/KeyValueDataFileWriter.java  | 146 +
 .../file/{writer => io}/RollingFileWriter.java |  87 ++
 .../RowDataFileRecordReader.java}  |  14 +-
 .../table/store/file/io/RowDataFileWriter.java |  79 +
 .../store/file/io/RowDataRollingFileWriter.java|  50 ++
 .../table/store/file/io/SingleFileWriter.java  | 152 +
 .../file/io/StatsCollectingSingleFileWriter.java   |  76 +
 .../table/store/file/manifest/ManifestFile.java|  52 ++
 .../store/file/mergetree/MergeTreeWriter.java  |   5 +-
 .../file/operation/AppendOnlyFileStoreRead.java|   4 +-
 .../file/operation/AppendOnlyFileStoreWrite.java   |  11 +-
 .../table/store/file/operation/FileStoreWrite.java |   2 +-
 .../file/operation/KeyValueFileStoreWrite.java |   2 +-
 .../store/file/{writer => utils}/RecordWriter.java |  13 +-
 .../table/store/file/writer/BaseFileWriter.java| 118 --
 .../flink/table/store/file/writer/Metric.java  |  48 --
 .../table/store/file/writer/MetricFileWriter.java  | 179 -
 .../store/table/AppendOnlyFileStoreTable.java  |   2 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   2 +-
 .../table/ChangelogWithKeyFileStoreTable.java  |   2 +-
 .../table/store/table/sink/AbstractTableWrite.java |   2 +-
 .../table/store/table/sink/MemoryTableWrite.java   |   2 +-
 .../flink/table/store/file/TestFileStore.java  |   2 +-
 .../store/file/data/AppendOnlyWriterTest.java  |  12 +-
 .../flink/table/store/file/data/DataFileTest.java  |  11 +-
 .../store/file/format/FileFormatSuffixTest.java|   7 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  14 +-
 34 files changed, 723 insertions(+), 832 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 25211fe6..049b6c85 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 8c7a504b..4145bf54 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -36,8 +36,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import 
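
The summary above is largely renames, but the shape of the refactor is a two-role split. A rough sketch of those roles (illustrative only, under the assumption that one writer type owns a single file and a rolling writer chains them; these are not the actual Table Store classes):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// SingleWriter: owns exactly one output file. RollingWriter: delegates to
// single-file writers and rolls to a fresh file past a size threshold.
interface SingleWriter<T> extends AutoCloseable {
    void write(T record) throws IOException;
    long length() throws IOException; // bytes written to the current file
}

final class RollingWriter<T> implements AutoCloseable {
    private final Supplier<SingleWriter<T>> factory;
    private final long targetFileSize;
    private final List<SingleWriter<T>> finished = new ArrayList<>();
    private SingleWriter<T> current;

    RollingWriter(Supplier<SingleWriter<T>> factory, long targetFileSize) {
        this.factory = factory;
        this.targetFileSize = targetFileSize;
    }

    void write(T record) throws Exception {
        if (current == null) {
            current = factory.get(); // lazily open the next file
        }
        current.write(record);
        if (current.length() >= targetFileSize) {
            current.close(); // seal the full file
            finished.add(current);
            current = null;  // the next write opens a new one
        }
    }

    @Override
    public void close() throws Exception {
        if (current != null) {
            current.close();
            finished.add(current);
        }
    }
}
```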

[flink] branch master updated: [FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 64c550c67c2 [FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command
64c550c67c2 is described below

commit 64c550c67c2d580f369dfaa6ff48e2e6816c6fcd
Author: yuxia Luo 
AuthorDate: Wed Sep 21 09:46:50 2022 +0800

[FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command

This closes #20774
---
 .../delegation/hive/HiveOperationExecutor.java |  7 +--
 .../table/planner/delegation/hive/HiveParser.java  | 18 ++--
 .../delegation/hive/copy/HiveSetProcessor.java | 21 ---
 .../flink/connectors/hive/HiveDialectITCase.java   | 13 ++--
 .../flink-sql-client/src/test/resources/sql/set.q  | 24 ++
 5 files changed, 57 insertions(+), 26 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 597584cc3fd..664ce4e6e35 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -85,7 +85,7 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
 
 catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
 if (!(currentCatalog instanceof HiveCatalog)) {
 throw new FlinkHiveException(
-"Only support SET command when the current catalog is HiveCatalog ing Hive dialect.");
+"Only support SET command when the current catalog is HiveCatalog in Hive dialect.");
 }
 
 HiveConf hiveConf = ((HiveCatalog) currentCatalog).getHiveConf();
@@ -112,7 +112,10 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
 // set key
 String option =
 HiveSetProcessor.getVariable(
-hiveConf, hiveVariables, hiveSetOperation.getKey().get());
+tableConfig.getConfiguration().toMap(),
+hiveConf,
+hiveVariables,
+hiveSetOperation.getKey().get());
 return Optional.of(buildResultForShowVariable(Collections.singletonList(option)));
 } else {
 HiveSetProcessor.setVariable(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 0dd5613a8e3..c8d1630133e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -217,15 +217,16 @@ public class HiveParser extends ParserImpl {
 }
 
 private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+statement = statement.trim();
+if (statement.endsWith(";")) {
+// the command may end with ";" since it won't be removed by Flink SQL CLI,
+// so, we need to remove ";"
+statement = statement.substring(0, statement.length() - 1);
+}
 String[] commandTokens = statement.split("\\s+");
 HiveCommand hiveCommand = HiveCommand.find(commandTokens);
 if (hiveCommand != null) {
 String cmdArgs = statement.substring(commandTokens[0].length()).trim();
-// the command may end with ";" since it won't be removed by Flink SQL CLI,
-// so, we need to remove ";"
-if (cmdArgs.endsWith(";")) {
-cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
-}
 if (hiveCommand == HiveCommand.SET) {
 return Optional.of(processSetCmd(statement, cmdArgs));
 } else if (hiveCommand == HiveCommand.RESET) {
@@ -242,9 +243,14 @@ public class HiveParser extends ParserImpl {
 
 private Operation processSetCmd(String originCmd, String setCmdArgs) {
 if (setCmdArgs.equals("")) {
-return new HiveSetOperation();
+// the command is "set", if we follow Hive's behavior, it will output all configurations
+// including hiveconf, hivevar, env, ... which are too many.
+// So in here, for this case, just delegate to Flink's own behavior
+// which will only output the flink configuration.
+return super.parse(originCmd).get(0);
 }
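
The ordering in the HiveParser hunk matters: the trailing ";" must be stripped before tokenizing, because a bare "set;" otherwise yields the single token "set;", which matches no Hive command and falls through to the SQL parser. A distilled, self-contained illustration of the normalization (not the actual HiveParser code):

```java
import java.util.Arrays;

// Illustrative only: why the ";" strip moved ahead of the split.
public class TrimOrder {
    static String[] tokenize(String statement) {
        statement = statement.trim();
        if (statement.endsWith(";")) {
            // drop the terminator that the SQL CLI leaves attached
            statement = statement.substring(0, statement.length() - 1);
        }
        return statement.split("\\s+");
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(tokenize("set;")));     // [set]
        System.out.println(Arrays.toString(tokenize("set k=v;"))); // [set, k=v]
    }
}
```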

[flink] 04/04: [FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4d855a3c08733afac935d87df6544f0811aef84
Author: luoyuxia 
AuthorDate: Wed Sep 7 10:54:26 2022 +0800

[FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command
---
 .../delegation/hive/HiveOperationExecutor.java |  7 +--
 .../table/planner/delegation/hive/HiveParser.java  | 18 ++--
 .../delegation/hive/copy/HiveSetProcessor.java | 21 ---
 .../flink/connectors/hive/HiveDialectITCase.java   | 13 ++--
 .../flink-sql-client/src/test/resources/sql/set.q  | 24 ++
 5 files changed, 57 insertions(+), 26 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 597584cc3fd..664ce4e6e35 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -85,7 +85,7 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
 
 catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
 if (!(currentCatalog instanceof HiveCatalog)) {
 throw new FlinkHiveException(
-"Only support SET command when the current catalog is HiveCatalog ing Hive dialect.");
+"Only support SET command when the current catalog is HiveCatalog in Hive dialect.");
 }
 
 HiveConf hiveConf = ((HiveCatalog) currentCatalog).getHiveConf();
@@ -112,7 +112,10 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
 // set key
 String option =
 HiveSetProcessor.getVariable(
-hiveConf, hiveVariables, hiveSetOperation.getKey().get());
+tableConfig.getConfiguration().toMap(),
+hiveConf,
+hiveVariables,
+hiveSetOperation.getKey().get());
 return Optional.of(buildResultForShowVariable(Collections.singletonList(option)));
 } else {
 HiveSetProcessor.setVariable(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 0dd5613a8e3..c8d1630133e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -217,15 +217,16 @@ public class HiveParser extends ParserImpl {
 }
 
 private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+statement = statement.trim();
+if (statement.endsWith(";")) {
+// the command may end with ";" since it won't be removed by Flink SQL CLI,
+// so, we need to remove ";"
+statement = statement.substring(0, statement.length() - 1);
+}
 String[] commandTokens = statement.split("\\s+");
 HiveCommand hiveCommand = HiveCommand.find(commandTokens);
 if (hiveCommand != null) {
 String cmdArgs = statement.substring(commandTokens[0].length()).trim();
-// the command may end with ";" since it won't be removed by Flink SQL CLI,
-// so, we need to remove ";"
-if (cmdArgs.endsWith(";")) {
-cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
-}
 if (hiveCommand == HiveCommand.SET) {
 return Optional.of(processSetCmd(statement, cmdArgs));
 } else if (hiveCommand == HiveCommand.RESET) {
@@ -242,9 +243,14 @@ public class HiveParser extends ParserImpl {
 
 private Operation processSetCmd(String originCmd, String setCmdArgs) {
 if (setCmdArgs.equals("")) {
-return new HiveSetOperation();
+// the command is "set", if we follow Hive's behavior, it will output all configurations
+// including hiveconf, hivevar, env, ... which are too many.
+// So in here, for this case, just delegate to Flink's own behavior
+// which will only output the flink configuration.
+return super.parse(originCmd).get(0);
 }
 if 

[flink] 02/04: [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9e16d54b9ea0422a97bcbe20ebb244be54dc1c3c
Author: yuxia Luo 
AuthorDate: Tue Sep 20 22:28:35 2022 +0800

[FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect

This closes #20695
---
 .../client/gateway/context/SessionContext.java | 11 +++
 .../gateway/service/context/SessionContext.java| 34 +++---
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index 7e44b65dad6..ef5917ea740 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -34,6 +36,7 @@ import org.apache.flink.table.client.resource.ClientResourceManager;
 import org.apache.flink.table.client.util.ClientClassloaderUtil;
 import org.apache.flink.table.client.util.ClientWrapperClassLoader;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.slf4j.Logger;
@@ -156,6 +159,14 @@ public class SessionContext {
 } catch (Exception e) {
 // get error and reset the key with old value
 resetSessionConfigurationToDefault(originConfiguration);
+if (value.equalsIgnoreCase(SqlDialect.HIVE.name())
+&& e instanceof ValidationException) {
+String additionErrorMsg =
+"Note: if you want to use Hive dialect, "
++ "please first move the jar `flink-table-planner_2.12` located in `FLINK_HOME/opt` "
++ "to `FLINK_HOME/lib` and then move out the jar `flink-table-planner-loader` from `FLINK_HOME/lib`.";
+ExceptionUtils.updateDetailMessage(e, t -> t.getMessage() + additionErrorMsg);
+}
 throw new SqlExecutionException(
 String.format("Failed to set key %s with value %s.", key, value), e);
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 94cc9d13b32..67d73a6afc7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -23,8 +23,10 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -46,6 +48,7 @@ import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.MutableURLClassLoader;
 
@@ -293,16 +296,27 @@ public class SessionContext {
 catalogManager,
 functionCatalog);
 
-return new StreamTableEnvironmentImpl(
-catalogManager,
-moduleManager,
-resourceManager,
-functionCatalog,
-tableConfig,
-

[flink] 03/04: [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bff0985aef4ed43681e6ad3bd81fc460bef3c6a5
Author: yuxia Luo 
AuthorDate: Tue Sep 20 22:30:08 2022 +0800

[FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath

This closes #20778
---
 .../planner/delegation/hive/HiveOperationExecutor.java  |  8 
 .../flink/connectors/hive/HiveDialectQueryITCase.java   | 13 ++---
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 2010062fe4c..597584cc3fd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -153,14 +153,14 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
 hiveLoadDataOperation.getPath(),
 hiveLoadDataOperation.getTablePath(),
 hiveLoadDataOperation.getPartitionSpec(),
-hiveLoadDataOperation.isSrcLocal(),
-hiveLoadDataOperation.isOverwrite());
+hiveLoadDataOperation.isOverwrite(),
+hiveLoadDataOperation.isSrcLocal());
 } else {
 hiveCatalog.loadTable(
 hiveLoadDataOperation.getPath(),
 hiveLoadDataOperation.getTablePath(),
-hiveLoadDataOperation.isSrcLocal(),
-hiveLoadDataOperation.isOverwrite());
+hiveLoadDataOperation.isOverwrite(),
+hiveLoadDataOperation.isSrcLocal());
 }
 return Optional.of(TableResultImpl.TABLE_RESULT_OK);
 } finally {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index d305e34ef2d..a2fc20bb1e0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -715,17 +715,22 @@ public class HiveDialectQueryITCase {
 .replace("$filepath", testLoadCsvFilePath));
 
 // test load data into table
-tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2), (2, 1), (2, 2)").await();
+tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2)").await();
 tableEnv.executeSql(
 String.format(
 "load data local inpath '%s' INTO TABLE tab2", warehouse + "/tab1"));
 List result =
 CollectionUtil.iteratorToList(
 tableEnv.executeSql("select * from tab2").collect());
-assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2], +I[2, 1], +I[2, 2]]");
+assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]");
+// there should still exist data in tab1
+result =
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("select * from tab1").collect());
+assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]");
 
 // test load data overwrite
-tableEnv.executeSql("insert into tab1 values (2, 1), (2, 2)").await();
+tableEnv.executeSql("insert overwrite table tab1 values (2, 1), (2, 2)").await();
 tableEnv.executeSql(
 String.format(
 "load data local inpath '%s' overwrite into table tab2",
@@ -741,6 +746,8 @@ public class HiveDialectQueryITCase {
 "load data inpath '%s' into table p_table partition (dateint=2022) ",
 testLoadCsvFilePath))
 .await();
+// the file should be removed
+assertThat(new File(testLoadCsvFilePath).exists()).isFalse();
 result =
 CollectionUtil.iteratorToList(
 tableEnv.executeSql("select * from p_table where dateint=2022")



[flink] 01/04: [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82ab2918e992f747043dbe49d900b36fe28df282
Author: luoyuxia 
AuthorDate: Wed Sep 7 14:38:04 2022 +0800

[FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect

This closes #20776
---
 .../table/planner/delegation/hive/HiveParser.java  |  2 +-
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  9 +++--
 .../flink/connectors/hive/HiveDialectITCase.java   | 22 +++---
 .../flink/table/catalog/FunctionCatalog.java   |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 713ee146a7a..0dd5613a8e3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -330,7 +330,7 @@ public class HiveParser extends ParserImpl {
 dmlHelper,
 frameworkConfig,
 plannerContext.getCluster(),
-plannerContext.getFlinkContext().getClassLoader());
+plannerContext.getFlinkContext());
 return Collections.singletonList(ddlAnalyzer.convertToOperation(node));
 } else {
 return processQuery(context, hiveConf, hiveShim, node);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 6730d5cde76..d104a1de0f3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
 import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -88,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.delegation.hive.HiveParser;
 import org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner;
 import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
@@ -204,6 +206,7 @@ public class HiveParserDDLSemanticAnalyzer {
 private final FrameworkConfig frameworkConfig;
 private final RelOptCluster cluster;
 private final ClassLoader classLoader;
+private final FunctionCatalog functionCatalog;
 
 static {
 TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME);
@@ -267,7 +270,7 @@ public class HiveParserDDLSemanticAnalyzer {
 HiveParserDMLHelper dmlHelper,
 FrameworkConfig frameworkConfig,
 RelOptCluster cluster,
-ClassLoader classLoader)
+FlinkContext flinkContext)
 throws SemanticException {
 this.queryState = queryState;
 this.conf = queryState.getConf();
@@ -281,7 +284,8 @@ public class HiveParserDDLSemanticAnalyzer {
 this.dmlHelper = dmlHelper;
 this.frameworkConfig = frameworkConfig;
 this.cluster = cluster;
-this.classLoader = classLoader;
+this.classLoader = flinkContext.getClassLoader();
+this.functionCatalog = flinkContext.getFunctionCatalog();
 reservedPartitionValues = new HashSet<>();
 // Partition can't have this name
 reservedPartitionValues.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
@@ -530,6 +534,7 @@ public class HiveParserDDLSemanticAnalyzer {
 List resources = getResourceList(ast);
 
 if (isTemporaryFunction) {
+

[flink] branch release-1.16 updated (22086c67a6a -> d4d855a3c08)

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


 from 22086c67a6a [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
 new 82ab2918e99 [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect
 new 9e16d54b9ea [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect
 new bff0985aef4 [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath
 new d4d855a3c08 [FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../delegation/hive/HiveOperationExecutor.java | 15 ++
 .../table/planner/delegation/hive/HiveParser.java  | 20 -
 .../delegation/hive/copy/HiveSetProcessor.java | 21 -
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  9 --
 .../flink/connectors/hive/HiveDialectITCase.java   | 35 +++---
 .../connectors/hive/HiveDialectQueryITCase.java| 13 ++--
 .../client/gateway/context/SessionContext.java | 11 +++
 .../flink-sql-client/src/test/resources/sql/set.q  | 24 +++
 .../gateway/service/context/SessionContext.java| 34 ++---
 .../flink/table/catalog/FunctionCatalog.java   |  2 +-
 10 files changed, 130 insertions(+), 54 deletions(-)



[flink-jira-bot] branch dependabot/pip/urllib3-1.26.5 created (now fe0f3d4)

2022-09-20 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/pip/urllib3-1.26.5
in repository https://gitbox.apache.org/repos/asf/flink-jira-bot.git


  at fe0f3d4  Bump urllib3 from 1.26.4 to 1.26.5

No new revisions were added by this update.



[flink] branch release-1.15 updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)

2022-09-20 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new eb65655f8ce [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
eb65655f8ce is described below

commit eb65655f8ce39627a6bd28c8b0c92db44d2e
Author: harker2015 
AuthorDate: Tue Sep 20 17:31:44 2022 +0200

[FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)

* [FLINK-29324] Fix NPE for Kinesis connector when closing

* [FLINK-29324] Add unit test case
---
 .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java  | 7 ++-
 .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java| 8 
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b0a729fb11e..488a1f54e85 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -418,7 +418,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 public void close() throws Exception {
 cancel();
 // safe-guard when the fetcher has been interrupted, make sure to not leak resources
-fetcher.awaitTermination();
+// application might be stopped before connector subtask has been started
+// so we must check if the fetcher is actually created
+KinesisDataFetcher<T> fetcher = this.fetcher;
+if (fetcher != null) {
+fetcher.awaitTermination();
+}
 this.fetcher = null;
 super.close();
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d48e04ed4e6..6ad94f823e8 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -1194,6 +1194,14 @@ public class FlinkKinesisConsumerTest extends TestLogger {
 testHarness.close();
 }
 
+@Test
+public void testCloseConnectorBeforeSubtaskStart() throws Exception {
+Properties config = TestUtils.getStandardProperties();
+FlinkKinesisConsumer<String> consumer =
+new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+consumer.close();
+}
+
 private void awaitRecordCount(ConcurrentLinkedQueue queue, int count)
 throws Exception {
 Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
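
The fix is the standard load-then-check idiom for a field that may never have been initialized: read the field into a local once, null-check the local, and call through the local, so the check and the call see the same reference. A distilled sketch of the pattern (illustrative; Fetcher stands in for KinesisDataFetcher, and the volatile modifier is an assumption of the sketch, not of the Flink field):

```java
// Illustrative sketch of the guard added in close() above.
public class GuardedClose implements AutoCloseable {
    interface Fetcher { void awaitTermination() throws InterruptedException; }

    private volatile Fetcher fetcher; // still null if open() never ran

    @Override
    public void close() throws Exception {
        Fetcher fetcher = this.fetcher; // single read of the field
        if (fetcher != null) {
            fetcher.awaitTermination();
        }
        this.fetcher = null;
    }
}
```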



[flink] branch release-1.16 updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)

2022-09-20 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
 new 22086c67a6a [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
22086c67a6a is described below

commit 22086c67a6a97148eb74ed32b281eec393721738
Author: harker2015 
AuthorDate: Tue Sep 20 17:31:44 2022 +0200

[FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)

* [FLINK-29324] Fix NPE for Kinesis connector when closing

* [FLINK-29324] Add unit test case
---
 .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java  | 7 ++-
 .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java| 8 
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b0a729fb11e..488a1f54e85 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -418,7 +418,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 public void close() throws Exception {
 cancel();
 // safe-guard when the fetcher has been interrupted, make sure to not leak resources
-fetcher.awaitTermination();
+// application might be stopped before connector subtask has been started
+// so we must check if the fetcher is actually created
+KinesisDataFetcher<T> fetcher = this.fetcher;
+if (fetcher != null) {
+fetcher.awaitTermination();
+}
 this.fetcher = null;
 super.close();
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index deef7b38057..51367a6a110 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -1184,6 +1184,14 @@ public class FlinkKinesisConsumerTest extends TestLogger {
 testHarness.close();
 }
 
+@Test
+public void testCloseConnectorBeforeSubtaskStart() throws Exception {
+Properties config = TestUtils.getStandardProperties();
+FlinkKinesisConsumer<String> consumer =
+new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+consumer.close();
+}
+
 private void awaitRecordCount(ConcurrentLinkedQueue queue, int count)
 throws Exception {
 Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));



[flink] branch master updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)

2022-09-20 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 71fea9a4522 [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
71fea9a4522 is described below

commit 71fea9a4522505a6c0f23f1de599b7f87a633ccf
Author: harker2015 
AuthorDate: Tue Sep 20 17:31:44 2022 +0200

[FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)

* [FLINK-29324] Fix NPE for Kinesis connector when closing

* [FLINK-29324] Add unit test case
---
 .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java  | 7 ++-
 .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java| 8 
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b0a729fb11e..488a1f54e85 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -418,7 +418,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 public void close() throws Exception {
 cancel();
 // safe-guard when the fetcher has been interrupted, make sure to not leak resources
-fetcher.awaitTermination();
+// application might be stopped before connector subtask has been started
+// so we must check if the fetcher is actually created
+KinesisDataFetcher<T> fetcher = this.fetcher;
+if (fetcher != null) {
+fetcher.awaitTermination();
+}
 this.fetcher = null;
 super.close();
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index deef7b38057..51367a6a110 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -1184,6 +1184,14 @@ public class FlinkKinesisConsumerTest extends TestLogger {
 testHarness.close();
 }
 
+@Test
+public void testCloseConnectorBeforeSubtaskStart() throws Exception {
+Properties config = TestUtils.getStandardProperties();
+FlinkKinesisConsumer<String> consumer =
+new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+consumer.close();
+}
+
 private void awaitRecordCount(ConcurrentLinkedQueue queue, int count)
 throws Exception {
 Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));



[flink] branch master updated (791d8396163 -> 4448d9fd5e3)

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


 from 791d8396163 [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect
 add 4448d9fd5e3 [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath

No new revisions were added by this update.

Summary of changes:
 .../planner/delegation/hive/HiveOperationExecutor.java  |  8 
 .../flink/connectors/hive/HiveDialectQueryITCase.java   | 13 ++---
 2 files changed, 14 insertions(+), 7 deletions(-)



[flink] branch master updated: [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 791d8396163 [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect
791d8396163 is described below

commit 791d8396163a8eb045493f7333218c5d881cc6ff
Author: yuxia Luo 
AuthorDate: Tue Sep 20 22:28:35 2022 +0800

[FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect

This closes #20695
---
 .../client/gateway/context/SessionContext.java | 11 +++
 .../gateway/service/context/SessionContext.java| 34 +++---
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index 7e44b65dad6..ef5917ea740 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -34,6 +36,7 @@ import org.apache.flink.table.client.resource.ClientResourceManager;
 import org.apache.flink.table.client.util.ClientClassloaderUtil;
 import org.apache.flink.table.client.util.ClientWrapperClassLoader;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.slf4j.Logger;
@@ -156,6 +159,14 @@ public class SessionContext {
 } catch (Exception e) {
 // get error and reset the key with old value
 resetSessionConfigurationToDefault(originConfiguration);
+if (value.equalsIgnoreCase(SqlDialect.HIVE.name())
+&& e instanceof ValidationException) {
+String additionErrorMsg =
+"Note: if you want to use Hive dialect, "
++ "please first move the jar `flink-table-planner_2.12` located in `FLINK_HOME/opt` "
++ "to `FLINK_HOME/lib` and then move out the jar `flink-table-planner-loader` from `FLINK_HOME/lib`.";
+ExceptionUtils.updateDetailMessage(e, t -> t.getMessage() + additionErrorMsg);
+}
 throw new SqlExecutionException(
 String.format("Failed to set key %s with value %s.", key, value), e);
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 94cc9d13b32..67d73a6afc7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -23,8 +23,10 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -46,6 +48,7 @@ import 
org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.MutableURLClassLoader;
 
@@ -293,16 +296,27 @@ public class SessionContext {
 catalogManager,
 functionCatalog);
 
-

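The hunk above (truncated in this digest) hinges on ExceptionUtils.updateDetailMessage, which rewrites the caught exception's own detail message instead of wrapping it in a new exception, so the original type and stack trace survive. A minimal standalone sketch of that pattern, assuming only flink-core on the classpath (the message strings are illustrative, not taken from the patch):

    import org.apache.flink.util.ExceptionUtils;

    public class UpdateDetailMessageDemo {
        public static void main(String[] args) {
            Exception e =
                    new IllegalStateException("Failed to set key table.sql-dialect with value hive.");
            // Rewrite the detail message in place via the supplied function;
            // the exception object itself is kept, only its message changes.
            ExceptionUtils.updateDetailMessage(
                    e, t -> t.getMessage() + " Note: move flink-table-planner into FLINK_HOME/lib first.");
            System.out.println(e.getMessage());
        }
    }
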
[flink-table-store] branch master updated: [hotfix] Note execution.checkpointing.mode in write documentation

2022-09-20 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 9b84de44 [hotfix] Note execution.checkpointing.mode in write 
documentation
9b84de44 is described below

commit 9b84de44ab8e2c28490c8fc98aab22369cee8a0d
Author: JingsongLi 
AuthorDate: Tue Sep 20 22:00:48 2022 +0800

[hotfix] Note execution.checkpointing.mode in write documentation
---
 docs/content/docs/development/write-table.md | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/development/write-table.md 
b/docs/content/docs/development/write-table.md
index cbb80825..9bbf0e7e 100644
--- a/docs/content/docs/development/write-table.md
+++ b/docs/content/docs/development/write-table.md
@@ -38,11 +38,10 @@ column_list:
 ```
 
 {{< hint info >}}
-__IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table 
Store in STREAMING mode.
-{{< /hint >}}
-
-{{< hint info >}}
-__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when 
writing to the Table Store in STREAMING mode.
+__IMPORTANT:__ 
+- Checkpointing needs to be enabled when writing to the Table Store in 
STREAMING mode.
+- `execution.checkpointing.unaligned=true` is not supported when writing to 
the Table Store in STREAMING mode.
+- `execution.checkpointing.mode=AT_LEAST_ONCE` is not supported when writing 
to the Table Store in STREAMING mode.
 {{< /hint >}}
 
 ## Parallelism
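
Taken together, the three notes pin down the checkpoint configuration for a streaming write job. A minimal sketch of a compliant setup in the DataStream/Table API (the 60s interval is illustrative; only the mode and alignment settings are mandated by the note):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TableStoreWriteSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing must be enabled, and AT_LEAST_ONCE is ruled out,
            // so request EXACTLY_ONCE explicitly.
            env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
            // Unaligned checkpoints are not supported for Table Store writes;
            // false is the default, this just makes the requirement explicit.
            env.getCheckpointConfig().enableUnalignedCheckpoints(false);
            // ... build the pipeline and submit the INSERT INTO statement ...
        }
    }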



[flink-table-store] branch release-0.2 updated: [hotfix] Note execution.checkpointing.mode in write documentation

2022-09-20 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
 new 4775c360 [hotfix] Note execution.checkpointing.mode in write 
documentation
4775c360 is described below

commit 4775c3600a9d798eece83adf2d34720cb48a4eec
Author: JingsongLi 
AuthorDate: Tue Sep 20 22:00:48 2022 +0800

[hotfix] Note execution.checkpointing.mode in write documentation
---
 docs/content/docs/development/write-table.md | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/development/write-table.md 
b/docs/content/docs/development/write-table.md
index cbb80825..9bbf0e7e 100644
--- a/docs/content/docs/development/write-table.md
+++ b/docs/content/docs/development/write-table.md
@@ -38,11 +38,10 @@ column_list:
 ```
 
 {{< hint info >}}
-__IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table 
Store in STREAMING mode.
-{{< /hint >}}
-
-{{< hint info >}}
-__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when 
writing to the Table Store in STREAMING mode.
+__IMPORTANT:__ 
+- Checkpointing needs to be enabled when writing to the Table Store in 
STREAMING mode.
+- `execution.checkpointing.unaligned=true` is not supported when writing to 
the Table Store in STREAMING mode.
+- `execution.checkpointing.mode=AT_LEAST_ONCE` is not supported when writing 
to the Table Store in STREAMING mode.
 {{< /hint >}}
 
 ## Parallelism



[flink-table-store] branch master updated: [hotfix] Document execution.checkpointing.unaligned is not supported

2022-09-20 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 4a28edd3 [hotfix] Document execution.checkpointing.unaligned is not supported
4a28edd3 is described below

commit 4a28edd3372dac8e21d831655020bbc03753ec28
Author: JingsongLi 
AuthorDate: Tue Sep 20 18:08:10 2022 +0800

[hotfix] Document execution.checkpointing.unaligned is not supported
---
 docs/content/docs/development/write-table.md | 4 
 1 file changed, 4 insertions(+)

diff --git a/docs/content/docs/development/write-table.md 
b/docs/content/docs/development/write-table.md
index 6873fc81..cbb80825 100644
--- a/docs/content/docs/development/write-table.md
+++ b/docs/content/docs/development/write-table.md
@@ -41,6 +41,10 @@ column_list:
 __IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table 
Store in STREAMING mode.
 {{< /hint >}}
 
+{{< hint info >}}
+__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when 
writing to the Table Store in STREAMING mode.
+{{< /hint >}}
+
 ## Parallelism
 
 It is recommended that the parallelism of sink should be less than or



[flink-table-store] branch release-0.2 updated: [hotfix] Document execution.checkpointing.unaligned is not supported

2022-09-20 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
 new 828b8519 [hotfix] Document execution.checkpointing.unaligned is not supported
828b8519 is described below

commit 828b851962e1c4adf011abfb4943e8c186010041
Author: JingsongLi 
AuthorDate: Tue Sep 20 18:08:10 2022 +0800

[hotfix] Document execution.checkpointing.unaligned is not supported
---
 docs/content/docs/development/write-table.md | 4 
 1 file changed, 4 insertions(+)

diff --git a/docs/content/docs/development/write-table.md 
b/docs/content/docs/development/write-table.md
index 6873fc81..cbb80825 100644
--- a/docs/content/docs/development/write-table.md
+++ b/docs/content/docs/development/write-table.md
@@ -41,6 +41,10 @@ column_list:
 __IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table 
Store in STREAMING mode.
 {{< /hint >}}
 
+{{< hint info >}}
+__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when 
writing to the Table Store in STREAMING mode.
+{{< /hint >}}
+
 ## Parallelism
 
 It is recommended that the parallelism of sink should be less than or



[flink] branch master updated: [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect

2022-09-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3994788892f [FLINK-29185][hive] Fix ClassNotFoundException for CREATE 
TEMPORARY FUNCTION USING JAR with Hive dialect
3994788892f is described below

commit 3994788892fc761cf0c2fd09f362d4dab8f14c61
Author: luoyuxia 
AuthorDate: Wed Sep 7 14:38:04 2022 +0800

[FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY 
FUNCTION USING JAR with Hive dialect

This closes #20776
---
 .../table/planner/delegation/hive/HiveParser.java  |  2 +-
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  9 +++--
 .../flink/connectors/hive/HiveDialectITCase.java   | 22 +++---
 .../flink/table/catalog/FunctionCatalog.java   |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 713ee146a7a..0dd5613a8e3 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -330,7 +330,7 @@ public class HiveParser extends ParserImpl {
 dmlHelper,
 frameworkConfig,
 plannerContext.getCluster(),
-
plannerContext.getFlinkContext().getClassLoader());
+plannerContext.getFlinkContext());
 return 
Collections.singletonList(ddlAnalyzer.convertToOperation(node));
 } else {
 return processQuery(context, hiveConf, hiveShim, node);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 6730d5cde76..d104a1de0f3 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
 import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -88,6 +89,7 @@ import 
org.apache.flink.table.operations.ddl.DropPartitionsOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.delegation.hive.HiveParser;
 import org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner;
 import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
@@ -204,6 +206,7 @@ public class HiveParserDDLSemanticAnalyzer {
 private final FrameworkConfig frameworkConfig;
 private final RelOptCluster cluster;
 private final ClassLoader classLoader;
+private final FunctionCatalog functionCatalog;
 
 static {
 TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, 
serdeConstants.BOOLEAN_TYPE_NAME);
@@ -267,7 +270,7 @@ public class HiveParserDDLSemanticAnalyzer {
 HiveParserDMLHelper dmlHelper,
 FrameworkConfig frameworkConfig,
 RelOptCluster cluster,
-ClassLoader classLoader)
+FlinkContext flinkContext)
 throws SemanticException {
 this.queryState = queryState;
 this.conf = queryState.getConf();
@@ -281,7 +284,8 @@ public class HiveParserDDLSemanticAnalyzer {
 this.dmlHelper = dmlHelper;
 this.frameworkConfig = frameworkConfig;
 this.cluster = cluster;
-this.classLoader = classLoader;
+this.classLoader = flinkContext.getClassLoader();
+this.functionCatalog = flinkContext.getFunctionCatalog();
 reservedPartitionValues = new HashSet<>();
 // Partition can't have this name
 reservedPartitionValues.add(HiveConf.getVar(conf, 

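For reference, this is the statement shape the fix targets, driven from the Table API. A sketch under loud assumptions: the function class, jar URI, and catalog setup are hypothetical, and the Hive dialect still requires a registered HiveCatalog plus the planner-jar swap discussed earlier in this digest:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;

    public class HiveTempFunctionDemo {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Assumes a HiveCatalog has been registered and made current (omitted here).
            tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            // Hypothetical UDF class and jar path; with the fix, the analyzer can
            // resolve the class from the registered jar instead of failing with
            // ClassNotFoundException.
            tEnv.executeSql(
                    "CREATE TEMPORARY FUNCTION my_lower AS 'com.example.udf.MyLower' "
                            + "USING JAR 'hdfs:///udfs/my-udf.jar'");
        }
    }
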
[flink-kubernetes-operator] branch main updated (f8d85b0c -> c3b81943)

2022-09-20 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


from f8d85b0c [FLINK-29257] Clarify description of SAVEPOINT upgrade mode
 add c3b81943 [FLINK-28574] Bump the JOSDK version to 3.2.2

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java | 3 ---
 flink-kubernetes-operator/src/main/resources/META-INF/NOTICE  | 4 ++--
 pom.xml   | 2 +-
 3 files changed, 3 insertions(+), 6 deletions(-)