(flink) branch master updated: [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4cc24c1dd17 [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission 4cc24c1dd17 is described below commit 4cc24c1dd17b0abe3c4372652c7ab88fedc7e478 Author: yunfengzhou-hub AuthorDate: Mon Dec 18 14:27:51 2023 +0800 [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission --- .../flink/client/FlinkPipelineTranslationUtil.java | 15 +++-- .../handlers/JarHandlerParameterTest.java | 2 +- .../handlers/JarRunHandlerParameterTest.java | 26 +- .../flink/runtime/dispatcher/Dispatcher.java | 4 +++- .../flink/runtime/dispatcher/DispatcherTest.java | 11 - 5 files changed, 52 insertions(+), 6 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java index eedf204676e..0aefcb3ef79 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java @@ -21,6 +21,7 @@ package org.apache.flink.client; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.jobgraph.JobGraph; /** @@ -40,8 +41,18 @@ public final class FlinkPipelineTranslationUtil { FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(userClassloader, pipeline); -return pipelineTranslator.translateToJobGraph( -pipeline, optimizerConfiguration, defaultParallelism); +JobGraph jobGraph = +pipelineTranslator.translateToJobGraph( +pipeline, optimizerConfiguration, defaultParallelism); + +optimizerConfiguration 
+.getOptional(PipelineOptions.PARALLELISM_OVERRIDES) +.ifPresent( +map -> +jobGraph.getJobConfiguration() + .set(PipelineOptions.PARALLELISM_OVERRIDES, map)); + +return jobGraph; } /** diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java index 9bb662b1d16..e48c8c3ece8 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java @@ -85,7 +85,7 @@ abstract class JarHandlerParameterTest< static Time timeout = Time.seconds(10); static Map responseHeaders = Collections.emptyMap(); -private static Path jarWithManifest; +protected static Path jarWithManifest; private static Path jarWithoutManifest; static void init(File tmpDir) throws Exception { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java index 2d236335163..f27c9d30513 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java @@ -28,6 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -56,6 +57,7 @@ import 
java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -87,7 +89,14 @@ class JarRunHandlerParameterTest .set(CoreOptions.DEFAULT_PARALLELISM, 57) .set(SavepointConfigOptions.SAVEPOINT_PATH, "/foo/bar/test") .set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false) -.set(SavepointConfigOptions.RESTORE_MODE, RESTORE_MODE); +.set(SavepointConfigOption
(flink) branch release-1.18 updated: [FLINK-33531][python] Remove cython upper bounds
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 0d6ab1db6c0 [FLINK-33531][python] Remove cython upper bounds 0d6ab1db6c0 is described below commit 0d6ab1db6c04cd88f646d545075bea539bac9fcf Author: Sergey Nuyanzin AuthorDate: Mon Dec 18 23:14:17 2023 +0100 [FLINK-33531][python] Remove cython upper bounds Co-authored-by: HuangXingBo --- flink-python/dev/dev-requirements.txt | 2 +- flink-python/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt index bfede381ab1..723f196bc96 100755 --- a/flink-python/dev/dev-requirements.txt +++ b/flink-python/dev/dev-requirements.txt @@ -16,7 +16,7 @@ pip>=20.3 setuptools>=18.0 wheel apache-beam>=2.43.0,<2.49.0 -cython>=0.29.24,<3 +cython>=0.29.24 py4j==0.10.9.7 python-dateutil>=2.8.0,<3 cloudpickle~=2.2.0 diff --git a/flink-python/pyproject.toml b/flink-python/pyproject.toml index fbef9b70315..ccdfcc9ffa5 100644 --- a/flink-python/pyproject.toml +++ b/flink-python/pyproject.toml @@ -22,7 +22,7 @@ requires = [ "setuptools>=18.0", "wheel", "apache-beam>=2.43.0,<2.49.0", -"cython>=0.29.24,<3", +"cython>=0.29.24", "fastavro>=1.1.0,!=1.8.0" ]
(flink) branch master updated: [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1c67cccd2fd [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module 1c67cccd2fd is described below commit 1c67cccd2fdd6c674a38e0c26fe990e1dd7b62ae Author: Jiabao Sun AuthorDate: Wed Oct 25 16:31:45 2023 +0800 [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module --- .../io/network/api/CheckpointBarrierTest.java | 26 +-- .../io/network/api/reader/AbstractReaderTest.java | 89 .../serialization/CheckpointSerializationTest.java | 26 +-- .../api/serialization/EventSerializerTest.java | 43 ++-- .../network/api/serialization/PagedViewsTest.java | 238 + .../SpanningRecordSerializationTest.java | 55 +++-- .../api/serialization/SpanningWrapperTest.java | 21 +- .../api/writer/BroadcastRecordWriterTest.java | 3 +- .../api/writer/RecordWriterDelegateTest.java | 61 +++--- .../network/api/writer/SubtaskStateMapperTest.java | 169 +++ 10 files changed, 273 insertions(+), 458 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java index 70de3450654..9b34ee62a64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java @@ -22,37 +22,29 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.fail; +import static 
org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link CheckpointBarrier} type. */ -public class CheckpointBarrierTest { +class CheckpointBarrierTest { /** * Test serialization of the checkpoint barrier. The checkpoint barrier does not support its own * serialization, in order to be immutable. */ @Test -public void testSerialization() throws Exception { +void testSerialization() { long id = Integer.MAX_VALUE + 123123L; long timestamp = Integer.MAX_VALUE + 1228L; CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation(); CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options); -try { -barrier.write(new DataOutputSerializer(1024)); -fail("should throw an exception"); -} catch (UnsupportedOperationException e) { -// expected -} - -try { -barrier.read(new DataInputDeserializer(new byte[32])); -fail("should throw an exception"); -} catch (UnsupportedOperationException e) { -// expected -} +assertThatThrownBy(() -> barrier.write(new DataOutputSerializer(1024))) +.isInstanceOf(UnsupportedOperationException.class); + +assertThatThrownBy(() -> barrier.read(new DataInputDeserializer(new byte[32]))) +.isInstanceOf(UnsupportedOperationException.class); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java index 969cae48997..32228784396 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java @@ -26,25 +26,24 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.Matchers; 
import java.io.IOException; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** Tests for the event handling behaviour. */ -public class AbstractReaderTest { +class AbstractReaderTest { @Test @SuppressWarnings("unchecked") -public void te
(flink) branch master updated: [FLINK-32949][streaming] Add new taskmanager.collect-sink.port TaskManagerOptions. CollectSinkFunction$ServerThread uses the port configured in TaskManagerOptions#COLLEC
This is an automated email from the ASF dual-hosted git repository. jingge pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e6169cee12d [FLINK-32949][streaming] Add new taskmanager.collect-sink.port TaskManagerOptions. CollectSinkFunction$ServerThread uses the port configured in TaskManagerOptions#COLLECT_PORT. e6169cee12d is described below commit e6169cee12d66ce22f7907b8dc947d4e96228b30 Author: jingge AuthorDate: Tue Dec 5 00:03:38 2023 +0100 [FLINK-32949][streaming] Add new taskmanager.collect-sink.port TaskManagerOptions. CollectSinkFunction$ServerThread uses the port configured in TaskManagerOptions#COLLECT_PORT. --- .../generated/all_taskmanager_section.html | 6 +++ .../generated/common_host_port_section.html| 6 +++ .../generated/task_manager_configuration.html | 6 +++ .../flink/configuration/TaskManagerOptions.java| 13 ++ .../api/operators/collect/CollectSinkFunction.java | 31 +- .../operators/collect/CollectSinkFunctionTest.java | 50 -- .../utils/CollectSinkFunctionTestWrapper.java | 5 +++ 7 files changed, 92 insertions(+), 25 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_section.html index 1f60107c70b..0e7ba018327 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_section.html @@ -32,6 +32,12 @@ String The local address of the network interface that the task manager binds to. If not configured, '0.0.0.0' will be used. + +taskmanager.collect-sink.port +0 +Integer +The port used for the client to retrieve query results from the TaskManager. The default value is 0, which corresponds to a random port assignment. 
+ taskmanager.data.bind-port (none) diff --git a/docs/layouts/shortcodes/generated/common_host_port_section.html b/docs/layouts/shortcodes/generated/common_host_port_section.html index 160cdd5328d..3c2c318fe6c 100644 --- a/docs/layouts/shortcodes/generated/common_host_port_section.html +++ b/docs/layouts/shortcodes/generated/common_host_port_section.html @@ -74,6 +74,12 @@ String The local address of the network interface that the task manager binds to. If not configured, '0.0.0.0' will be used. + +taskmanager.collect-sink.port +0 +Integer +The port used for the client to retrieve query results from the TaskManager. The default value is 0, which corresponds to a random port assignment. + taskmanager.data.bind-port (none) diff --git a/docs/layouts/shortcodes/generated/task_manager_configuration.html b/docs/layouts/shortcodes/generated/task_manager_configuration.html index 6f7ecf554e0..d9ba88f35c0 100644 --- a/docs/layouts/shortcodes/generated/task_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/task_manager_configuration.html @@ -32,6 +32,12 @@ String The local address of the network interface that the task manager binds to. If not configured, '0.0.0.0' will be used. + +taskmanager.collect-sink.port +0 +Integer +The port used for the client to retrieve query results from the TaskManager. The default value is 0, which corresponds to a random port assignment. + taskmanager.debug.memory.log false diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 5c8d33d7040..2c0a9801ac4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -173,6 +173,19 @@ public class TaskManagerOptions { + RPC_PORT.key() + "') will be used."); +/** The default port that CollectSinkFunction$ServerThread is using. 
*/ +@Documentation.Section({ +Documentation.Sections.COMMON_HOST_PORT, +Documentation.Sections.ALL_TASK_MANAGER +}) +public static final ConfigOption COLLECT_PORT = +key("taskmanager.collect-sink.port") +.intType() +.defaultValue(0) +.withDescription( +"The port used for the client to retrieve query results from the TaskManager. " ++ "The default value is 0, which corresponds to a random port ass
(flink-connector-aws) branch main updated: [hotfix][Connectors/AWS] Bumping default Flink version in source to 1.17.0
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git The following commit(s) were added to refs/heads/main by this push: new 27db878 [hotfix][Connectors/AWS] Bumping default Flink version in source to 1.17.0 27db878 is described below commit 27db878e7da1d56d395ce80984912b16e5b916d4 Author: Danny Cranmer AuthorDate: Mon Dec 18 13:03:53 2023 + [hotfix][Connectors/AWS] Bumping default Flink version in source to 1.17.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 62aaf70..324a98f 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ under the License. 1.12.439 2.20.144 4.1.86.Final -1.16.0 +1.17.0 2.14.3 1.1.14 32.1.3-jre
(flink) branch master updated (011f7770365 -> 7cd6547a902)
This is an automated email from the ASF dual-hosted git repository. jchan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 011f7770365 [FLINK-33818] Remove WindowDeduplicateJsonITCase test new c3c31dbe16f [hotfix] Fix DataGen connector `fields.#.var-len` option type new 7cd6547a902 [FLINK-32993][table] DataGen connector handles length-constrained fields according to the schema definition by default The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/content.zh/docs/connectors/table/datagen.md | 14 +- docs/content/docs/connectors/table/datagen.md | 18 ++- .../datagen/table/DataGenConnectorOptions.java | 6 +- .../datagen/table/DataGenTableSourceFactory.java | 43 +- .../datagen/table/RandomGeneratorVisitor.java | 91 .../factories/DataGenTableSourceFactoryTest.java | 155 + 6 files changed, 226 insertions(+), 101 deletions(-)
(flink) 02/02: [FLINK-32993][table] DataGen connector handles length-constrained fields according to the schema definition by default
This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7cd6547a9027dfdc7ea97e496bb0e15213150529 Author: Yubin Li AuthorDate: Tue Nov 7 14:13:22 2023 +0800 [FLINK-32993][table] DataGen connector handles length-constrained fields according to the schema definition by default This closes #23678 --- docs/content.zh/docs/connectors/table/datagen.md | 14 +- docs/content/docs/connectors/table/datagen.md | 18 ++- .../datagen/table/DataGenTableSourceFactory.java | 43 +- .../datagen/table/RandomGeneratorVisitor.java | 91 .../factories/DataGenTableSourceFactoryTest.java | 155 + 5 files changed, 223 insertions(+), 98 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md index 210ccc088d2..642382a7768 100644 --- a/docs/content.zh/docs/connectors/table/datagen.md +++ b/docs/content.zh/docs/connectors/table/datagen.md @@ -39,9 +39,14 @@ DataGen 连接器是内置的,不需要额外的依赖项。 - 默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 -对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 还可以指定总行数,从而生成有界表。 +DataGen 连接器可以生成符合其 schema 的数据,应该注意的是,它按如下方式处理长度受限的字段: + +* 对于固定长度的数据类型(char、binary),字段长度只能由 schema 定义,且不支持自定义; +* 对于可变长度数据类型 (varchar、varbinary),字段默认长度由 schema 定义,且自定义长度不能大于 schema 定义; +* 对于超长字段(string、bytes),字段默认长度为 100,但可以定义为小于 2^31 的长度。 + 还支持序列生成器,您可以指定序列的起始和结束值。 如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 @@ -294,7 +299,12 @@ CREATE TABLE Orders ( 可选 100 Integer - 随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string、array、map、multiset。 + + 随机生成器生成字符的长度,适用于 varchar、varbinary、string、bytes、array、map、multiset。 + 请注意对于可变长字段(varchar、varbinary),默认长度由 schema 定义,且长度不可设置为大于它; + 对于超长字段(string、bytes),默认长度是 100 且可设置为小于 2^31 的长度; + 对于结构化字段(数组、Map、多重集),默认元素数量为 3 且可以自定义。 + fields.#.var-len diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md index 
b4fc81a4280..70253786bff 100644 --- a/docs/content/docs/connectors/table/datagen.md +++ b/docs/content/docs/connectors/table/datagen.md @@ -39,9 +39,16 @@ Usage - By default, a DataGen table will create an unbounded number of rows with a random value for each column. -For types, char/varchar/binary/varbinary/string/array/map/multiset, the length can be specified. Additionally, a total number of rows can be specified, resulting in a bounded table. +The DataGen connector can generate data that conforms to its defined schema, It should be noted that it handles length-constrained fields as follows: + +* For fixed-length data types (char/binary), the field length can only be defined by the schema, +and does not support customization. +* For variable-length data types (varchar/varbinary), the field length is initially defined by the schema, +and the customized length cannot be greater than the schema definition. +* For super-long fields (string/bytes), the default length is 100, but can be set to a length less than 2^31. + There also exists a sequence generator, where users specify a sequence of start and end values. If any column in a table is a sequence type, the table will be bounded and end with the first sequence completes. @@ -77,7 +84,7 @@ WITH ( LIKE Orders (EXCLUDING ALL) ``` -Further more, for variable sized types, varchar/string/varbinary/bytes, you can specify whether to enable variable-length data generation. +Furthermore, for variable sized types, varchar/string/varbinary/bytes, you can specify whether to enable variable-length data generation. ```sql CREATE TABLE Orders ( @@ -296,7 +303,12 @@ Connector Options optional 100 Integer - Size or length of the collection for generating char/varchar/binary/varbinary/string/array/map/multiset types. + + Size or length of the collection for generating varchar/varbinary/string/bytes/array/map/multiset types. 
+ Please notice that for variable-length fields (varchar/varbinary), the default length is defined by the schema and cannot be set to a length greater than it. + for super-long fields (string/bytes), the default length is 100 and can be set to a length less than 2^31. + for constructed fields (array/map/multiset), the default number of elements is 3 and can be customized. + fields.#.var-len diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java index 0cb29d61990..72398c34b96 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/
(flink) 01/02: [hotfix] Fix DataGen connector `fields.#.var-len` option type
This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c3c31dbe16f36d5a1661e579ff4c46bdc88ae418 Author: Yubin Li AuthorDate: Mon Dec 11 11:22:33 2023 +0800 [hotfix] Fix DataGen connector `fields.#.var-len` option type This closes #23678 --- .../flink/connector/datagen/table/DataGenConnectorOptions.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java index 379e0456856..aa900754111 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java @@ -118,10 +118,10 @@ public class DataGenConnectorOptions { .withDescription("The proportion of null values."); /** Placeholder {@link ConfigOption}. Not used for retrieving values. */ -public static final ConfigOption FIELD_VAR_LEN = +public static final ConfigOption FIELD_VAR_LEN = ConfigOptions.key(String.format("%s.#.%s", FIELDS, VAR_LEN)) -.floatType() -.defaultValue(0f) +.booleanType() +.defaultValue(false) .withDescription( "Whether to generate a variable-length data, please notice that it should only be used for variable-length types (varchar, string, varbinary, bytes).");
(flink) branch master updated (3c86dcadf53 -> 011f7770365)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 3c86dcadf53 [FLINK-33778][table] Cleanup usage of deprecated TableConfig#setIdleStateRetentionTime new 5799d8d0622 [FLINK-33818] Implement restore tests for WindowDeduplicate node new 011f7770365 [FLINK-33818] Remove WindowDeduplicateJsonITCase test The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../WindowDeduplicateEventTimeRestoreTest.java | 49 ++ .../exec/stream/WindowDeduplicateTestPrograms.java | 303 ++ .../jsonplan/WindowDeduplicateJsonITCase.java | 110 .../window-deduplicate-asc-cumulate-first-row.json | 669 + .../savepoint/_metadata| Bin 0 -> 19751 bytes .../plan/window-deduplicate-asc-hop-first-row.json | 669 + .../savepoint/_metadata| Bin 0 -> 19897 bytes ...ate-asc-partition-by-item-tumble-first-row.json | 668 .../savepoint/_metadata| Bin 0 -> 22588 bytes ...duplicate-asc-tumble-first-row-condition-1.json | 667 .../savepoint/_metadata| Bin 0 -> 19567 bytes ...duplicate-asc-tumble-first-row-condition-3.json | 667 .../savepoint/_metadata| Bin 0 -> 19567 bytes .../window-deduplicate-asc-tumble-first-row.json | 667 .../savepoint/_metadata| Bin 0 -> 19571 bytes ...te-desc-partition-by-item-tumble-first-row.json | 668 .../savepoint/_metadata| Bin 0 -> 22592 bytes .../window-deduplicate-desc-tumble-last-row.json | 667 .../savepoint/_metadata| Bin 0 -> 19571 bytes 19 files changed, 5694 insertions(+), 110 deletions(-) create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowDeduplicateEventTimeRestoreTest.java create mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowDeduplicateTestPrograms.java delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-cumulate-first-row/plan/window-deduplicate-asc-cumulate-first-row.json create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-cumulate-first-row/savepoint/_metadata create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-hop-first-row/plan/window-deduplicate-asc-hop-first-row.json create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-hop-first-row/savepoint/_metadata create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-partition-by-item-tumble-first-row/plan/window-deduplicate-asc-partition-by-item-tumble-first-row.json create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-partition-by-item-tumble-first-row/savepoint/_metadata create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-tumble-first-row-condition-1/plan/window-deduplicate-asc-tumble-first-row-condition-1.json create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-tumble-first-row-condition-1/savepoint/_metadata create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-tumble-first-row-condition-3/plan/window-deduplicate-asc-tumble-first-row-condition-3.json create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-tumble-first-row-condition-3/savepoint/_metadata create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-deduplicate_1/window-deduplicate-asc-tumble-first-row/plan/window-deduplicate-asc-tumble-first-row.json create mode 100644 flink-table/flink-table-planner/src/test/resourc
(flink) 02/02: [FLINK-33818] Remove WindowDeduplicateJsonITCase test
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 011f777036540d0f027b04306714bf9e64003a97 Author: bvarghese1 AuthorDate: Wed Dec 13 14:53:10 2023 -0800 [FLINK-33818] Remove WindowDeduplicateJsonITCase test - This is covered by the new restore tests --- .../jsonplan/WindowDeduplicateJsonITCase.java | 110 - 1 file changed, 110 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java deleted file mode 100644 index ef2cd5d58d9..000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -/** Test for window deduplicate json plan. */ -class WindowDeduplicateJsonITCase extends JsonPlanTestBase { - -@BeforeEach -@Override -protected void setup() throws Exception { -super.setup(); -createTestValuesSourceTable( -"MyTable", - JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()), -new String[] { -"ts STRING", -"`int` INT", -"`double` DOUBLE", -"`float` FLOAT", -"`bigdec` DECIMAL(10, 2)", -"`string` STRING", -"`name` STRING", -"`rowtime` AS TO_TIMESTAMP(`ts`)", -"WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND", -}, -new HashMap() { -{ -put("enable-watermark-push-down", "true"); -put("failing-source", "true"); -} -}); -} - -@Test -void testEventTimeTumbleWindow() throws Exception { -createTestValuesSinkTable( -"MySink", -"ts STRING", -"`int` INT", -"`double` DOUBLE", -"`float` FLOAT", -"`bigdec` DECIMAL(10, 2)", -"`string` STRING", -"`name` STRING", -"`rowtime` STRING", -"window_start TIMESTAMP(3)", -"window_end TIMESTAMP(3)", -"window_time TIMESTAMP(3)"); -compileSqlAndExecutePlan( -"insert into MySink select\n" -+ " `ts`,\n" -+ " `int`,\n" -+ " `double`,\n" -+ " `float`, \n" -+ " `bigdec`, \n" -+ " `string`, \n" -+ " `name`, \n" -+ " CAST(`rowtime` AS STRING), \n" -+ " window_start, \n" -+ " window_end, \n" -+ " window_time \n" -+ "FROM (\n" -+ " SELECT *,\n" -+ " ROW_NUMBER() OVER( \n" -+ "PARTITION BY window_start, window_end, `name` ORDER BY rowtime DESC) as rownum \n" -+ " FROM TABLE( \n" -
(flink) branch master updated: [FLINK-33778][table] Cleanup usage of deprecated TableConfig#setIdleStateRetentionTime
This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3c86dcadf53 [FLINK-33778][table] Cleanup usage of deprecated TableConfig#setIdleStateRetentionTime 3c86dcadf53 is described below commit 3c86dcadf5366fa0026125051d69b0a8913d5e61 Author: Jacky Lau AuthorDate: Mon Dec 18 19:41:03 2023 +0800 [FLINK-33778][table] Cleanup usage of deprecated TableConfig#setIdleStateRetentionTime This closes #23894. --- .../plan/rules/physical/stream/ChangelogModeInferenceTest.scala | 4 +++- .../table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala | 3 ++- .../flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala | 4 ++-- .../table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala | 4 +++- .../table/planner/plan/stream/table/TwoStageAggregateTest.scala | 2 +- .../flink/table/planner/runtime/stream/sql/AggregateITCase.scala| 3 ++- .../flink/table/planner/runtime/stream/table/AggregateITCase.scala | 4 +++- .../table/planner/runtime/stream/table/GroupWindowITCase.scala | 5 +++-- .../runtime/stream/table/GroupWindowTableAggregateITCase.scala | 5 +++-- .../flink/table/planner/runtime/stream/table/JoinITCase.scala | 6 -- .../table/planner/runtime/stream/table/TableAggregateITCase.scala | 4 +++- .../table/planner/runtime/utils/StreamingWithAggTestBase.scala | 3 ++- 12 files changed, 31 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala index a40028bd357..9b5c3e01544 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala @@ -26,6 +26,8 @@ import org.apache.flink.table.planner.utils.{AggregatePhaseStrategy, TableTestBa import org.junit.jupiter.api.{BeforeEach, Test} +import java.time.Duration + /** Tests for [[FlinkChangelogModeInferenceProgram]]. */ class ChangelogModeInferenceTest extends TableTestBase { @@ -152,7 +154,7 @@ class ChangelogModeInferenceTest extends TableTestBase { @Test def testTwoLevelGroupByLocalGlobalOn(): Unit = { util.enableMiniBatch() -util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) +util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) util.tableEnv.getConfig.set( OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE.toString) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala index 9850e1afe9c..c8e4420634e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala @@ -28,6 +28,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedT import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith +import java.time.Duration import java.util @ExtendWith(Array(classOf[ParameterizedTestExtension])) @@ -41,7 +42,7 @@ class DistinctAggregateTest( @BeforeEach def before(): Unit = { -util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) +util.tableEnv.getConfig.setIdleStateRetention(Duration.ofHours(1)) util.enableMiniBatch() 
util.tableEnv.getConfig .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggPhaseEnforcer.toString) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala index 1a74d0b4158..810ab987867 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala @@ -438,7 +438,7 @@ class GroupWindowTest extends TableTestBase { def testWindowAggregateWithLateFire(): Unit = { util.tableConfig.set(TABLE_EXEC_EMI
(flink) branch master updated: Fix NullArgumentException of getQuantile method in DescriptiveStatisticsHistogramStatistics
This is an automated email from the ASF dual-hosted git repository. jingge pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 5e1deeea19c Fix NullArgumentException of getQuantile method in DescriptiveStatisticsHistogramStatistics 5e1deeea19c is described below commit 5e1deeea19c2e73f3bc6f1f9881a778d13abf6a4 Author: 朱通通 AuthorDate: Thu Dec 14 20:05:28 2023 +0800 Fix NullArgumentException of getQuantile method in DescriptiveStatisticsHistogramStatistics --- .../flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java index 7995a2edc5e..c8fa2b2a4d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java @@ -174,6 +174,8 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic } if (data != null) { percentilesImpl.setData(data); +} else { +percentilesImpl.setData(new double[] {0.0}); } } }
(flink) branch release-1.18 updated: [FLINK-33704][Filesystems] Update GCS filesystems to latest available versions. This closes #23935
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 8951f907b46 [FLINK-33704][Filesytems] Update GCS filesystems to latest available versions. This closes #23935 8951f907b46 is described below commit 8951f907b4633d6826852d62920ef638905c7c78 Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com> AuthorDate: Mon Dec 18 09:05:00 2023 +0100 [FLINK-33704][Filesytems] Update GCS filesystems to latest available versions. This closes #23935 * [FLINK-33704][Filesystem] Update GCS filesystem dependencies - Update GS SDK from 2.15.0 to 2.29.1 - Update GS Hadoop Connector from 2.2.15 to 2.2.18 - Update GS GRPC to highest shared version * [FLINK-33704] Make NOTICE inclusion of com.google.re2j for GCS consistent RE2J is listed for the other filesystems as Go license, we should remain consistent throughout the project (cherry picked from commit 492a886248208904276fcd2bda138a79c86bc71c) --- docs/content.zh/docs/deployment/filesystems/gcs.md | 4 +- docs/content/docs/deployment/filesystems/gcs.md| 4 +- flink-filesystems/flink-gs-fs-hadoop/pom.xml | 10 +- .../src/main/resources/META-INF/NOTICE | 105 +++-- 4 files changed, 64 insertions(+), 59 deletions(-) diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md index 363cd2fec90..7edf78b2e61 100644 --- a/docs/content.zh/docs/deployment/filesystems/gcs.md +++ b/docs/content.zh/docs/deployment/filesystems/gcs.md @@ -55,7 +55,7 @@ Note that these examples are *not* exhaustive and you can use GCS in other place Flink provides the `flink-gs-fs-hadoop` file system to write to GCS. This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it. 
-`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.15) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.15.0) library to provide `RecoverableWriter` support. +`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.29.1) library to provide `RecoverableWriter` support. This file system can be used with the [FileSystem connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}). @@ -68,7 +68,7 @@ cp ./opt/flink-gs-fs-hadoop-{{< version >}}.jar ./plugins/gs-fs-hadoop/ ### Configuration -The underlying Hadoop file system can be configured using the [Hadoop configuration keys](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.15/gcs/CONFIGURATION.md) for `gcs-connector` by adding the configurations to your `flink-conf.yaml`. +The underlying Hadoop file system can be configured using the [Hadoop configuration keys](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.18/gcs/CONFIGURATION.md) for `gcs-connector` by adding the configurations to your `flink-conf.yaml`. For example, `gcs-connector` has a `fs.gs.http.connect-timeout` configuration key. If you want to change it, you need to set `gs.http.connect-timeout: xyz` in `flink-conf.yaml`. Flink will internally translate this back to `fs.gs.http.connect-timeout`. 
diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md index 97e008eca33..3bd130e2046 100644 --- a/docs/content/docs/deployment/filesystems/gcs.md +++ b/docs/content/docs/deployment/filesystems/gcs.md @@ -55,7 +55,7 @@ Note that these examples are *not* exhaustive and you can use GCS in other place Flink provides the `flink-gs-fs-hadoop` file system to write to GCS. This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it. -`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.15) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.15.0) library to provide `RecoverableWriter` support. +`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with