[flink-kubernetes-operator] branch main updated: [hotfix] Remove strange configs from helm defaults
This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new c84f923f [hotfix] Remove strange configs from helm defaults c84f923f is described below commit c84f923ff5da82fa7120298e2b2c5fafcfa31ed2 Author: Gyula Fora AuthorDate: Wed Sep 21 06:07:10 2022 +0200 [hotfix] Remove strange configs from helm defaults --- helm/flink-kubernetes-operator/conf/flink-conf.yaml | 6 -- 1 file changed, 6 deletions(-) diff --git a/helm/flink-kubernetes-operator/conf/flink-conf.yaml b/helm/flink-kubernetes-operator/conf/flink-conf.yaml index be6a85c9..b310cd6e 100644 --- a/helm/flink-kubernetes-operator/conf/flink-conf.yaml +++ b/helm/flink-kubernetes-operator/conf/flink-conf.yaml @@ -18,12 +18,6 @@ # Flink job/cluster related configs taskmanager.numberOfTaskSlots: 2 -blob.server.port: 6124 -jobmanager.rpc.port: 6123 -taskmanager.rpc.port: 6122 -queryable-state.proxy.ports: 6125 -jobmanager.memory.process.size: 1600m -taskmanager.memory.process.size: 1728m parallelism.default: 2 # Flink operator related configs
[flink] branch release-1.16 updated: [hotfix] Make ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized scenarios
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.16 by this push: new c963ba6589b [hotfix] Make ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized scenarios c963ba6589b is described below commit c963ba6589bf8fb547dd97c5f47d3d881d5cf46b Author: Jin AuthorDate: Tue Aug 30 18:34:30 2022 -0700 [hotfix] Make ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized scenarios This closes #20786. --- .../apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java index 022b2a6e63e..698b32b14ea 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java @@ -45,12 +45,12 @@ public class ParquetProtoWriters { // /** The builder for Protobuf {@link ParquetWriter}. */ -private static class ParquetProtoWriterBuilder +public static class ParquetProtoWriterBuilder extends ParquetWriter.Builder> { private final Class clazz; -protected ParquetProtoWriterBuilder(OutputFile outputFile, Class clazz) { +public ParquetProtoWriterBuilder(OutputFile outputFile, Class clazz) { super(outputFile); this.clazz = clazz; }
[flink] branch master updated (05600f844a9 -> aab13977bea)
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 05600f844a9 [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples add aab13977bea [hotfix] Make ParquetProtoWriters.ParquetProtoWriterBuilder public to support customized scenarios No new revisions were added by this update. Summary of changes: .../apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink-table-store] branch master updated: [FLINK-29345] Create reusing reader/writer config in orc format
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 835632c6 [FLINK-29345] Create reusing reader/writer config in orc format 835632c6 is described below commit 835632c6e4758ad7d11ccbdb3a8ebb8dfa6aa709 Author: shammon AuthorDate: Wed Sep 21 11:37:28 2022 +0800 [FLINK-29345] Create reusing reader/writer config in orc format This closes #296 --- .../store/format/orc/OrcBulkWriterFactory.java | 116 + .../table/store/format/orc/OrcFileFormat.java | 35 --- .../store/format/orc/OrcBulkWriterFactoryTest.java | 83 +++ .../table/store/format/orc/OrcFileFormatTest.java | 9 +- 4 files changed, 225 insertions(+), 18 deletions(-) diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java new file mode 100644 index ..b6670392 --- /dev/null +++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.format.orc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.orc.vector.Vectorizer; +import org.apache.flink.orc.writer.PhysicalWriterImpl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.Writer; +import org.apache.orc.impl.WriterImpl; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Orc {@link BulkWriter.Factory}. The main code is copied from Flink {@code OrcBulkWriterFactory}. + */ +public class OrcBulkWriterFactory implements BulkWriter.Factory { + +private final Vectorizer vectorizer; +private final OrcFile.WriterOptions writerOptions; + +/** + * Creates a new OrcBulkWriterFactory using the provided Vectorizer, ORC WriterOptions. + * + * @param vectorizer The vectorizer implementation to convert input record to a + * VectorizerRowBatch. + * @param writerOptions ORC WriterOptions. + */ +public OrcBulkWriterFactory(Vectorizer vectorizer, OrcFile.WriterOptions writerOptions) { +this.vectorizer = checkNotNull(vectorizer); +this.writerOptions = checkNotNull(writerOptions); +} + +@Override +public BulkWriter create(FSDataOutputStream out) throws IOException { +OrcFile.WriterOptions opts = getWriterOptions(); +opts.physicalWriter(new PhysicalWriterImpl(out, opts)); + +// The path of the Writer is not used to indicate the destination file +// in this case since we have used a dedicated physical writer to write +// to the give output stream directly. However, the path would be used as +// the key of writer in the ORC memory manager, thus we need to make it unique. +Path unusedPath = new Path(UUID.randomUUID().toString()); +return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts)); +} + +@VisibleForTesting +protected OrcFile.WriterOptions getWriterOptions() { +return writerOptions; +} + +/** Orc {@link BulkWriter}. The main code is copied from Flink {@code OrcBulkWriter}. */ +private static class OrcBulkWriter implements BulkWriter { + +private final Writer writer; +private final Vectorizer vectorizer; +private final VectorizedRowBatch rowBatch; + +OrcBulkWriter(Vectorizer vectorizer, Writer writer) { +this.vectorizer = checkNotNull(vectorizer); +this.writer = checkNotNull(writer); +this.rowBatch = vectorizer.getSchema().createRowBatch(); + +// Configure
[flink] branch release-1.15 updated: [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples
This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new 9ee1589c425 [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples 9ee1589c425 is described below commit 9ee1589c42565f47fdae8b82d488e6610bdb7fc6 Author: Jun He AuthorDate: Sat Sep 17 10:56:07 2022 +0800 [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples This closes #20849. --- docs/content.zh/docs/dev/datastream/execution_mode.md | 2 +- docs/content/docs/dev/datastream/execution_mode.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/execution_mode.md b/docs/content.zh/docs/dev/datastream/execution_mode.md index 2cf146a0c53..d243517cbc8 100644 --- a/docs/content.zh/docs/dev/datastream/execution_mode.md +++ b/docs/content.zh/docs/dev/datastream/execution_mode.md @@ -60,7 +60,7 @@ Apache Flink 对流处理和批处理统一方法,意味着无论配置何种 下面是如何通过命令行配置执行模式: ```bash -$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar +$ bin/flink run -Dexecution.runtime-mode=BATCH ``` 这个例子展示了如何在代码中配置执行模式: diff --git a/docs/content/docs/dev/datastream/execution_mode.md b/docs/content/docs/dev/datastream/execution_mode.md index 415a82d7f6a..3f903907cdc 100644 --- a/docs/content/docs/dev/datastream/execution_mode.md +++ b/docs/content/docs/dev/datastream/execution_mode.md @@ -96,7 +96,7 @@ programmatically when creating/configuring the `StreamExecutionEnvironment`. Here's how you can configure the execution mode via the command line: ```bash -$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar +$ bin/flink run -Dexecution.runtime-mode=BATCH ``` This example shows how you can configure the execution mode in code:
[flink] branch release-1.16 updated: [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples
This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.16 by this push: new de4aa4b7fee [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples de4aa4b7fee is described below commit de4aa4b7fee0f112fa3cfe66d0ad620841e18d74 Author: Jun He AuthorDate: Sat Sep 17 10:56:07 2022 +0800 [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples This closes #20849. --- docs/content.zh/docs/dev/datastream/execution_mode.md | 2 +- docs/content/docs/dev/datastream/execution_mode.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/execution_mode.md b/docs/content.zh/docs/dev/datastream/execution_mode.md index 2cf146a0c53..d243517cbc8 100644 --- a/docs/content.zh/docs/dev/datastream/execution_mode.md +++ b/docs/content.zh/docs/dev/datastream/execution_mode.md @@ -60,7 +60,7 @@ Apache Flink 对流处理和批处理统一方法,意味着无论配置何种 下面是如何通过命令行配置执行模式: ```bash -$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar +$ bin/flink run -Dexecution.runtime-mode=BATCH ``` 这个例子展示了如何在代码中配置执行模式: diff --git a/docs/content/docs/dev/datastream/execution_mode.md b/docs/content/docs/dev/datastream/execution_mode.md index 415a82d7f6a..3f903907cdc 100644 --- a/docs/content/docs/dev/datastream/execution_mode.md +++ b/docs/content/docs/dev/datastream/execution_mode.md @@ -96,7 +96,7 @@ programmatically when creating/configuring the `StreamExecutionEnvironment`. Here's how you can configure the execution mode via the command line: ```bash -$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar +$ bin/flink run -Dexecution.runtime-mode=BATCH ``` This example shows how you can configure the execution mode in code:
[flink] branch master updated (64c550c67c2 -> 05600f844a9)
This is an automated email from the ASF dual-hosted git repository. tangyun pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 64c550c67c2 [FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command add 05600f844a9 [FLINK-29325][docs] Fix documentation bug on how to enable batch mode for streaming examples No new revisions were added by this update. Summary of changes: docs/content.zh/docs/dev/datastream/execution_mode.md | 2 +- docs/content/docs/dev/datastream/execution_mode.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[flink-table-store] branch master updated: [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter
This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 736f936a [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter 736f936a is described below commit 736f936aa1c8b1ad4f3acae8d52821ed6597f751 Author: tsreaper AuthorDate: Wed Sep 21 10:25:22 2022 +0800 [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter This closes #295 --- .../source/FileStoreSourceSplitReaderTest.java | 2 +- .../source/TestChangelogDataReadWrite.java | 2 +- .../table/store/file/data/AppendOnlyWriter.java| 135 ++-- .../table/store/file/data/DataFileReader.java | 56 +-- .../table/store/file/data/DataFileWriter.java | 179 - .../store/file/{writer => io}/FileWriter.java | 54 --- .../KeyValueDataFileRecordReader.java} | 33 ++-- .../store/file/io/KeyValueDataFileWriter.java | 146 + .../file/{writer => io}/RollingFileWriter.java | 87 ++ .../RowDataFileRecordReader.java} | 14 +- .../table/store/file/io/RowDataFileWriter.java | 79 + .../store/file/io/RowDataRollingFileWriter.java| 50 ++ .../table/store/file/io/SingleFileWriter.java | 152 + .../file/io/StatsCollectingSingleFileWriter.java | 76 + .../table/store/file/manifest/ManifestFile.java| 52 ++ .../store/file/mergetree/MergeTreeWriter.java | 5 +- .../file/operation/AppendOnlyFileStoreRead.java| 4 +- .../file/operation/AppendOnlyFileStoreWrite.java | 11 +- .../table/store/file/operation/FileStoreWrite.java | 2 +- .../file/operation/KeyValueFileStoreWrite.java | 2 +- .../store/file/{writer => utils}/RecordWriter.java | 13 +- .../table/store/file/writer/BaseFileWriter.java| 118 -- .../flink/table/store/file/writer/Metric.java | 48 -- .../table/store/file/writer/MetricFileWriter.java | 179 - .../store/table/AppendOnlyFileStoreTable.java | 2 +- .../table/ChangelogValueCountFileStoreTable.java | 2 +- .../table/ChangelogWithKeyFileStoreTable.java | 2 +- .../table/store/table/sink/AbstractTableWrite.java | 2 +- .../table/store/table/sink/MemoryTableWrite.java | 2 +- .../flink/table/store/file/TestFileStore.java | 2 +- .../store/file/data/AppendOnlyWriterTest.java | 12 +- .../flink/table/store/file/data/DataFileTest.java | 11 +- .../store/file/format/FileFormatSuffixTest.java| 7 +- .../table/store/file/mergetree/MergeTreeTest.java | 14 +- 34 files changed, 723 insertions(+), 832 deletions(-) diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java index 25211fe6..049b6c85 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java @@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.KeyValue; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.UpdateSchema; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java index 8c7a504b..4145bf54 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java @@ -36,8 +36,8 @@ import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.file.utils.SnapshotManager; -import
[flink] branch master updated: [FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 64c550c67c2 [FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command 64c550c67c2 is described below commit 64c550c67c2d580f369dfaa6ff48e2e6816c6fcd Author: yuxia Luo AuthorDate: Wed Sep 21 09:46:50 2022 +0800 [FLINK-29191][hive] Fix Hive dialect can't get value for the variables set by set command This closes #20774 --- .../delegation/hive/HiveOperationExecutor.java | 7 +-- .../table/planner/delegation/hive/HiveParser.java | 18 ++-- .../delegation/hive/copy/HiveSetProcessor.java | 21 --- .../flink/connectors/hive/HiveDialectITCase.java | 13 ++-- .../flink-sql-client/src/test/resources/sql/set.q | 24 ++ 5 files changed, 57 insertions(+), 26 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java index 597584cc3fd..664ce4e6e35 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java @@ -85,7 +85,7 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor { catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null); if (!(currentCatalog instanceof HiveCatalog)) { throw new FlinkHiveException( -"Only support SET command when the current catalog is HiveCatalog ing Hive dialect."); +"Only support SET command when the current catalog is HiveCatalog in Hive dialect."); } HiveConf hiveConf = ((HiveCatalog) currentCatalog).getHiveConf(); @@ -112,7 +112,10 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor { // set key String option = HiveSetProcessor.getVariable( -hiveConf, hiveVariables, hiveSetOperation.getKey().get()); +tableConfig.getConfiguration().toMap(), +hiveConf, +hiveVariables, +hiveSetOperation.getKey().get()); return Optional.of(buildResultForShowVariable(Collections.singletonList(option))); } else { HiveSetProcessor.setVariable( diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index 0dd5613a8e3..c8d1630133e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -217,15 +217,16 @@ public class HiveParser extends ParserImpl { } private Optional tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) { +statement = statement.trim(); +if (statement.endsWith(";")) { +// the command may end with ";" since it won't be removed by Flink SQL CLI, +// so, we need to remove ";" +statement = statement.substring(0, statement.length() - 1); +} String[] commandTokens = statement.split("\\s+"); HiveCommand hiveCommand = HiveCommand.find(commandTokens); if (hiveCommand != null) { String cmdArgs = statement.substring(commandTokens[0].length()).trim(); -// the command may end with ";" since it won't be removed by Flink SQL CLI, -// so, we need to remove ";" -if (cmdArgs.endsWith(";")) { -cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1); -} if (hiveCommand == HiveCommand.SET) { return Optional.of(processSetCmd(statement, cmdArgs)); } else if (hiveCommand == HiveCommand.RESET) { @@ -242,9 +243,14 @@ public class HiveParser extends ParserImpl { private Operation processSetCmd(String originCmd, String setCmdArgs) { if (setCmdArgs.equals("")) { -return new HiveSetOperation(); +// the command is "set", if we follow Hive's behavior, it will output all configurations +// including hiveconf, hivevar, env, ... which are too
[flink] 04/04: [FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit d4d855a3c08733afac935d87df6544f0811aef84 Author: luoyuxia AuthorDate: Wed Sep 7 10:54:26 2022 +0800 [FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command --- .../delegation/hive/HiveOperationExecutor.java | 7 +-- .../table/planner/delegation/hive/HiveParser.java | 18 ++-- .../delegation/hive/copy/HiveSetProcessor.java | 21 --- .../flink/connectors/hive/HiveDialectITCase.java | 13 ++-- .../flink-sql-client/src/test/resources/sql/set.q | 24 ++ 5 files changed, 57 insertions(+), 26 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java index 597584cc3fd..664ce4e6e35 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java @@ -85,7 +85,7 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor { catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null); if (!(currentCatalog instanceof HiveCatalog)) { throw new FlinkHiveException( -"Only support SET command when the current catalog is HiveCatalog ing Hive dialect."); +"Only support SET command when the current catalog is HiveCatalog in Hive dialect."); } HiveConf hiveConf = ((HiveCatalog) currentCatalog).getHiveConf(); @@ -112,7 +112,10 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor { // set key String option = HiveSetProcessor.getVariable( -hiveConf, hiveVariables, hiveSetOperation.getKey().get()); +tableConfig.getConfiguration().toMap(), +hiveConf, +hiveVariables, +hiveSetOperation.getKey().get()); return Optional.of(buildResultForShowVariable(Collections.singletonList(option))); } else { HiveSetProcessor.setVariable( diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index 0dd5613a8e3..c8d1630133e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -217,15 +217,16 @@ public class HiveParser extends ParserImpl { } private Optional tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) { +statement = statement.trim(); +if (statement.endsWith(";")) { +// the command may end with ";" since it won't be removed by Flink SQL CLI, +// so, we need to remove ";" +statement = statement.substring(0, statement.length() - 1); +} String[] commandTokens = statement.split("\\s+"); HiveCommand hiveCommand = HiveCommand.find(commandTokens); if (hiveCommand != null) { String cmdArgs = statement.substring(commandTokens[0].length()).trim(); -// the command may end with ";" since it won't be removed by Flink SQL CLI, -// so, we need to remove ";" -if (cmdArgs.endsWith(";")) { -cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1); -} if (hiveCommand == HiveCommand.SET) { return Optional.of(processSetCmd(statement, cmdArgs)); } else if (hiveCommand == HiveCommand.RESET) { @@ -242,9 +243,14 @@ public class HiveParser extends ParserImpl { private Operation processSetCmd(String originCmd, String setCmdArgs) { if (setCmdArgs.equals("")) { -return new HiveSetOperation(); +// the command is "set", if we follow Hive's behavior, it will output all configurations +// including hiveconf, hivevar, env, ... which are too many. +// So in here, for this case, just delegate to Flink's own behavior +// which will only output the flink configuration. +return super.parse(originCmd).get(0); } if
[flink] 02/04: [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit 9e16d54b9ea0422a97bcbe20ebb244be54dc1c3c Author: yuxia Luo AuthorDate: Tue Sep 20 22:28:35 2022 +0800 [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect This closes #20695 --- .../client/gateway/context/SessionContext.java | 11 +++ .../gateway/service/context/SessionContext.java| 34 +++--- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java index 7e44b65dad6..ef5917ea740 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java @@ -24,6 +24,8 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; @@ -34,6 +36,7 @@ import org.apache.flink.table.client.resource.ClientResourceManager; import org.apache.flink.table.client.util.ClientClassloaderUtil; import org.apache.flink.table.client.util.ClientWrapperClassLoader; import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TemporaryClassLoaderContext; import org.slf4j.Logger; @@ -156,6 +159,14 @@ public class SessionContext { } catch (Exception e) { // get error and reset the key with old value resetSessionConfigurationToDefault(originConfiguration); +if (value.equalsIgnoreCase(SqlDialect.HIVE.name()) +&& e instanceof ValidationException) { +String additionErrorMsg = +"Note: if you want to use Hive dialect, " ++ "please first move the jar `flink-table-planner_2.12` located in `FLINK_HOME/opt` " ++ "to `FLINK_HOME/lib` and then move out the jar `flink-table-planner-loader` from `FLINK_HOME/lib`."; +ExceptionUtils.updateDetailMessage(e, t -> t.getMessage() + additionErrorMsg); +} throw new SqlExecutionException( String.format("Failed to set key %s with value %s.", key, value), e); } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 94cc9d13b32..67d73a6afc7 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -23,8 +23,10 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; @@ -46,6 +48,7 @@ import org.apache.flink.table.gateway.service.operation.OperationManager; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.resource.ResourceManager; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkUserCodeClassLoaders; import org.apache.flink.util.MutableURLClassLoader; @@ -293,16 +296,27 @@ public class SessionContext { catalogManager, functionCatalog); -return new StreamTableEnvironmentImpl( -catalogManager, -moduleManager, -resourceManager, -functionCatalog, -tableConfig, -
[flink] 03/04: [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit bff0985aef4ed43681e6ad3bd81fc460bef3c6a5 Author: yuxia Luo AuthorDate: Tue Sep 20 22:30:08 2022 +0800 [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath This closes #20778 --- .../planner/delegation/hive/HiveOperationExecutor.java | 8 .../flink/connectors/hive/HiveDialectQueryITCase.java | 13 ++--- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java index 2010062fe4c..597584cc3fd 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java @@ -153,14 +153,14 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor { hiveLoadDataOperation.getPath(), hiveLoadDataOperation.getTablePath(), hiveLoadDataOperation.getPartitionSpec(), -hiveLoadDataOperation.isSrcLocal(), -hiveLoadDataOperation.isOverwrite()); +hiveLoadDataOperation.isOverwrite(), +hiveLoadDataOperation.isSrcLocal()); } else { hiveCatalog.loadTable( hiveLoadDataOperation.getPath(), hiveLoadDataOperation.getTablePath(), -hiveLoadDataOperation.isSrcLocal(), -hiveLoadDataOperation.isOverwrite()); +hiveLoadDataOperation.isOverwrite(), +hiveLoadDataOperation.isSrcLocal()); } return Optional.of(TableResultImpl.TABLE_RESULT_OK); } finally { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java index d305e34ef2d..a2fc20bb1e0 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java @@ -715,17 +715,22 @@ public class HiveDialectQueryITCase { .replace("$filepath", testLoadCsvFilePath)); // test load data into table -tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2), (2, 1), (2, 2)").await(); +tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2)").await(); tableEnv.executeSql( String.format( "load data local inpath '%s' INTO TABLE tab2", warehouse + "/tab1")); List result = CollectionUtil.iteratorToList( tableEnv.executeSql("select * from tab2").collect()); -assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2], +I[2, 1], +I[2, 2]]"); +assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]"); +// there should still exist data in tab1 +result = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select * from tab1").collect()); +assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]"); // test load data overwrite -tableEnv.executeSql("insert into tab1 values (2, 1), (2, 2)").await(); +tableEnv.executeSql("insert overwrite table tab1 values (2, 1), (2, 2)").await(); tableEnv.executeSql( String.format( "load data local inpath '%s' overwrite into table tab2", @@ -741,6 +746,8 @@ public class HiveDialectQueryITCase { "load data inpath '%s' into table p_table partition (dateint=2022) ", testLoadCsvFilePath)) .await(); +// the file should be removed +assertThat(new File(testLoadCsvFilePath).exists()).isFalse(); result = CollectionUtil.iteratorToList( tableEnv.executeSql("select * from p_table where dateint=2022")
[flink] 01/04: [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit 82ab2918e992f747043dbe49d900b36fe28df282 Author: luoyuxia AuthorDate: Wed Sep 7 14:38:04 2022 +0800 [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect This closes #20776 --- .../table/planner/delegation/hive/HiveParser.java | 2 +- .../hive/parse/HiveParserDDLSemanticAnalyzer.java | 9 +++-- .../flink/connectors/hive/HiveDialectITCase.java | 22 +++--- .../flink/table/catalog/FunctionCatalog.java | 2 +- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index 713ee146a7a..0dd5613a8e3 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -330,7 +330,7 @@ public class HiveParser extends ParserImpl { dmlHelper, frameworkConfig, plannerContext.getCluster(), - plannerContext.getFlinkContext().getClassLoader()); +plannerContext.getFlinkContext()); return Collections.singletonList(ddlAnalyzer.convertToOperation(node)); } else { return processQuery(context, hiveConf, hiveShim, node); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java index 6730d5cde76..d104a1de0f3 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java @@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.CatalogViewImpl; import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.FunctionLanguage; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; @@ -88,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropPartitionsOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; +import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.delegation.hive.HiveParser; import org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner; import org.apache.flink.table.planner.delegation.hive.HiveParserConstants; @@ -204,6 +206,7 @@ public class HiveParserDDLSemanticAnalyzer { private final FrameworkConfig frameworkConfig; private final RelOptCluster cluster; private final ClassLoader classLoader; +private final FunctionCatalog functionCatalog; static { TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME); @@ -267,7 +270,7 @@ public class HiveParserDDLSemanticAnalyzer { HiveParserDMLHelper dmlHelper, FrameworkConfig frameworkConfig, RelOptCluster cluster, -ClassLoader classLoader) +FlinkContext flinkContext) throws SemanticException { this.queryState = queryState; this.conf = queryState.getConf(); @@ -281,7 +284,8 @@ public class HiveParserDDLSemanticAnalyzer { this.dmlHelper = dmlHelper; this.frameworkConfig = frameworkConfig; this.cluster = cluster; -this.classLoader = classLoader; +this.classLoader = flinkContext.getClassLoader(); +this.functionCatalog = flinkContext.getFunctionCatalog(); reservedPartitionValues = new HashSet<>(); // Partition can't have this name reservedPartitionValues.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME)); @@ -530,6 +534,7 @@ public class HiveParserDDLSemanticAnalyzer { List resources = getResourceList(ast); if (isTemporaryFunction) { +
[flink] branch release-1.16 updated (22086c67a6a -> d4d855a3c08)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from 22086c67a6a [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) new 82ab2918e99 [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect new 9e16d54b9ea [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect new bff0985aef4 [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath new d4d855a3c08 [FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../delegation/hive/HiveOperationExecutor.java | 15 ++ .../table/planner/delegation/hive/HiveParser.java | 20 - .../delegation/hive/copy/HiveSetProcessor.java | 21 - .../hive/parse/HiveParserDDLSemanticAnalyzer.java | 9 -- .../flink/connectors/hive/HiveDialectITCase.java | 35 +++--- .../connectors/hive/HiveDialectQueryITCase.java| 13 ++-- .../client/gateway/context/SessionContext.java | 11 +++ .../flink-sql-client/src/test/resources/sql/set.q | 24 +++ .../gateway/service/context/SessionContext.java| 34 ++--- .../flink/table/catalog/FunctionCatalog.java | 2 +- 10 files changed, 130 insertions(+), 54 deletions(-)
[flink-jira-bot] branch dependabot/pip/urllib3-1.26.5 created (now fe0f3d4)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/pip/urllib3-1.26.5 in repository https://gitbox.apache.org/repos/asf/flink-jira-bot.git at fe0f3d4 Bump urllib3 from 1.26.4 to 1.26.5 No new revisions were added by this update.
[flink] branch release-1.15 updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new eb65655f8ce [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) eb65655f8ce is described below commit eb65655f8ce39627a6bd28c8b0c92db44d2e Author: harker2015 AuthorDate: Tue Sep 20 17:31:44 2022 +0200 [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) * [FLINK-29324] Fix NPE for Kinesis connector when closing * [FLINK-29324] Add unit test case --- .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java | 7 ++- .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java| 8 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11e..488a1f54e85 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -418,7 +418,12 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction public void close() throws Exception { cancel(); // safe-guard when the fetcher has been interrupted, make sure to not leak resources -fetcher.awaitTermination(); +// application might be stopped before connector subtask has been started +// so we must check if the fetcher is actually created +KinesisDataFetcher fetcher = this.fetcher; +if (fetcher != null) { +fetcher.awaitTermination(); +} this.fetcher = null; super.close(); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index d48e04ed4e6..6ad94f823e8 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1194,6 +1194,14 @@ public class FlinkKinesisConsumerTest extends TestLogger { testHarness.close(); } +@Test +public void testCloseConnectorBeforeSubtaskStart() throws Exception { +Properties config = TestUtils.getStandardProperties(); +FlinkKinesisConsumer consumer = +new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); +consumer.close(); +} + private void awaitRecordCount(ConcurrentLinkedQueue queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
[flink] branch release-1.16 updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.16 by this push: new 22086c67a6a [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) 22086c67a6a is described below commit 22086c67a6a97148eb74ed32b281eec393721738 Author: harker2015 AuthorDate: Tue Sep 20 17:31:44 2022 +0200 [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) * [FLINK-29324] Fix NPE for Kinesis connector when closing * [FLINK-29324] Add unit test case --- .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java | 7 ++- .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java| 8 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11e..488a1f54e85 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -418,7 +418,12 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction public void close() throws Exception { cancel(); // safe-guard when the fetcher has been interrupted, make sure to not leak resources -fetcher.awaitTermination(); +// application might be stopped before connector subtask has been started +// so we must check if the fetcher is actually created +KinesisDataFetcher fetcher = this.fetcher; +if (fetcher != null) { +fetcher.awaitTermination(); +} this.fetcher = null; super.close(); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index deef7b38057..51367a6a110 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1184,6 +1184,14 @@ public class FlinkKinesisConsumerTest extends TestLogger { testHarness.close(); } +@Test +public void testCloseConnectorBeforeSubtaskStart() throws Exception { +Properties config = TestUtils.getStandardProperties(); +FlinkKinesisConsumer consumer = +new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); +consumer.close(); +} + private void awaitRecordCount(ConcurrentLinkedQueue queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
[flink] branch master updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 71fea9a4522 [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) 71fea9a4522 is described below commit 71fea9a4522505a6c0f23f1de599b7f87a633ccf Author: harker2015 AuthorDate: Tue Sep 20 17:31:44 2022 +0200 [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) * [FLINK-29324] Fix NPE for Kinesis connector when closing * [FLINK-29324] Add unit test case --- .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java | 7 ++- .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java| 8 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11e..488a1f54e85 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -418,7 +418,12 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction public void close() throws Exception { cancel(); // safe-guard when the fetcher has been interrupted, make sure to not leak resources -fetcher.awaitTermination(); +// application might be stopped before connector subtask has been started +// so we must check if the fetcher is actually created +KinesisDataFetcher fetcher = this.fetcher; +if (fetcher != null) { +fetcher.awaitTermination(); +} this.fetcher = null; super.close(); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index deef7b38057..51367a6a110 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1184,6 +1184,14 @@ public class FlinkKinesisConsumerTest extends TestLogger { testHarness.close(); } +@Test +public void testCloseConnectorBeforeSubtaskStart() throws Exception { +Properties config = TestUtils.getStandardProperties(); +FlinkKinesisConsumer consumer = +new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); +consumer.close(); +} + private void awaitRecordCount(ConcurrentLinkedQueue queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
[flink] branch master updated (791d8396163 -> 4448d9fd5e3)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 791d8396163 [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect add 4448d9fd5e3 [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath No new revisions were added by this update. Summary of changes: .../planner/delegation/hive/HiveOperationExecutor.java | 8 .../flink/connectors/hive/HiveDialectQueryITCase.java | 13 ++--- 2 files changed, 14 insertions(+), 7 deletions(-)
[flink] branch master updated: [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 791d8396163 [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect 791d8396163 is described below commit 791d8396163a8eb045493f7333218c5d881cc6ff Author: yuxia Luo AuthorDate: Tue Sep 20 22:28:35 2022 +0800 [FLINK-29045][hive] Optimize error message in Flink SQL Client and Gateway when try to use Hive Dialect This closes #20695 --- .../client/gateway/context/SessionContext.java | 11 +++ .../gateway/service/context/SessionContext.java| 34 +++--- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java index 7e44b65dad6..ef5917ea740 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java @@ -24,6 +24,8 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; @@ -34,6 +36,7 @@ import org.apache.flink.table.client.resource.ClientResourceManager; import org.apache.flink.table.client.util.ClientClassloaderUtil; import org.apache.flink.table.client.util.ClientWrapperClassLoader; import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TemporaryClassLoaderContext; import org.slf4j.Logger; @@ -156,6 +159,14 @@ public class SessionContext { } catch (Exception e) { // get error and reset the key with old value resetSessionConfigurationToDefault(originConfiguration); +if (value.equalsIgnoreCase(SqlDialect.HIVE.name()) +&& e instanceof ValidationException) { +String additionErrorMsg = +"Note: if you want to use Hive dialect, " ++ "please first move the jar `flink-table-planner_2.12` located in `FLINK_HOME/opt` " ++ "to `FLINK_HOME/lib` and then move out the jar `flink-table-planner-loader` from `FLINK_HOME/lib`."; +ExceptionUtils.updateDetailMessage(e, t -> t.getMessage() + additionErrorMsg); +} throw new SqlExecutionException( String.format("Failed to set key %s with value %s.", key, value), e); } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 94cc9d13b32..67d73a6afc7 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -23,8 +23,10 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; @@ -46,6 +48,7 @@ import org.apache.flink.table.gateway.service.operation.OperationManager; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.resource.ResourceManager; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkUserCodeClassLoaders; import org.apache.flink.util.MutableURLClassLoader; @@ -293,16 +296,27 @@ public class SessionContext { catalogManager, functionCatalog); -
[flink-table-store] branch master updated: [hotfix] Note execution.checkpointing.mode in write documentation
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 9b84de44 [hotfix] Note execution.checkpointing.mode in write documentation 9b84de44 is described below commit 9b84de44ab8e2c28490c8fc98aab22369cee8a0d Author: JingsongLi AuthorDate: Tue Sep 20 22:00:48 2022 +0800 [hotfix] Note execution.checkpointing.mode in write documentation --- docs/content/docs/development/write-table.md | 9 - 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/development/write-table.md b/docs/content/docs/development/write-table.md index cbb80825..9bbf0e7e 100644 --- a/docs/content/docs/development/write-table.md +++ b/docs/content/docs/development/write-table.md @@ -38,11 +38,10 @@ column_list: ``` {{< hint info >}} -__IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode. -{{< /hint >}} - -{{< hint info >}} -__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when writing to the Table Store in STREAMING mode. +__IMPORTANT:__ +- Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode. +- `execution.checkpointing.unaligned=true` is not supported when writing to the Table Store in STREAMING mode. +- `execution.checkpointing.mode=AT_LEAST_ONCE` is not supported when writing to the Table Store in STREAMING mode. {{< /hint >}} ## Parallelism
[flink-table-store] branch release-0.2 updated: [hotfix] Note execution.checkpointing.mode in write documentation
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.2 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/release-0.2 by this push: new 4775c360 [hotfix] Note execution.checkpointing.mode in write documentation 4775c360 is described below commit 4775c3600a9d798eece83adf2d34720cb48a4eec Author: JingsongLi AuthorDate: Tue Sep 20 22:00:48 2022 +0800 [hotfix] Note execution.checkpointing.mode in write documentation --- docs/content/docs/development/write-table.md | 9 - 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/development/write-table.md b/docs/content/docs/development/write-table.md index cbb80825..9bbf0e7e 100644 --- a/docs/content/docs/development/write-table.md +++ b/docs/content/docs/development/write-table.md @@ -38,11 +38,10 @@ column_list: ``` {{< hint info >}} -__IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode. -{{< /hint >}} - -{{< hint info >}} -__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when writing to the Table Store in STREAMING mode. +__IMPORTANT:__ +- Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode. +- `execution.checkpointing.unaligned=true` is not supported when writing to the Table Store in STREAMING mode. +- `execution.checkpointing.mode=AT_LEAST_ONCE` is not supported when writing to the Table Store in STREAMING mode. {{< /hint >}} ## Parallelism
[flink-table-store] branch master updated: [hotfix] Document is not supported
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 4a28edd3 [hotfix] Document is not supported 4a28edd3 is described below commit 4a28edd3372dac8e21d831655020bbc03753ec28 Author: JingsongLi AuthorDate: Tue Sep 20 18:08:10 2022 +0800 [hotfix] Document is not supported --- docs/content/docs/development/write-table.md | 4 1 file changed, 4 insertions(+) diff --git a/docs/content/docs/development/write-table.md b/docs/content/docs/development/write-table.md index 6873fc81..cbb80825 100644 --- a/docs/content/docs/development/write-table.md +++ b/docs/content/docs/development/write-table.md @@ -41,6 +41,10 @@ column_list: __IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode. {{< /hint >}} +{{< hint info >}} +__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when writing to the Table Store in STREAMING mode. +{{< /hint >}} + ## Parallelism It is recommended that the parallelism of sink should be less than or
[flink-table-store] branch release-0.2 updated: [hotfix] Document is not supported
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.2 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/release-0.2 by this push: new 828b8519 [hotfix] Document is not supported 828b8519 is described below commit 828b851962e1c4adf011abfb4943e8c186010041 Author: JingsongLi AuthorDate: Tue Sep 20 18:08:10 2022 +0800 [hotfix] Document is not supported --- docs/content/docs/development/write-table.md | 4 1 file changed, 4 insertions(+) diff --git a/docs/content/docs/development/write-table.md b/docs/content/docs/development/write-table.md index 6873fc81..cbb80825 100644 --- a/docs/content/docs/development/write-table.md +++ b/docs/content/docs/development/write-table.md @@ -41,6 +41,10 @@ column_list: __IMPORTANT:__ Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode. {{< /hint >}} +{{< hint info >}} +__IMPORTANT:__ `execution.checkpointing.unaligned` is not supported when writing to the Table Store in STREAMING mode. +{{< /hint >}} + ## Parallelism It is recommended that the parallelism of sink should be less than or
[flink] branch master updated: [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3994788892f [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect 3994788892f is described below commit 3994788892fc761cf0c2fd09f362d4dab8f14c61 Author: luoyuxia AuthorDate: Wed Sep 7 14:38:04 2022 +0800 [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect This closes #20776 --- .../table/planner/delegation/hive/HiveParser.java | 2 +- .../hive/parse/HiveParserDDLSemanticAnalyzer.java | 9 +++-- .../flink/connectors/hive/HiveDialectITCase.java | 22 +++--- .../flink/table/catalog/FunctionCatalog.java | 2 +- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index 713ee146a7a..0dd5613a8e3 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -330,7 +330,7 @@ public class HiveParser extends ParserImpl { dmlHelper, frameworkConfig, plannerContext.getCluster(), - plannerContext.getFlinkContext().getClassLoader()); +plannerContext.getFlinkContext()); return Collections.singletonList(ddlAnalyzer.convertToOperation(node)); } else { return processQuery(context, hiveConf, hiveShim, node); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java index 6730d5cde76..d104a1de0f3 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java @@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.CatalogViewImpl; import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.FunctionLanguage; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; @@ -88,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropPartitionsOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; +import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.delegation.hive.HiveParser; import org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner; import org.apache.flink.table.planner.delegation.hive.HiveParserConstants; @@ -204,6 +206,7 @@ public class HiveParserDDLSemanticAnalyzer { private final FrameworkConfig frameworkConfig; private final RelOptCluster cluster; private final ClassLoader classLoader; +private final FunctionCatalog functionCatalog; static { TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME); @@ -267,7 +270,7 @@ public class HiveParserDDLSemanticAnalyzer { HiveParserDMLHelper dmlHelper, FrameworkConfig frameworkConfig, RelOptCluster cluster, -ClassLoader classLoader) +FlinkContext flinkContext) throws SemanticException { this.queryState = queryState; this.conf = queryState.getConf(); @@ -281,7 +284,8 @@ public class HiveParserDDLSemanticAnalyzer { this.dmlHelper = dmlHelper; this.frameworkConfig = frameworkConfig; this.cluster = cluster; -this.classLoader = classLoader; +this.classLoader = flinkContext.getClassLoader(); +this.functionCatalog = flinkContext.getFunctionCatalog(); reservedPartitionValues = new HashSet<>(); // Partition can't have this name reservedPartitionValues.add(HiveConf.getVar(conf,
[flink-kubernetes-operator] branch main updated (f8d85b0c -> c3b81943)
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git from f8d85b0c [FLINK-29257] Clarify description of SAVEPOINT upgrade mode add c3b81943 [FLINK-28574] Bump the JOSDK version to 3.2.2 No new revisions were added by this update. Summary of changes: .../main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java | 3 --- flink-kubernetes-operator/src/main/resources/META-INF/NOTICE | 4 ++-- pom.xml | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-)