(flink-training) branch master updated: [FLINK-33711] Fix numbers of field of the taxi event / Fix broken link (#66)
This is an automated email from the ASF dual-hosted git repository. danderson pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git The following commit(s) were added to refs/heads/master by this push: new dc81157 [FLINK-33711] Fix numbers of field of the taxi event / Fix broken link (#66) dc81157 is described below commit dc81157f826649b6084d968689f623ce41a1b46c Author: Leona Yoda AuthorDate: Sat Dec 2 11:55:23 2023 +0900 [FLINK-33711] Fix numbers of field of the taxi event / Fix broken link (#66) * fix number of fields of the taxi ride event * fix broken link (use-taxi-data-streams) - Co-authored-by: Leona Yoda --- README.md| 6 +++--- README_zh.md | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b1a65cb..9ed2a0c 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Exercises that accompany the training content in the documentation. 1. [Clone and build the flink-training project](#clone-and-build-the-flink-training-project) 1. [Import the flink-training project into your IDE](#import-the-flink-training-project-into-your-ide) -[**Use the taxi data streams**](#using-the-taxi-data-streams) +[**Use the taxi data streams**](#use-the-taxi-data-streams) 1. [Schema of taxi ride events](#schema-of-taxi-ride-events) 1. [Schema of taxi fare events](#schema-of-taxi-fare-events) @@ -148,7 +148,7 @@ Our taxi data set contains information about individual taxi rides in New York C Each ride is represented by two events: a trip start, and a trip end. -Each event consists of eleven fields: +Each event consists of ten fields: ``` rideId : Long // a unique id for each ride @@ -189,7 +189,7 @@ We assume you have set up your development environment according to our [setup g ### Learn about the data The initial set of exercises are all based on data streams of events about taxi rides and taxi fares. These streams are produced by source functions which reads data from input files. 
-Read the [instructions](#using-the-taxi-data-streams) to learn how to use them. +Read the [instructions](#use-the-taxi-data-streams) to learn how to use them. ### Run and debug Flink programs in your IDE diff --git a/README_zh.md b/README_zh.md index 2af9bba..1914e08 100644 --- a/README_zh.md +++ b/README_zh.md @@ -156,7 +156,7 @@ org.gradle.project.enable_scala = true 每次车程都由两个事件表示:行程开始(trip start)和行程结束(trip end)。 -每个事件都由十一个字段组成: +每个事件都由十个字段组成: ``` rideId : Long // 每次车程的唯一id
(flink) branch master updated: [FLINK-33698][datastream] Fix the backoff calculation of ExponentialBackoffDelayRetryStrategy in AsyncRetryStrategies
This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 5da214c963c [FLINK-33698][datastream] Fix the backoff calculation of ExponentialBackoffDelayRetryStrategy in AsyncRetryStrategies 5da214c963c is described below commit 5da214c963c219c8b3da727ffde5d6995b3770b8 Author: xiangyu0xf AuthorDate: Sat Dec 2 09:00:24 2023 +0800 [FLINK-33698][datastream] Fix the backoff calculation of ExponentialBackoffDelayRetryStrategy in AsyncRetryStrategies This closes #23830 --- .../util/retryable/AsyncRetryStrategies.java | 14 -- .../util/retryable/AsyncRetryStrategiesTest.java | 56 ++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java index 519945d5559..580270d203f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java @@ -28,7 +28,12 @@ import java.util.Collection; import java.util.Optional; import java.util.function.Predicate; -/** Utility class to create concrete {@link AsyncRetryStrategy}. */ +/** + * Utility class to create concrete {@link AsyncRetryStrategy}. + * + * NOTICE: For performance reasons, this utility's {@link AsyncRetryStrategy} + * implementation assumes the attempt always start from 1 and will only increase by 1 each time. 
+ */ public class AsyncRetryStrategies { public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy(); @@ -151,10 +156,10 @@ public class AsyncRetryStrategies { private static final long serialVersionUID = 1L; private final int maxAttempts; private final long maxRetryDelay; +private final long initialDelay; private final double multiplier; private final Predicate> resultPredicate; private final Predicate exceptionPredicate; - private long lastRetryDelay; public ExponentialBackoffDelayRetryStrategy( @@ -169,6 +174,7 @@ public class AsyncRetryStrategies { this.multiplier = multiplier; this.resultPredicate = resultPredicate; this.exceptionPredicate = exceptionPredicate; +this.initialDelay = initialDelay; this.lastRetryDelay = initialDelay; } @@ -180,9 +186,11 @@ public class AsyncRetryStrategies { @Override public long getBackoffTimeMillis(int currentAttempts) { if (currentAttempts <= 1) { -// equivalent to initial delay +// reset to initialDelay +this.lastRetryDelay = initialDelay; return lastRetryDelay; } + long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay); this.lastRetryDelay = backoff; return backoff; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategiesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategiesTest.java new file mode 100644 index 000..606119912ce --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategiesTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.retryable; + +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +/** Tests for the {@link AsyncRetryStrategies}. */ +public class AsyncRetryStrategiesTest extends TestLogger { + +@Test +public void testExponentialBackoffDelayRetryStrategy() { +int maxAttempts = 10; +long initialDelay
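For context on the bug being fixed: before this change, `ExponentialBackoffDelayRetryStrategy` kept multiplying the mutable `lastRetryDelay` across retry sequences, so a reused strategy instance never started again from the initial delay. The fix stores `initialDelay` and resets `lastRetryDelay` whenever `currentAttempts <= 1`. A minimal self-contained sketch of the corrected behavior (the class name is illustrative — this mirrors the semantics of the fix, not the actual Flink class):

```java
// Illustrative sketch of the corrected backoff logic (assumption: mirrors the
// semantics of the fixed ExponentialBackoffDelayRetryStrategy, not the real class).
public class BackoffSketch {
    private final long initialDelay;
    private final long maxRetryDelay;
    private final double multiplier;
    private long lastRetryDelay;

    public BackoffSketch(long initialDelay, long maxRetryDelay, double multiplier) {
        this.initialDelay = initialDelay;
        this.maxRetryDelay = maxRetryDelay;
        this.multiplier = multiplier;
        this.lastRetryDelay = initialDelay;
    }

    // Attempts are assumed to start at 1 and increase by 1 on each call.
    public long getBackoffTimeMillis(int currentAttempts) {
        if (currentAttempts <= 1) {
            // The fix: reset so a reused strategy starts a fresh sequence
            // from initialDelay instead of continuing from the last delay.
            lastRetryDelay = initialDelay;
            return lastRetryDelay;
        }
        long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay);
        lastRetryDelay = backoff;
        return backoff;
    }
}
```

With initialDelay = 100 ms, multiplier = 2.0 and maxRetryDelay = 1000 ms, successive attempts yield 100, 200, 400, 800, then 1000 (capped); a second retry sequence on the same instance starts again at 100 ms, which is exactly what the pre-fix code got wrong.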
(flink-web) branch asf-site updated (47d2bfc88 -> 6dc1ab9da)
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git from 47d2bfc88 Rebuild website new 188075091 Add Kafka connector v3.0.2 release new 6dc1ab9da Rebuild website The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: content/2014/08/26/apache-flink-0.6-available/index.html| 2 +- content/2014/09/26/apache-flink-0.6.1-available/index.html | 2 +- content/2014/10/03/upcoming-events/index.html | 2 +- content/2014/11/04/apache-flink-0.7.0-available/index.html | 2 +- content/2014/11/18/hadoop-compatibility-in-flink/index.html | 2 +- .../01/06/december-2014-in-the-flink-community/index.html | 2 +- content/2015/01/21/apache-flink-0.8.0-available/index.html | 2 +- .../02/04/january-2015-in-the-flink-community/index.html| 2 +- content/2015/02/09/introducing-flink-streaming/index.html | 2 +- .../03/02/february-2015-in-the-flink-community/index.html | 2 +- .../03/13/peeking-into-apache-flinks-engine-room/index.html | 2 +- .../2015/04/07/march-2015-in-the-flink-community/index.html | 2 +- .../index.html | 2 +- content/2015/05/11/juggling-with-bits-and-bytes/index.html | 2 +- .../2015/05/14/april-2015-in-the-flink-community/index.html | 2 +- content/2015/06/24/announcing-apache-flink-0.9.0/index.html | 2 +- .../index.html | 2 +- content/2015/09/01/apache-flink-0.9.1-available/index.html | 2 +- content/2015/09/03/announcing-flink-forward-2015/index.html | 2 +- .../index.html | 2 +- .../2015/11/16/announcing-apache-flink-0.10.0/index.html| 2 +- content/2015/11/27/flink-0.10.1-released/index.html | 2 +- .../introducing-stream-windows-in-apache-flink/index.html | 2 +- .../index.html | 2 +- .../index.html | 2 +- content/2016/02/11/flink-0.10.2-released/index.html | 2 +- 
content/2016/03/08/announcing-apache-flink-1.0.0/index.html | 2 +- content/2016/04/06/flink-1.0.1-released/index.html | 2 +- .../index.html | 2 +- .../index.html | 2 +- content/2016/04/22/flink-1.0.2-released/index.html | 2 +- content/2016/05/11/flink-1.0.3-released/index.html | 2 +- .../index.html | 2 +- content/2016/08/04/announcing-apache-flink-1.1.0/index.html | 2 +- content/2016/08/04/flink-1.1.1-released/index.html | 2 +- .../index.html | 2 +- content/2016/09/05/apache-flink-1.1.2-released/index.html | 2 +- content/2016/10/12/apache-flink-1.1.3-released/index.html | 2 +- .../12/19/apache-flink-in-2016-year-in-review/index.html| 2 +- content/2016/12/21/apache-flink-1.1.4-released/index.html | 2 +- content/2017/02/06/announcing-apache-flink-1.2.0/index.html | 2 +- content/2017/03/23/apache-flink-1.1.5-released/index.html | 2 +- .../index.html | 2 +- .../03/30/continuous-queries-on-dynamic-tables/index.html | 2 +- content/2017/04/26/apache-flink-1.2.1-released/index.html | 2 +- .../introducing-docker-images-for-apache-flink/index.html | 2 +- .../01/apache-flink-1.3.0-release-announcement/index.html | 2 +- content/2017/06/23/apache-flink-1.3.1-released/index.html | 2 +- .../index.html | 2 +- content/2017/08/05/apache-flink-1.3.2-released/index.html | 2 +- .../index.html | 2 +- .../12/apache-flink-1.4.0-release-announcement/index.html | 2 +- .../12/21/apache-flink-in-2017-year-in-review/index.html| 2 +- .../index.html | 2 +- content/2018/02/15/apache-flink-1.4.1-released/index.html | 2 +- .../index.html | 2 +- content/2018/03/08/apache-flink-1.4.2-released/index.html | 2 +- content/2018/03/15/apache-flink-1.3.3-released/index.html | 2 +- .../18/apache-flink-1.5.0-release-announcement/index.html | 2 +- content/2018/07/12/apache-flink-1.5.1-released/index.html | 2 +- content/2018/07/31/apache-flink-1.5.2-released/index.html | 2 +- .../09/apache-flink-1.6.0-release-announcement/index.html | 2 +- content/2018/08/21/apache-flink-1.5.3-released/index.html | 2 +- 
content/2018/09/20/apache-flink-1.5.4-released/index.html | 2 +- content/2018/09/20/apache-
(flink-web) 01/02: Add Kafka connector v3.0.2 release
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 1880750919cf0fa6de58249aabc369b6b8448862 Author: Tzu-Li (Gordon) Tai AuthorDate: Fri Nov 24 09:05:08 2023 -0800 Add Kafka connector v3.0.2 release This closes #700. --- docs/data/flink_connectors.yml | 8 docs/data/release_archive.yml | 5 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/data/flink_connectors.yml b/docs/data/flink_connectors.yml index 79ec6e71c..438689cbe 100644 --- a/docs/data/flink_connectors.yml +++ b/docs/data/flink_connectors.yml @@ -51,10 +51,10 @@ jdbc: compatibility: ["1.16.x", "1.17.x"] kafka: - name: "Apache Flink Kafka Connector 3.0.1" - source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.0.1/flink-connector-kafka-3.0.1-src.tgz"; - source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-kafka-3.0.1/flink-connector-kafka-3.0.1-src.tgz.asc"; - source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-kafka-3.0.1/flink-connector-kafka-3.0.1-src.tgz.sha512"; + name: "Apache Flink Kafka Connector 3.0.2" + source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.0.2/flink-connector-kafka-3.0.2-src.tgz"; + source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-kafka-3.0.2/flink-connector-kafka-3.0.2-src.tgz.asc"; + source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-kafka-3.0.2/flink-connector-kafka-3.0.2-src.tgz.sha512"; compatibility: ["1.17.x", "1.18.x"] opensearch: diff --git a/docs/data/release_archive.yml b/docs/data/release_archive.yml index e2a51874f..137d9ac49 100644 --- a/docs/data/release_archive.yml +++ b/docs/data/release_archive.yml @@ -535,6 +535,11 @@ release_archive: version: 4.2.0 release_date: 2023-11-30 filename: "aws" +- name: "Flink Kafka Connector" + connector: "kafka" 
+ version: 3.0.2 + release_date: 2023-12-01 + filename: "kafka" flink_shaded: -
(flink-connector-kafka) annotated tag v3.0.2 updated (a5bb91a7 -> 81ae37f0)
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to annotated tag v3.0.2
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

*** WARNING: tag v3.0.2 was modified! ***

    from a5bb91a7 (commit)
      to 81ae37f0 (tag)
 tagging a5bb91a7ac8ab38a427848960f979e20d0f28fc6 (commit)
      by Tzu-Li (Gordon) Tai
      on Fri Dec 1 16:48:58 2023 -0800

- Log -----------------------------------------------------------------
vv3.0.2
-----BEGIN PGP SIGNATURE-----

iQIzBAABCAAdFiEEHB4jlNMZThlEYTSI8yCYbTXDPWoFAmVqfvoACgkQ8yCYbTXD
PWos1A//SE+pr2iCyS80kbk6anmAkhX559XrJH+6H/N3hwktMvNI1gmAxfzpTpdG
h7q6p5LF627hUuT1Vxmvo3XD5xRSOpF+49oY1m0W31QcO28HSkUw02ugWxTHaYv4
/4WyFJOwkJ0g3by+QR6fnrj0yCkVi9oA8/a2NVVePJl1Kg+XzLgNId6Rf6LfLdoz
mfor3ZX5Vkeed5Sxl6T4Yxqu2hvIct4Sdq83M8zZdxggETsRaMaopI4dsw2W4S8v
sNpskowgNkYi/L3200ihGJvQS55geSNlOzyKy81XzVZd2qCNTQDlwkoIpLwwPazK
ysBjgffRRMxRzZBM96JjMKjt/MS8Z83kEryTRDvdP7ZeK35CQQqsSoLFIGDzIaSP
5dVd1b2YMpXsPc6HiOoUhgsaBArILaqb6h/Y3K9ajw8kVzeGa3baF6CWBzKpC+ZN
B/X4eR7HxHJaOE+XIyBd9BjaKoH/emYBaz7R8LAIi5dMLyr678yHkIdkSuiy0a0U
VRvbLrgOuazA987Iwc+vk2w3KRk3SCESEpwmq+DionkLwfKgpjBbTFvMF7Znrd23
cFJ2uX9BG7LdJDkO73fYRPLeIl9/HpobVU/bQvCDejW9QMjt2m/8qkTsnoU0AOnh
T5J8Nkdd0O9E2edbQJiceN4naIJqYxIlU3Lm5Ig1x0e9twi2gNk=
=jW0l
-----END PGP SIGNATURE-----
-----------------------------------------------------------------------

No new revisions were added by this update.

Summary of changes:
svn commit: r65793 - /dev/flink/flink-connector-kafka-3.0.2-rc1/ /release/flink/flink-connector-kafka-3.0.2/
Author: tzulitai
Date: Sat Dec 2 00:47:33 2023
New Revision: 65793

Log:
Release flink-connector-kafka 3.0.2

Added:
    release/flink/flink-connector-kafka-3.0.2/
      - copied from r65792, dev/flink/flink-connector-kafka-3.0.2-rc1/
Removed:
    dev/flink/flink-connector-kafka-3.0.2-rc1/
(flink) 01/02: [FLINK-33638][table] Support variable-length data generation for variable-length data types
This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ac707cfeb24585ec4d68ee3dee22b64e42c454b2 Author: Yubin Li AuthorDate: Mon Nov 27 17:16:57 2023 +0800 [FLINK-33638][table] Support variable-length data generation for variable-length data types --- docs/content.zh/docs/connectors/table/datagen.md | 7 ++ docs/content/docs/connectors/table/datagen.md | 24 +- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 4 +- .../functions/source/datagen/RandomGenerator.java | 6 ++ .../src/test/resources/sql/table.q | 1 + .../src/test/resources/sql/table.q | 1 + .../datagen/table/DataGenConnectorOptions.java | 9 +++ .../datagen/table/DataGenConnectorOptionsUtil.java | 1 + .../datagen/table/DataGenTableSourceFactory.java | 30 .../datagen/table/RandomGeneratorVisitor.java | 34 +++-- .../factories/DataGenTableSourceFactoryTest.java | 86 ++ 11 files changed, 196 insertions(+), 7 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md index cca1ac3c36f..87dd494fa19 100644 --- a/docs/content.zh/docs/connectors/table/datagen.md +++ b/docs/content.zh/docs/connectors/table/datagen.md @@ -138,6 +138,13 @@ CREATE TABLE datagen ( Integer 随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string。 + + fields.#.var-len + 可选 + false + Boolean + 是否生成变长数据,请注意只能用于变长类型(varchar、string、varbinary、bytes)。 + fields.#.start 可选 diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md index c1774f9eb68..25d3b0efd81 100644 --- a/docs/content/docs/connectors/table/datagen.md +++ b/docs/content/docs/connectors/table/datagen.md @@ -39,7 +39,7 @@ Usage - By default, a DataGen table will create an unbounded number of rows with a random value for each column. -For variable sized types, char/varchar/binary/varbinary/string/array/map/multiset, the length can be specified. 
+For types, char/varchar/binary/varbinary/string/array/map/multiset, the length can be specified. Additionally, a total number of rows can be specified, resulting in a bounded table. There also exists a sequence generator, where users specify a sequence of start and end values. @@ -77,6 +77,21 @@ WITH ( LIKE Orders (EXCLUDING ALL) ``` +Further more, for variable sized types, varchar/string/varbinary/bytes, you can specify whether to enable variable-length data generation. + +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3), +seller VARCHAR(150) +) WITH ( + 'connector' = 'datagen', + 'fields.seller.var-len' = 'true' +) +``` + Types - @@ -283,6 +298,13 @@ Connector Options Integer Size or length of the collection for generating char/varchar/binary/varbinary/string/array/map/multiset types. + + fields.#.var-len + optional + false + Boolean + Whether to generate a variable-length data, please notice that it should only be used for variable-length types (varchar, string, varbinary, bytes). 
+ fields.#.start optional diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index 95c466ee545..5307b331fd3 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -317,6 +317,8 @@ Method calls method in (RandomGeneratorVisitor.java:126) Method calls method in (RandomGeneratorVisitor.java:126) Method calls method in (RandomGeneratorVisitor.java:141) +Method calls method in (RandomGeneratorVisitor.java:203) +Method calls method in (RandomGeneratorVisitor.java:172) Method calls method in (RandomGeneratorVisitor.java:305) Method calls method in (RandomGeneratorVisitor.java:306) Method calls method in (RandomGeneratorVisitor.java:262) @@ -832,4 +834,4 @@ Static Initializer ()> gets field in (PartitionTimeCommitTrigger.java:52) Static Initializer ()> calls constructor (org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (ProcTimeCommitTrigger.java:47) Static Initializer ()> gets field in (ProcTimeCommitTrigger.java:47) -Static Initializer ()> gets field in (ProcTimeCommitTrigger.java:47) \ No newline at end of file +Static Initializer ()> gets fie
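The `fields.#.var-len` option introduced above switches string/binary generation from fixed-length to variable-length values. A rough standalone sketch of the distinction (assumed semantics; the class, method, and alphabet here are invented for illustration — this is not the actual Flink `RandomGeneratorVisitor` code):

```java
import java.util.Random;

// Hypothetical illustration of fixed-length vs. variable-length generation,
// loosely modeling what 'fields.#.var-len' = 'true' enables for varchar/string.
public class VarLenSketch {
    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";

    // varLen = false: every value has exactly `length` characters;
    // varLen = true:  the length is drawn uniformly from 1..length.
    static String randomString(Random rnd, int length, boolean varLen) {
        int n = varLen ? 1 + rnd.nextInt(length) : length;
        StringBuilder sb = new StringBuilder(n);
        for (int i = 0; i < n; i++) {
            sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }
}
```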
(flink) branch master updated (5edc7d7b18e -> 1403febd30e)
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 5edc7d7b18e  [FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java
     new ac707cfeb24  [FLINK-33638][table] Support variable-length data generation for variable-length data types
     new 1403febd30e  [FLINK-33638][docs-zh] Update the outdated Chinese datagen connector docs

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 docs/content.zh/docs/connectors/table/datagen.md   | 227 ++---
 docs/content/docs/connectors/table/datagen.md      |  38 +++-
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e           |   4 +-
 .../functions/source/datagen/RandomGenerator.java  |   6 +
 .../src/test/resources/sql/table.q                 |   1 +
 .../src/test/resources/sql/table.q                 |   1 +
 .../datagen/table/DataGenConnectorOptions.java     |   9 +
 .../datagen/table/DataGenConnectorOptionsUtil.java |   1 +
 .../datagen/table/DataGenTableSourceFactory.java   |  30 +++
 .../datagen/table/RandomGeneratorVisitor.java      |  34 ++-
 .../factories/DataGenTableSourceFactoryTest.java   |  86 ++++
 11 files changed, 392 insertions(+), 45 deletions(-)
(flink) 02/02: [FLINK-33638][docs-zh] Update the outdated Chinese datagen connector docs
This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1403febd30e03cb38352695151d43e4ed3eb8ac1 Author: Yubin Li AuthorDate: Fri Dec 1 11:23:19 2023 +0800 [FLINK-33638][docs-zh] Update the outdated Chinese datagen connector docs --- docs/content.zh/docs/connectors/table/datagen.md | 220 +++ docs/content/docs/connectors/table/datagen.md| 14 +- 2 files changed, 196 insertions(+), 38 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md index 87dd494fa19..210ccc088d2 100644 --- a/docs/content.zh/docs/connectors/table/datagen.md +++ b/docs/content.zh/docs/connectors/table/datagen.md @@ -29,51 +29,202 @@ under the License. {{< label "Scan Source: 有界" >}} {{< label "Scan Source: 无界" >}} -DataGen 连接器允许按数据生成规则进行读取。 +DataGen 连接器允许基于内存生成数据来创建表。 +在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 +可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)灵活地生成记录。 -DataGen 连接器可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)。 -这使您可以灵活地生成记录。 +DataGen 连接器是内置的,不需要额外的依赖项。 -DataGen 连接器是内置的。 +用法 +- -注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。 +默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 +对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 +还可以指定总行数,从而生成有界表。 -怎么创建一个 DataGen 的表 - - -表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。 - -每个列,都有两种生成数据的方法: +还支持序列生成器,您可以指定序列的起始和结束值。 +如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 -- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。 - -- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 +时间类型字段对应的值始终是本地机器当前系统时间。 ```sql -CREATE TABLE datagen ( - f_sequence INT, - f_random INT, - f_random_str STRING, - ts AS localtimestamp, - WATERMARK FOR ts AS ts +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) ) WITH ( - 'connector' = 'datagen', + 'connector' = 'datagen' +) 
+``` - -- optional options -- +DataGen 连接器通常与 ``LIKE`` 子句结合使用,以模拟物理表。 - 'rows-per-second'='5', +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) +) WITH (...) - 'fields.f_sequence.kind'='sequence', - 'fields.f_sequence.start'='1', - 'fields.f_sequence.end'='1000', +-- create a bounded mock table +CREATE TEMPORARY TABLE GenOrders +WITH ( +'connector' = 'datagen', +'number-of-rows' = '10' +) +LIKE Orders (EXCLUDING ALL) +``` - 'fields.f_random.min'='1', - 'fields.f_random.max'='1000', +此外,对于可变长度类型(varchar、string、varbinary 和 bytes),您可以指定是否生成可变长度的数据。 - 'fields.f_random_str.length'='10' +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3), +seller VARCHAR(150) +) WITH ( + 'connector' = 'datagen', + 'fields.seller.var-len' = 'true' ) ``` +字段类型 +- + + + + +Type +Supported Generators +Notes + + + + +BOOLEAN +random + + + +CHAR +random / sequence + + + +VARCHAR +random / sequence + + + +BINARY +random / sequence + + + +VARBINARY +random / sequence + + + +STRING +random / sequence + + + +DECIMAL +random / sequence + + + +TINYINT +random / sequence + + + +SMALLINT +random / sequence + + + +INT +random / sequence + + + +BIGINT +random / sequence + + + +FLOAT +random / sequence + + + +DOUBLE +random / sequence + + + +DATE +random +总是解析为本地机器的当前日期。 + + +TIME +random +总是解析为本地机器的当前时间。 + + +TIMESTAMP +random + +解析为相对于本地机器的当前时间戳向过去偏移的时间戳。偏移的最大值可以通过 'max-past' 选项指定。 + + + +TIMESTAMP_LTZ +random + +解析为相对于本地机器的当前时间戳向过去偏移的时间戳。偏移的最大值可以通过 'max-past' 选项指定。 + + + +INTERVAL YEAR TO MONTH +random + + + +INTERVAL DAY TO MONTH +random + + + +
(flink-connector-opensearch) branch dependabot/maven/flink-connector-opensearch/org.opensearch-opensearch-2.11.1 created (now f0c07d7)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/flink-connector-opensearch/org.opensearch-opensearch-2.11.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git

      at f0c07d7  Bump org.opensearch:opensearch in /flink-connector-opensearch

No new revisions were added by this update.
(flink) 02/02: [FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5edc7d7b18e88cc86e84d197202d8cbb40621864 Author: Jim Hughes AuthorDate: Tue Nov 7 17:14:09 2023 -0500 [FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java Removing json test files as well. --- .../plan/nodes/exec/stream/JoinJsonPlanTest.java | 144 --- .../stream/jsonplan/JoinJsonPlanITCase.java| 168 .../JoinJsonPlanTest_jsonplan/testInnerJoin.out| 222 -- .../testInnerJoinWithEqualPk.out | 332 --- .../testInnerJoinWithPk.out| 450 - .../testLeftJoinNonEqui.out| 266 6 files changed, 1582 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java deleted file mode 100644 index dd2771ea7cf..000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for join. */ -class JoinJsonPlanTest extends TableTestBase { - -private StreamTableTestUtil util; -private TableEnvironment tEnv; - -@BeforeEach -void setup() { -util = streamTestUtil(TableConfig.getDefault()); -tEnv = util.getTableEnv(); - -String srcTableA = -"CREATE TABLE A (\n" -+ " a1 int,\n" -+ " a2 bigint,\n" -+ " a3 bigint\n" -+ ") with (\n" -+ " 'connector' = 'values',\n" -+ " 'bounded' = 'false')"; -String srcTableB = -"CREATE TABLE B (\n" -+ " b1 int,\n" -+ " b2 bigint,\n" -+ " b3 bigint\n" -+ ") with (\n" -+ " 'connector' = 'values',\n" -+ " 'bounded' = 'false')"; -String srcTableT = -"CREATE TABLE t (\n" -+ " a int,\n" -+ " b bigint,\n" -+ " c varchar\n" -+ ") with (\n" -+ " 'connector' = 'values',\n" -+ " 'bounded' = 'false')"; -String srcTableS = -"CREATE TABLE s (\n" -+ " x bigint,\n" -+ " y varchar,\n" -+ " z int\n" -+ ") with (\n" -+ " 'connector' = 'values',\n" -+ " 'bounded' = 'false')"; -tEnv.executeSql(srcTableA); -tEnv.executeSql(srcTableB); -tEnv.executeSql(srcTableT); -tEnv.executeSql(srcTableS); -} - -@Test -void testInnerJoin() { -String sinkTableDdl = -"CREATE TABLE MySink (\n" -+ " a1 int,\n" -+ " b1 int\n" -+ ") with (\n" -+ " 'connector' = 'values',\n" -+ " 'table-sink-class' = 'DEFAULT')"; -tEnv.executeSql(sinkTableDdl); -util.verifyJsonPlan("INSERT INTO MySink SELECT a1, b1 FROM A JOIN B ON a1 = b1"); -} - -@Test -void testInnerJoinWithEqualPk() { -String query1 = "SELECT SUM(a2) AS a2, a1 FR
(flink) branch master updated (87d7b4abd0a -> 5edc7d7b18e)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 87d7b4abd0a Revert "Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729"" new e886dfdda6c [FLINK-33470] Implement restore tests for Join node new 5edc7d7b18e [FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/table/test/program/SinkTestStep.java | 24 +- .../factories/TestValuesRuntimeFunctions.java | 13 +- .../planner/factories/TestValuesTableFactory.java | 9 + .../plan/nodes/exec/stream/JoinJsonPlanTest.java | 144 --- .../plan/nodes/exec/testutils/JoinRestoreTest.java | 51 +++ .../nodes/exec/testutils/JoinTestPrograms.java | 450 + .../plan/nodes/exec/testutils/RestoreTestBase.java | 25 +- .../stream/jsonplan/JoinJsonPlanITCase.java| 168 .../anti-join/plan/anti-join.json} | 189 - .../anti-join/savepoint/_metadata | Bin 0 -> 9813 bytes .../cross-join/plan/cross-join.json} | 129 +++--- .../cross-join/savepoint/_metadata | Bin 0 -> 12164 bytes .../plan/inner-join-with-duplicate-key.json} | 156 +++ .../savepoint/_metadata| Bin 0 -> 12351 bytes .../plan/inner-join-with-equal-pk.json}| 150 --- .../inner-join-with-equal-pk/savepoint/_metadata | Bin 0 -> 18610 bytes .../plan/inner-join-with-non-equi-join.json} | 147 +++ .../savepoint/_metadata| Bin 0 -> 13383 bytes .../plan/inner-join-with-pk.json} | 198 - .../inner-join-with-pk/savepoint/_metadata | Bin 0 -> 21984 bytes .../join-with-filter/plan/join-with-filter.json| 305 ++ .../join-with-filter/savepoint/_metadata | Bin 0 -> 12147 bytes .../left-join/plan/left-join.json} | 152 +++ .../left-join/savepoint/_metadata 
| Bin 0 -> 13348 bytes .../plan/non-window-inner-join-with-null-cond.json | 354 .../savepoint/_metadata| Bin 0 -> 15297 bytes .../plan/non-window-inner-join.json| 354 .../non-window-inner-join/savepoint/_metadata | Bin 0 -> 15209 bytes .../outer-join/plan/outer-join.json} | 154 +++ .../outer-join/savepoint/_metadata | Bin 0 -> 13813 bytes .../right-join/plan/right-join.json} | 154 +++ .../right-join/savepoint/_metadata | Bin 0 -> 13096 bytes .../semi-join/plan/semi-join.json} | 141 --- .../semi-join/savepoint/_metadata | Bin 0 -> 11260 bytes 34 files changed, 2291 insertions(+), 1176 deletions(-) delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinRestoreTest.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java copy flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out => restore-tests/stream-exec-join_1/anti-join/plan/anti-join.json} (54%) create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/anti-join/savepoint/_metadata copy flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out => restore-tests/stream-exec-join_1/cross-join/plan/cross-join.json} (60%) create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/cross-join/savepoint/_metadata copy 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out => restore-tests/stream-exec-join_1/inner-join-with-duplicate-key/plan/inner-join-with-duplicate-key.json} (57%) create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/inner-join-with-duplicate-key/savepoint/_metadata rename flink-table/flink-table-planner/src/test/resources/{org/apache
(flink) branch master updated: Revert "Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729""
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 87d7b4abd0a Revert "Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729"" 87d7b4abd0a is described below commit 87d7b4abd0a1ec92433603c83401cc8ad00fd500 Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com> AuthorDate: Fri Dec 1 16:37:21 2023 +0100 Revert "Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729"" This reverts commit c7da98f23c3f86de3a3b12355fa7a1289200f93d. --- docs/content.zh/docs/deployment/filesystems/gcs.md | 9 ++--- docs/content/docs/deployment/filesystems/gcs.md| 9 ++--- .../apache/flink/fs/gs/GSFileSystemOptions.java| 17 ++ .../org/apache/flink/fs/gs/utils/BlobUtils.java| 21 +++- .../flink/fs/gs/writer/GSCommitRecoverable.java| 7 ++-- .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 + .../apache/flink/fs/gs/utils/BlobUtilsTest.java| 15 + .../fs/gs/writer/GSCommitRecoverableTest.java | 38 ++ 8 files changed, 127 insertions(+), 18 deletions(-) diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md index 7edf78b2e61..f80e5b3af4a 100644 --- a/docs/content.zh/docs/deployment/filesystems/gcs.md +++ b/docs/content.zh/docs/deployment/filesystems/gcs.md @@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml` `flink-gs-fs-hadoop` can also be configured by setting the following options in `flink-conf.yaml`: -| Key | Description [...] -|---|--- [...] -| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. 
If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...] -| gs.writer.chunk.size | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. If not set, a Google-determined default chunk size will be used. [...] +| Key | Description [...] +|-|- [...] +| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `R
(flink) branch master updated: Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729"
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c7da98f23c3 Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729" c7da98f23c3 is described below commit c7da98f23c3f86de3a3b12355fa7a1289200f93d Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com> AuthorDate: Fri Dec 1 16:00:36 2023 +0100 Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729" This reverts commit dc1db12137ecad921ae90969d7bfbf1ee7d3d2ef. --- docs/content.zh/docs/deployment/filesystems/gcs.md | 9 +++-- docs/content/docs/deployment/filesystems/gcs.md| 9 +++-- .../apache/flink/fs/gs/GSFileSystemOptions.java| 17 -- .../org/apache/flink/fs/gs/utils/BlobUtils.java| 21 +--- .../flink/fs/gs/writer/GSCommitRecoverable.java| 7 ++-- .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 - .../apache/flink/fs/gs/utils/BlobUtilsTest.java| 15 - .../fs/gs/writer/GSCommitRecoverableTest.java | 38 -- 8 files changed, 18 insertions(+), 127 deletions(-) diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md index f80e5b3af4a..7edf78b2e61 100644 --- a/docs/content.zh/docs/deployment/filesystems/gcs.md +++ b/docs/content.zh/docs/deployment/filesystems/gcs.md @@ -76,11 +76,10 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml` `flink-gs-fs-hadoop` can also be configured by setting the following options in `flink-conf.yaml`: -| Key | Description [...] -|-|- [...] -| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. 
If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mechanism to [...] -| gs.writer.chunk.size| Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. If not set, a Google-determined default chunk size will be used. [...] -| gs.filesink.entropy.enabled | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...] +| Key | Description [...] +|---|
(flink) branch master updated: [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new dc1db12137e [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729 dc1db12137e is described below commit dc1db12137ecad921ae90969d7bfbf1ee7d3d2ef Author: Cheena Budhiraja <110803195+bche...@users.noreply.github.com> AuthorDate: Fri Dec 1 18:52:43 2023 +0530 [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729 --- docs/content.zh/docs/deployment/filesystems/gcs.md | 9 ++--- docs/content/docs/deployment/filesystems/gcs.md| 9 ++--- .../apache/flink/fs/gs/GSFileSystemOptions.java| 17 ++ .../org/apache/flink/fs/gs/utils/BlobUtils.java| 21 +++- .../flink/fs/gs/writer/GSCommitRecoverable.java| 7 ++-- .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 + .../apache/flink/fs/gs/utils/BlobUtilsTest.java| 15 + .../fs/gs/writer/GSCommitRecoverableTest.java | 38 ++ 8 files changed, 127 insertions(+), 18 deletions(-) diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md index 7edf78b2e61..f80e5b3af4a 100644 --- a/docs/content.zh/docs/deployment/filesystems/gcs.md +++ b/docs/content.zh/docs/deployment/filesystems/gcs.md @@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml` `flink-gs-fs-hadoop` can also be configured by setting the following options in `flink-conf.yaml`: -| Key | Description [...] -|---|--- [...] -| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. 
In either case, temporary blobs are written with the prefix `.inprogress/`. It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...] -| gs.writer.chunk.size | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. If not set, a Google-determined default chunk size will be used. [...] +| Key | Description [...] +|-|- [...] +| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final
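The entropy mechanism described in FLINK-32878 can be illustrated with a small, self-contained sketch. Note this is not the connector's actual code: the class and method names (`EntropyNameDemo`, `temporaryBlobName`) and the exact path layout are hypothetical, but the idea matches the commit's description — a random id is injected into the path of the *temporary* object so in-progress writes spread across the bucket's keyspace instead of hotspotting, while the final object path stays unchanged.

```java
import java.util.UUID;

public class EntropyNameDemo {
    // Hypothetical helper: build the name of a temporary blob for an in-progress
    // write. The entropy segment distributes temporary objects across the bucket
    // keyspace; the ".inprogress/" prefix marks them for TTL-based cleanup.
    public static String temporaryBlobName(String finalObjectName, String entropy) {
        return ".inprogress/" + entropy + "/" + finalObjectName;
    }

    public static void main(String[] args) {
        String entropy = UUID.randomUUID().toString();
        // The final object path below is an example; only the temporary name carries entropy.
        String tmp = temporaryBlobName("warehouse/part-0001.parquet", entropy);
        System.out.println(tmp); // e.g. .inprogress/<uuid>/warehouse/part-0001.parquet
    }
}
```

Under this scheme, disabling entropy simply means passing a constant (or empty) entropy segment, which is why the feature can be toggled by a single config key such as `gs.filesink.entropy.enabled`.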
(flink-kubernetes-operator) branch main updated: [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720)
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 158cbe29 [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720) 158cbe29 is described below commit 158cbe29169cbfb7fa7ad676fb0273fd7ef6d25e Author: Maximilian Michels AuthorDate: Fri Dec 1 11:56:42 2023 +0100 [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720) --- .../autoscaler/KubernetesScalingRealizer.java | 4 +++ .../autoscaler/KubernetesScalingRealizerTest.java} | 42 +- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java index ca3f4ef2..6bb7a949 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.PipelineOptions; import io.javaoperatorsdk.operator.processing.event.ResourceID; import java.util.Map; +import java.util.TreeMap; /** The Kubernetes implementation for applying parallelism overrides. */ public class KubernetesScalingRealizer @@ -32,6 +33,9 @@ public class KubernetesScalingRealizer @Override public void realize( KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) { +// Make sure the keys are sorted via TreeMap to prevent changing the spec when none of the +// entries changed but the key order is different!
+parallelismOverrides = new TreeMap<>(parallelismOverrides); context.getResource() .getSpec() .getFlinkConfiguration() diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java similarity index 50% copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java copy to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java index ca3f4ef2..dda7ba0a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java @@ -17,26 +17,36 @@ package org.apache.flink.kubernetes.operator.autoscaler; -import org.apache.flink.autoscaler.realizer.ScalingRealizer; -import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.PipelineOptions; -import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.junit.jupiter.api.Test; +import java.util.LinkedHashMap; import java.util.Map; -/** The Kubernetes implementation for applying parallelism overrides. */ -public class KubernetesScalingRealizer -implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> { - -@Override -public void realize( -KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) { -context.getResource() -.getSpec() -.getFlinkConfiguration() -.put( -PipelineOptions.PARALLELISM_OVERRIDES.key(), -ConfigurationUtils.convertValue(parallelismOverrides, String.class)); +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for KubernetesScalingRealizer.
*/ +public class KubernetesScalingRealizerTest { + +@Test +public void testAutoscalerOverridesVertexIdsAreSorted() { + +KubernetesJobAutoScalerContext ctx = +TestingKubernetesAutoscalerUtils.createContext("test", null); + +// Create map which returns keys unsorted +Map<String, String> overrides = new LinkedHashMap<>(); +overrides.put("b", "2"); +overrides.put("a", "1"); + +new KubernetesScalingRealizer().realize(ctx, overrides); + +assertThat( +ctx.getResource() +.getSpec() +.getFlinkConfiguration() + .get(PipelineOptions.PARALLELISM_OVERRIDES.key())) +.isEqualTo("a:1,b:2"); } }
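The TreeMap normalization in the commit above can be sketched in isolation. This is an illustrative stand-in, not the operator's code (`OverrideOrderDemo` and `toConfigString` are hypothetical names): two maps with identical entries but different insertion order would serialize to different override strings — and thus trigger a spurious spec update — unless the keys are sorted first.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class OverrideOrderDemo {
    // Serialize an override map into a "vertex:parallelism,..." config string,
    // joining entries in the map's iteration order.
    public static String toConfigString(Map<String, String> overrides) {
        return overrides.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so equal maps can differ in iteration order.
        Map<String, String> unsorted = new LinkedHashMap<>();
        unsorted.put("b", "2");
        unsorted.put("a", "1");

        System.out.println(toConfigString(unsorted));                // b:2,a:1
        // Wrapping in a TreeMap sorts the keys, making the emitted string deterministic.
        System.out.println(toConfigString(new TreeMap<>(unsorted))); // a:1,b:2
    }
}
```

This is exactly what the one-line `parallelismOverrides = new TreeMap<>(parallelismOverrides);` fix achieves: the spec string becomes a canonical function of the map's contents, so an unchanged override map can never look like a change.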
(flink-kubernetes-operator) branch main updated: [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716)
This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 51a91049 [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716) 51a91049 is described below commit 51a91049b5f17f8a0b21e11feceb4410a97c50c1 Author: Maximilian Michels AuthorDate: Fri Dec 1 10:47:40 2023 +0100 [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716) Turns out that the previous detection code in #706 may not always fire correctly due to an encapsulated serialized throwable. This minor change fixes that. --- .../flink/kubernetes/operator/service/AbstractFlinkService.java | 6 -- .../flink/kubernetes/operator/service/AbstractFlinkServiceTest.java | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 0821b439..3c4fe4aa 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -331,8 +331,10 @@ public abstract class AbstractFlinkService implements FlinkService { exception); } catch (Exception e) { var stopWithSavepointException = -ExceptionUtils.findThrowable( -e, StopWithSavepointStoppingException.class); + ExceptionUtils.findThrowableSerializedAware( +e, + StopWithSavepointStoppingException.class, +getClass().getClassLoader()); if (stopWithSavepointException.isPresent()) { // Handle edge case where the savepoint completes but the job fails // right afterward. 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index c12caf21..a108b8bb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -651,8 +651,9 @@ public class AbstractFlinkServiceTest { if (failAfterSavepointCompletes) { stopWithSavepointFuture.completeExceptionally( new CompletionException( -new StopWithSavepointStoppingException( -savepointPath, jobID))); +new SerializedThrowable( +new StopWithSavepointStoppingException( +savepointPath, jobID)))); } else { stopWithSavepointFuture.complete( new Tuple3<>(id, formatType, savepointDir));
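The failure mode fixed above — a plain cause-chain search missing an exception that was serialized into a generic wrapper — can be sketched without any Flink dependency. All names here are illustrative stand-ins (`SerializedWrapper` for Flink's `SerializedThrowable`, `findSerializedAware` for `ExceptionUtils.findThrowableSerializedAware`), not the real classes: the point is that the wrapper is *not* an `instanceof` match for the original type, so a search must also compare the recorded class name.

```java
import java.util.Optional;

public class SerializedThrowableDemo {
    // Minimal stand-in for a SerializedThrowable-style wrapper: it only
    // remembers the original exception's class name and message.
    public static class SerializedWrapper extends Exception {
        public final String originalClassName;
        public SerializedWrapper(Throwable original) {
            super(original.getMessage());
            this.originalClassName = original.getClass().getName();
        }
    }

    public static class SavepointStoppingException extends Exception {
        public SavepointStoppingException(String msg) { super(msg); }
    }

    // Plain search: walks the cause chain looking only for an instanceof match.
    public static Optional<Throwable> findThrowable(Throwable t, Class<?> type) {
        while (t != null) {
            if (type.isInstance(t)) return Optional.of(t);
            t = t.getCause();
        }
        return Optional.empty();
    }

    // "Serialized-aware" search: additionally matches a wrapper carrying the class name.
    public static boolean findSerializedAware(Throwable t, Class<?> type) {
        while (t != null) {
            if (type.isInstance(t)) return true;
            if (t instanceof SerializedWrapper
                    && ((SerializedWrapper) t).originalClassName.equals(type.getName())) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable chain = new RuntimeException(
                new SerializedWrapper(new SavepointStoppingException("savepoint completed")));
        // The wrapper hides the original type from a plain instanceof search...
        System.out.println(findThrowable(chain, SavepointStoppingException.class).isPresent()); // false
        // ...but the serialized-aware search still detects it.
        System.out.println(findSerializedAware(chain, SavepointStoppingException.class));       // true
    }
}
```

This mirrors why the test in the commit wraps `StopWithSavepointStoppingException` in a `SerializedThrowable` before completing the future exceptionally: it reproduces the encapsulation that made the original detection in #706 miss the exception.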