(flink-training) branch master updated: [FLINK-33711] Fix numbers of field of the taxi event / Fix broken link (#66)

2023-12-01 Thread danderson
This is an automated email from the ASF dual-hosted git repository.

danderson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
 new dc81157  [FLINK-33711] Fix numbers of field  of the taxi event / Fix 
broken link (#66)
dc81157 is described below

commit dc81157f826649b6084d968689f623ce41a1b46c
Author: Leona Yoda 
AuthorDate: Sat Dec 2 11:55:23 2023 +0900

[FLINK-33711] Fix numbers of field  of the taxi event / Fix broken link 
(#66)

* fix number of fields of the taxi ride event

* fix broken link (use-taxi-data-streams)

---------

Co-authored-by: Leona Yoda 
---
 README.md| 6 +++---
 README_zh.md | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index b1a65cb..9ed2a0c 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ Exercises that accompany the training content in the 
documentation.
 1. [Clone and build the flink-training 
project](#clone-and-build-the-flink-training-project)
 1. [Import the flink-training project into your 
IDE](#import-the-flink-training-project-into-your-ide)
 
-[**Use the taxi data streams**](#using-the-taxi-data-streams)
+[**Use the taxi data streams**](#use-the-taxi-data-streams)
 
 1. [Schema of taxi ride events](#schema-of-taxi-ride-events)
 1. [Schema of taxi fare events](#schema-of-taxi-fare-events)
@@ -148,7 +148,7 @@ Our taxi data set contains information about individual 
taxi rides in New York C
 
 Each ride is represented by two events: a trip start, and a trip end.
 
-Each event consists of eleven fields:
+Each event consists of ten fields:
 
 ```
 rideId : Long  // a unique id for each ride
@@ -189,7 +189,7 @@ We assume you have set up your development environment 
according to our [setup g
 ### Learn about the data
 
The initial set of exercises is based on data streams of events about 
taxi rides and taxi fares. These streams are produced by source functions which 
read data from input files.
-Read the [instructions](#using-the-taxi-data-streams) to learn how to use them.
+Read the [instructions](#use-the-taxi-data-streams) to learn how to use them.
 
 ### Run and debug Flink programs in your IDE
 
diff --git a/README_zh.md b/README_zh.md
index 2af9bba..1914e08 100644
--- a/README_zh.md
+++ b/README_zh.md
@@ -156,7 +156,7 @@ org.gradle.project.enable_scala = true
 
Each ride is represented by two events: a trip start and a trip end.
 
-Each event consists of eleven fields:
+Each event consists of ten fields:
 
```
rideId : Long  // a unique id for each ride

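For orientation, here is a minimal sketch of a ten-field ride event as a plain Java class. The field names are assumptions drawn from the flink-training README's schema listing (only the first field is visible in the diff above), so treat it as illustrative rather than the project's exact sources:

```java
import java.time.Instant;

// Illustrative ten-field taxi ride event; field names are assumed from the
// flink-training README schema, not copied from the project's sources.
public class TaxiRideSketch {
    public long rideId;        // a unique id for each ride
    public boolean isStart;    // true for a trip-start event, false for a trip-end event
    public Instant eventTime;  // time at which the event occurred
    public float startLon;     // longitude where the ride started
    public float startLat;     // latitude where the ride started
    public float endLon;       // longitude where the ride ended
    public float endLat;       // latitude where the ride ended
    public short passengerCnt; // number of passengers on the ride
    public long taxiId;        // a unique id for the taxi
    public long driverId;      // a unique id for the driver
}
```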


(flink) branch master updated: [FLINK-33698][datastream] Fix the backoff calculation of ExponentialBackoffDelayRetryStrategy in AsyncRetryStrategies

2023-12-01 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 5da214c963c [FLINK-33698][datastream] Fix the backoff calculation of 
ExponentialBackoffDelayRetryStrategy in AsyncRetryStrategies
5da214c963c is described below

commit 5da214c963c219c8b3da727ffde5d6995b3770b8
Author: xiangyu0xf 
AuthorDate: Sat Dec 2 09:00:24 2023 +0800

[FLINK-33698][datastream] Fix the backoff calculation of 
ExponentialBackoffDelayRetryStrategy in AsyncRetryStrategies

This closes #23830
---
 .../util/retryable/AsyncRetryStrategies.java   | 14 --
 .../util/retryable/AsyncRetryStrategiesTest.java   | 56 ++
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java
index 519945d5559..580270d203f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java
@@ -28,7 +28,12 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.function.Predicate;
 
-/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+/**
+ * Utility class to create concrete {@link AsyncRetryStrategy}.
+ *
+ * NOTICE: For performance reasons, this utility's {@link 
AsyncRetryStrategy}
+ * implementation assumes the attempt count always starts from 1 and only 
increases by 1 each time.
+ */
 public class AsyncRetryStrategies {
 public static final NoRetryStrategy NO_RETRY_STRATEGY = new 
NoRetryStrategy();
 
@@ -151,10 +156,10 @@ public class AsyncRetryStrategies {
 private static final long serialVersionUID = 1L;
 private final int maxAttempts;
 private final long maxRetryDelay;
+private final long initialDelay;
 private final double multiplier;
private final Predicate<Collection<OUT>> resultPredicate;
private final Predicate<Throwable> exceptionPredicate;
-
 private long lastRetryDelay;
 
 public ExponentialBackoffDelayRetryStrategy(
@@ -169,6 +174,7 @@ public class AsyncRetryStrategies {
 this.multiplier = multiplier;
 this.resultPredicate = resultPredicate;
 this.exceptionPredicate = exceptionPredicate;
+this.initialDelay = initialDelay;
 this.lastRetryDelay = initialDelay;
 }
 
@@ -180,9 +186,11 @@ public class AsyncRetryStrategies {
 @Override
 public long getBackoffTimeMillis(int currentAttempts) {
 if (currentAttempts <= 1) {
-// equivalent to initial delay
+// reset to initialDelay
+this.lastRetryDelay = initialDelay;
 return lastRetryDelay;
 }
+
 long backoff = Math.min((long) (lastRetryDelay * multiplier), 
maxRetryDelay);
 this.lastRetryDelay = backoff;
 return backoff;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategiesTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategiesTest.java
new file mode 100644
index 000..606119912ce
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategiesTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for the {@link AsyncRetryStrategies}. */
+public class AsyncRetryStrategiesTest extends TestLogger {
+
+@Test
+public void testExponentialBackoffDelayRetryStrategy() {
+int maxAttempts = 10;
+long initialDelay 

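The essence of the fix is visible in the getBackoffTimeMillis hunk above: attempt 1 now resets the mutable lastRetryDelay back to initialDelay, so a strategy instance whose state was advanced by a previous retry sequence starts over instead of continuing from the last delay. A self-contained sketch of the corrected logic (a simplified stand-in, not Flink's exact class):

```java
// Simplified stand-in for the fixed ExponentialBackoffDelayRetryStrategy logic.
public final class ExponentialBackoffSketch {
    private final long initialDelay;
    private final long maxRetryDelay;
    private final double multiplier;
    private long lastRetryDelay;

    public ExponentialBackoffSketch(long initialDelay, long maxRetryDelay, double multiplier) {
        this.initialDelay = initialDelay;
        this.maxRetryDelay = maxRetryDelay;
        this.multiplier = multiplier;
        this.lastRetryDelay = initialDelay;
    }

    public long getBackoffTimeMillis(int currentAttempts) {
        if (currentAttempts <= 1) {
            // the fix: reset the state so a new retry sequence starts from initialDelay
            lastRetryDelay = initialDelay;
            return lastRetryDelay;
        }
        long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay);
        lastRetryDelay = backoff;
        return backoff;
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch s = new ExponentialBackoffSketch(100, 1000, 2.0);
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + " -> " + s.getBackoffTimeMillis(attempt) + " ms");
        }
        // prints 100, 200, 400, 800, 1000, 1000; a second sequence would again
        // start at 100 because attempt 1 resets the state.
    }
}
```

Note the Javadoc NOTICE added in the same commit: the calculation stays correct only if attempt counts start at 1 and increase by 1, since each call mutates lastRetryDelay.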
(flink-web) branch asf-site updated (47d2bfc88 -> 6dc1ab9da)

2023-12-01 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


from 47d2bfc88 Rebuild website
 new 188075091 Add Kafka connector v3.0.2 release
 new 6dc1ab9da Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/2014/08/26/apache-flink-0.6-available/index.html|  2 +-
 content/2014/09/26/apache-flink-0.6.1-available/index.html  |  2 +-
 content/2014/10/03/upcoming-events/index.html   |  2 +-
 content/2014/11/04/apache-flink-0.7.0-available/index.html  |  2 +-
 content/2014/11/18/hadoop-compatibility-in-flink/index.html |  2 +-
 .../01/06/december-2014-in-the-flink-community/index.html   |  2 +-
 content/2015/01/21/apache-flink-0.8.0-available/index.html  |  2 +-
 .../02/04/january-2015-in-the-flink-community/index.html|  2 +-
 content/2015/02/09/introducing-flink-streaming/index.html   |  2 +-
 .../03/02/february-2015-in-the-flink-community/index.html   |  2 +-
 .../03/13/peeking-into-apache-flinks-engine-room/index.html |  2 +-
 .../2015/04/07/march-2015-in-the-flink-community/index.html |  2 +-
 .../index.html  |  2 +-
 content/2015/05/11/juggling-with-bits-and-bytes/index.html  |  2 +-
 .../2015/05/14/april-2015-in-the-flink-community/index.html |  2 +-
 content/2015/06/24/announcing-apache-flink-0.9.0/index.html |  2 +-
 .../index.html  |  2 +-
 content/2015/09/01/apache-flink-0.9.1-available/index.html  |  2 +-
 content/2015/09/03/announcing-flink-forward-2015/index.html |  2 +-
 .../index.html  |  2 +-
 .../2015/11/16/announcing-apache-flink-0.10.0/index.html|  2 +-
 content/2015/11/27/flink-0.10.1-released/index.html |  2 +-
 .../introducing-stream-windows-in-apache-flink/index.html   |  2 +-
 .../index.html  |  2 +-
 .../index.html  |  2 +-
 content/2016/02/11/flink-0.10.2-released/index.html |  2 +-
 content/2016/03/08/announcing-apache-flink-1.0.0/index.html |  2 +-
 content/2016/04/06/flink-1.0.1-released/index.html  |  2 +-
 .../index.html  |  2 +-
 .../index.html  |  2 +-
 content/2016/04/22/flink-1.0.2-released/index.html  |  2 +-
 content/2016/05/11/flink-1.0.3-released/index.html  |  2 +-
 .../index.html  |  2 +-
 content/2016/08/04/announcing-apache-flink-1.1.0/index.html |  2 +-
 content/2016/08/04/flink-1.1.1-released/index.html  |  2 +-
 .../index.html  |  2 +-
 content/2016/09/05/apache-flink-1.1.2-released/index.html   |  2 +-
 content/2016/10/12/apache-flink-1.1.3-released/index.html   |  2 +-
 .../12/19/apache-flink-in-2016-year-in-review/index.html|  2 +-
 content/2016/12/21/apache-flink-1.1.4-released/index.html   |  2 +-
 content/2017/02/06/announcing-apache-flink-1.2.0/index.html |  2 +-
 content/2017/03/23/apache-flink-1.1.5-released/index.html   |  2 +-
 .../index.html  |  2 +-
 .../03/30/continuous-queries-on-dynamic-tables/index.html   |  2 +-
 content/2017/04/26/apache-flink-1.2.1-released/index.html   |  2 +-
 .../introducing-docker-images-for-apache-flink/index.html   |  2 +-
 .../01/apache-flink-1.3.0-release-announcement/index.html   |  2 +-
 content/2017/06/23/apache-flink-1.3.1-released/index.html   |  2 +-
 .../index.html  |  2 +-
 content/2017/08/05/apache-flink-1.3.2-released/index.html   |  2 +-
 .../index.html  |  2 +-
 .../12/apache-flink-1.4.0-release-announcement/index.html   |  2 +-
 .../12/21/apache-flink-in-2017-year-in-review/index.html|  2 +-
 .../index.html  |  2 +-
 content/2018/02/15/apache-flink-1.4.1-released/index.html   |  2 +-
 .../index.html  |  2 +-
 content/2018/03/08/apache-flink-1.4.2-released/index.html   |  2 +-
 content/2018/03/15/apache-flink-1.3.3-released/index.html   |  2 +-
 .../18/apache-flink-1.5.0-release-announcement/index.html   |  2 +-
 content/2018/07/12/apache-flink-1.5.1-released/index.html   |  2 +-
 content/2018/07/31/apache-flink-1.5.2-released/index.html   |  2 +-
 .../09/apache-flink-1.6.0-release-announcement/index.html   |  2 +-
 content/2018/08/21/apache-flink-1.5.3-released/index.html   |  2 +-
 content/2018/09/20/apache-flink-1.5.4-released/index.html   |  2 +-
 content/2018/09/20/apache-

(flink-web) 01/02: Add Kafka connector v3.0.2 release

2023-12-01 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 1880750919cf0fa6de58249aabc369b6b8448862
Author: Tzu-Li (Gordon) Tai 
AuthorDate: Fri Nov 24 09:05:08 2023 -0800

Add Kafka connector v3.0.2 release

This closes #700.
---
 docs/data/flink_connectors.yml | 8 
 docs/data/release_archive.yml  | 5 +
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/docs/data/flink_connectors.yml b/docs/data/flink_connectors.yml
index 79ec6e71c..438689cbe 100644
--- a/docs/data/flink_connectors.yml
+++ b/docs/data/flink_connectors.yml
@@ -51,10 +51,10 @@ jdbc:
   compatibility: ["1.16.x", "1.17.x"]
 
 kafka:
-  name: "Apache Flink Kafka Connector 3.0.1"
-  source_release_url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.0.1/flink-connector-kafka-3.0.1-src.tgz";
-  source_release_asc_url: 
"https://downloads.apache.org/flink/flink-connector-kafka-3.0.1/flink-connector-kafka-3.0.1-src.tgz.asc";
-  source_release_sha512_url: 
"https://downloads.apache.org/flink/flink-connector-kafka-3.0.1/flink-connector-kafka-3.0.1-src.tgz.sha512";
+  name: "Apache Flink Kafka Connector 3.0.2"
+  source_release_url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.0.2/flink-connector-kafka-3.0.2-src.tgz";
+  source_release_asc_url: 
"https://downloads.apache.org/flink/flink-connector-kafka-3.0.2/flink-connector-kafka-3.0.2-src.tgz.asc";
+  source_release_sha512_url: 
"https://downloads.apache.org/flink/flink-connector-kafka-3.0.2/flink-connector-kafka-3.0.2-src.tgz.sha512";
   compatibility: ["1.17.x", "1.18.x"]
 
 opensearch:
diff --git a/docs/data/release_archive.yml b/docs/data/release_archive.yml
index e2a51874f..137d9ac49 100644
--- a/docs/data/release_archive.yml
+++ b/docs/data/release_archive.yml
@@ -535,6 +535,11 @@ release_archive:
   version: 4.2.0
   release_date: 2023-11-30
   filename: "aws"
+- name: "Flink Kafka Connector"
+  connector: "kafka"
+  version: 3.0.2
+  release_date: 2023-12-01
+  filename: "kafka"
 
   flink_shaded:
 -



(flink-connector-kafka) annotated tag v3.0.2 updated (a5bb91a7 -> 81ae37f0)

2023-12-01 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to annotated tag v3.0.2
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


*** WARNING: tag v3.0.2 was modified! ***

from a5bb91a7 (commit)
  to 81ae37f0 (tag)
 tagging a5bb91a7ac8ab38a427848960f979e20d0f28fc6 (commit)
  by Tzu-Li (Gordon) Tai
  on Fri Dec 1 16:48:58 2023 -0800

- Log -----------------------------------------------------------------
vv3.0.2
-----BEGIN PGP SIGNATURE-----

iQIzBAABCAAdFiEEHB4jlNMZThlEYTSI8yCYbTXDPWoFAmVqfvoACgkQ8yCYbTXD
PWos1A//SE+pr2iCyS80kbk6anmAkhX559XrJH+6H/N3hwktMvNI1gmAxfzpTpdG
h7q6p5LF627hUuT1Vxmvo3XD5xRSOpF+49oY1m0W31QcO28HSkUw02ugWxTHaYv4
/4WyFJOwkJ0g3by+QR6fnrj0yCkVi9oA8/a2NVVePJl1Kg+XzLgNId6Rf6LfLdoz
mfor3ZX5Vkeed5Sxl6T4Yxqu2hvIct4Sdq83M8zZdxggETsRaMaopI4dsw2W4S8v
sNpskowgNkYi/L3200ihGJvQS55geSNlOzyKy81XzVZd2qCNTQDlwkoIpLwwPazK
ysBjgffRRMxRzZBM96JjMKjt/MS8Z83kEryTRDvdP7ZeK35CQQqsSoLFIGDzIaSP
5dVd1b2YMpXsPc6HiOoUhgsaBArILaqb6h/Y3K9ajw8kVzeGa3baF6CWBzKpC+ZN
B/X4eR7HxHJaOE+XIyBd9BjaKoH/emYBaz7R8LAIi5dMLyr678yHkIdkSuiy0a0U
VRvbLrgOuazA987Iwc+vk2w3KRk3SCESEpwmq+DionkLwfKgpjBbTFvMF7Znrd23
cFJ2uX9BG7LdJDkO73fYRPLeIl9/HpobVU/bQvCDejW9QMjt2m/8qkTsnoU0AOnh
T5J8Nkdd0O9E2edbQJiceN4naIJqYxIlU3Lm5Ig1x0e9twi2gNk=
=jW0l
-----END PGP SIGNATURE-----
-----------------------------------------------------------------------


No new revisions were added by this update.

Summary of changes:



svn commit: r65793 - /dev/flink/flink-connector-kafka-3.0.2-rc1/ /release/flink/flink-connector-kafka-3.0.2/

2023-12-01 Thread tzulitai
Author: tzulitai
Date: Sat Dec  2 00:47:33 2023
New Revision: 65793

Log:
Release flink-connector-kafka 3.0.2

Added:
release/flink/flink-connector-kafka-3.0.2/
  - copied from r65792, dev/flink/flink-connector-kafka-3.0.2-rc1/
Removed:
dev/flink/flink-connector-kafka-3.0.2-rc1/



(flink) 01/02: [FLINK-33638][table] Support variable-length data generation for variable-length data types

2023-12-01 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac707cfeb24585ec4d68ee3dee22b64e42c454b2
Author: Yubin Li 
AuthorDate: Mon Nov 27 17:16:57 2023 +0800

[FLINK-33638][table] Support variable-length data generation for 
variable-length data types
---
 docs/content.zh/docs/connectors/table/datagen.md   |  7 ++
 docs/content/docs/connectors/table/datagen.md  | 24 +-
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e   |  4 +-
 .../functions/source/datagen/RandomGenerator.java  |  6 ++
 .../src/test/resources/sql/table.q |  1 +
 .../src/test/resources/sql/table.q |  1 +
 .../datagen/table/DataGenConnectorOptions.java |  9 +++
 .../datagen/table/DataGenConnectorOptionsUtil.java |  1 +
 .../datagen/table/DataGenTableSourceFactory.java   | 30 
 .../datagen/table/RandomGeneratorVisitor.java  | 34 +++--
 .../factories/DataGenTableSourceFactoryTest.java   | 86 ++
 11 files changed, 196 insertions(+), 7 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/datagen.md 
b/docs/content.zh/docs/connectors/table/datagen.md
index cca1ac3c36f..87dd494fa19 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -138,6 +138,13 @@ CREATE TABLE datagen (
   Integer
  The length of the characters produced by the random generator; applies to char, varchar, binary, varbinary, and string.
 
+
+  fields.#.var-len
+  optional
+  false
+  Boolean
+  Whether to generate variable-length data; note that it can only be used for variable-length types (varchar, string, varbinary, bytes).
+
 
  fields.#.start
  optional
diff --git a/docs/content/docs/connectors/table/datagen.md 
b/docs/content/docs/connectors/table/datagen.md
index c1774f9eb68..25d3b0efd81 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -39,7 +39,7 @@ Usage
 -
 
 By default, a DataGen table will create an unbounded number of rows with a 
random value for each column.
-For variable sized types, 
char/varchar/binary/varbinary/string/array/map/multiset, the length can be 
specified.
+For the types char/varchar/binary/varbinary/string/array/map/multiset, the length 
can be specified.
 Additionally, a total number of rows can be specified, resulting in a bounded 
table.
 
 There also exists a sequence generator, where users specify a sequence of 
start and end values.
@@ -77,6 +77,21 @@ WITH (
 LIKE Orders (EXCLUDING ALL)
 ```
 
+Furthermore, for variable-length types, varchar/string/varbinary/bytes, you 
can specify whether to enable variable-length data generation.
+
+```sql
+CREATE TABLE Orders (
+order_number BIGINT,
+price        DECIMAL(32,2),
+buyer        ROW<first_name STRING, last_name STRING>,
+order_time   TIMESTAMP(3),
+seller   VARCHAR(150)
+) WITH (
+  'connector' = 'datagen',
+  'fields.seller.var-len' = 'true'
+)
+```
+
 Types
 -
 
@@ -283,6 +298,13 @@ Connector Options
   Integer
   Size or length of the collection for generating 
char/varchar/binary/varbinary/string/array/map/multiset types.
 
+
+  fields.#.var-len
+  optional
+  false
+  Boolean
+  Whether to generate variable-length data. Note that it 
should only be used for variable-length types (varchar, string, varbinary, 
bytes).
+
 
   fields.#.start
   optional
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index 95c466ee545..5307b331fd3 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -317,6 +317,8 @@ Method 

 calls method 

 in (RandomGeneratorVisitor.java:126)
 Method 

 calls method 

 in (RandomGeneratorVisitor.java:126)
 Method 

 calls method 

 in (RandomGeneratorVisitor.java:141)
+Method 

 calls method 

 in (RandomGeneratorVisitor.java:203)
+Method 

 calls method 

 in (RandomGeneratorVisitor.java:172)
 Method 

 calls method 
 in (RandomGeneratorVisitor.java:305)
 Method 

 calls method 

 in (RandomGeneratorVisitor.java:306)
 Method 

 calls method 
 in (RandomGeneratorVisitor.java:262)
@@ -832,4 +834,4 @@ Static Initializer 
()>
 gets field 
 in 
(PartitionTimeCommitTrigger.java:52)
 Static Initializer 
()> 
calls constructor 
(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(ProcTimeCommitTrigger.java:47)
 Static Initializer 
()> 
gets field  
in (ProcTimeCommitTrigger.java:47)
-Static Initializer 
()> 
gets field 
 in 
(ProcTimeCommitTrigger.java:47)
\ No newline at end of file
+Static Initializer 
()> 
gets fie

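As a usage illustration, the new option can also be exercised from the Table API. This sketch assumes only what the diff above shows (the 'fields.#.var-len' option key and its restriction to the variable-length types varchar/string/varbinary/bytes); everything else is standard Flink Table API:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class VarLenDataGenExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // A bounded datagen table; 'fields.seller.var-len' (from the diff above)
        // asks for variable-length values instead of always-150-char strings.
        tEnv.executeSql(
                "CREATE TABLE Orders (\n"
                        + "  order_number BIGINT,\n"
                        + "  seller       VARCHAR(150)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'datagen',\n"
                        + "  'number-of-rows' = '10',\n"
                        + "  'fields.seller.var-len' = 'true'\n"
                        + ")");
        // With var-len enabled, the generated lengths should vary up to 150.
        tEnv.executeSql("SELECT order_number, CHAR_LENGTH(seller) AS len FROM Orders").print();
    }
}
```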
(flink) branch master updated (5edc7d7b18e -> 1403febd30e)

2023-12-01 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 5edc7d7b18e [FLINK-33470] Deleting JoinJsonPlanTest.java and 
JoinJsonPlanITCase.java
 new ac707cfeb24 [FLINK-33638][table] Support variable-length data 
generation for variable-length data types
 new 1403febd30e [FLINK-33638][docs-zh] Update the outdated Chinese datagen 
connector docs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/connectors/table/datagen.md   | 227 ++---
 docs/content/docs/connectors/table/datagen.md  |  38 +++-
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e   |   4 +-
 .../functions/source/datagen/RandomGenerator.java  |   6 +
 .../src/test/resources/sql/table.q |   1 +
 .../src/test/resources/sql/table.q |   1 +
 .../datagen/table/DataGenConnectorOptions.java |   9 +
 .../datagen/table/DataGenConnectorOptionsUtil.java |   1 +
 .../datagen/table/DataGenTableSourceFactory.java   |  30 +++
 .../datagen/table/RandomGeneratorVisitor.java  |  34 ++-
 .../factories/DataGenTableSourceFactoryTest.java   |  86 
 11 files changed, 392 insertions(+), 45 deletions(-)



(flink) 02/02: [FLINK-33638][docs-zh] Update the outdated Chinese datagen connector docs

2023-12-01 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1403febd30e03cb38352695151d43e4ed3eb8ac1
Author: Yubin Li 
AuthorDate: Fri Dec 1 11:23:19 2023 +0800

[FLINK-33638][docs-zh] Update the outdated Chinese datagen connector docs
---
 docs/content.zh/docs/connectors/table/datagen.md | 220 +++
 docs/content/docs/connectors/table/datagen.md|  14 +-
 2 files changed, 196 insertions(+), 38 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/datagen.md 
b/docs/content.zh/docs/connectors/table/datagen.md
index 87dd494fa19..210ccc088d2 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -29,51 +29,202 @@ under the License.
{{< label "Scan Source: Bounded" >}}
{{< label "Scan Source: Unbounded" >}}
 
-The DataGen connector allows reading according to data generation rules.
+The DataGen connector allows creating tables based on in-memory data generation.
+This is very useful when developing locally without access to external systems such as Kafka.
+Records can be generated flexibly using the [computed column syntax]({{< ref "docs/dev/table/sql/create" >}}#create-table).
 
-The DataGen connector can use the [computed column syntax]({{< ref "docs/dev/table/sql/create" >}}#create-table).
-This lets you generate records flexibly.
+The DataGen connector is built in; no additional dependencies are required.
 
-The DataGen connector is built in.
+Usage
+-
 
-Note: complex types are not supported: Array, Map, Row. Please construct these types with computed columns.
+By default, a DataGen table creates an unbounded number of rows, with a random value for each column.
+For the char, varchar, binary, varbinary, string, array, map, and multiset types, a length can be specified.
+A total number of rows can also be specified, resulting in a bounded table.
 
-How to create a DataGen table
-
-
-Boundedness of the table: the source ends once the data for every field of the table has been generated, so the boundedness of the table depends on the boundedness of its fields.
-
-Each column has two ways of generating data:
+A sequence generator is also supported; you can specify the start and end values of the sequence.
+If any column in the table is of the sequence kind, the table is bounded and ends when the first sequence completes.
 
-- The random generator is the default; you can specify the minimum and maximum values to generate, and a length for the char, varchar, binary, varbinary, and string types. It is an unbounded generator.
-
-- The sequence generator: you can specify the start and end values of the sequence. It is a bounded generator; reading ends once the sequence number reaches the end value.
+The value of a time-typed field is always the current system time of the local machine.
 
 ```sql
-CREATE TABLE datagen (
- f_sequence INT,
- f_random INT,
- f_random_str STRING,
- ts AS localtimestamp,
- WATERMARK FOR ts AS ts
+CREATE TABLE Orders (
+order_number BIGINT,
+price        DECIMAL(32,2),
+buyer        ROW<first_name STRING, last_name STRING>,
+order_time   TIMESTAMP(3)
 ) WITH (
- 'connector' = 'datagen',
+  'connector' = 'datagen'
+)
+```
 
- -- optional options --
+The DataGen connector is often used together with the ``LIKE`` clause to mock out physical tables.
 
- 'rows-per-second'='5',
+```sql
+CREATE TABLE Orders (
+order_number BIGINT,
+price        DECIMAL(32,2),
+buyer        ROW<first_name STRING, last_name STRING>,
+order_time   TIMESTAMP(3)
+) WITH (...)
 
- 'fields.f_sequence.kind'='sequence',
- 'fields.f_sequence.start'='1',
- 'fields.f_sequence.end'='1000',
+-- create a bounded mock table
+CREATE TEMPORARY TABLE GenOrders
+WITH (
+'connector' = 'datagen',
+'number-of-rows' = '10'
+)
+LIKE Orders (EXCLUDING ALL)
+```
 
- 'fields.f_random.min'='1',
- 'fields.f_random.max'='1000',
+In addition, for variable-length types (varchar, string, varbinary, and bytes), you can specify whether to generate variable-length data.
 
- 'fields.f_random_str.length'='10'
+```sql
+CREATE TABLE Orders (
+order_number BIGINT,
+price        DECIMAL(32,2),
+buyer        ROW<first_name STRING, last_name STRING>,
+order_time   TIMESTAMP(3),
+seller   VARCHAR(150)
+) WITH (
+  'connector' = 'datagen',
+  'fields.seller.var-len' = 'true'
 )
 ```
 
+Field Types
+-
+
+
+
+
+Type
+Supported 
Generators
+Notes
+
+
+
+
+BOOLEAN
+random
+
+
+
+CHAR
+random / sequence
+
+
+
+VARCHAR
+random / sequence
+
+
+
+BINARY
+random / sequence
+
+
+
+VARBINARY
+random / sequence
+
+
+
+STRING
+random / sequence
+
+
+
+DECIMAL
+random / sequence
+
+
+
+TINYINT
+random / sequence
+
+
+
+SMALLINT
+random / sequence
+
+
+
+INT
+random / sequence
+
+
+
+BIGINT
+random / sequence
+
+
+
+FLOAT
+random / sequence
+
+
+
+DOUBLE
+random / sequence
+
+
+
+DATE
+random
+Always resolves to the current date of the local machine.
+
+
+TIME
+random
+Always resolves to the current time of the local machine.
+
+
+TIMESTAMP
+random
+
Resolves to a timestamp offset into the past relative to the current timestamp of the local machine. The maximum offset can be specified via the 'max-past' option.
+
+
+
+TIMESTAMP_LTZ
+random
+
Resolves to a timestamp offset into the past relative to the current timestamp of the local machine. The maximum offset can be specified via the 'max-past' option.
+
+
+
+INTERVAL YEAR TO MONTH
+random
+
+
+
+INTERVAL DAY TO MONTH
+random
+
+
+
+  

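The sequence-generator behavior described in the rewritten text (a table with a sequence column is bounded and ends when the first sequence completes) can be demonstrated with a short sketch; the option keys are taken from the old example retained in the diff above:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SequenceDataGenExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE datagen (\n"
                        + "  f_sequence INT,\n"
                        + "  f_random INT\n"
                        + ") WITH (\n"
                        + "  'connector' = 'datagen',\n"
                        + "  'fields.f_sequence.kind' = 'sequence',\n"
                        + "  'fields.f_sequence.start' = '1',\n"
                        + "  'fields.f_sequence.end' = '5',\n"
                        + "  'fields.f_random.min' = '1',\n"
                        + "  'fields.f_random.max' = '1000'\n"
                        + ")");
        // The sequence column makes the table bounded: exactly 5 rows, then the
        // source finishes.
        tEnv.executeSql("SELECT * FROM datagen").print();
    }
}
```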
(flink-connector-opensearch) branch dependabot/maven/flink-connector-opensearch/org.opensearch-opensearch-2.11.1 created (now f0c07d7)

2023-12-01 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/flink-connector-opensearch/org.opensearch-opensearch-2.11.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git


  at f0c07d7  Bump org.opensearch:opensearch in /flink-connector-opensearch

No new revisions were added by this update.



(flink) 02/02: [FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java

2023-12-01 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5edc7d7b18e88cc86e84d197202d8cbb40621864
Author: Jim Hughes 
AuthorDate: Tue Nov 7 17:14:09 2023 -0500

[FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java

Removing json test files as well.
---
 .../plan/nodes/exec/stream/JoinJsonPlanTest.java   | 144 ---
 .../stream/jsonplan/JoinJsonPlanITCase.java| 168 
 .../JoinJsonPlanTest_jsonplan/testInnerJoin.out| 222 --
 .../testInnerJoinWithEqualPk.out   | 332 ---
 .../testInnerJoinWithPk.out| 450 -
 .../testLeftJoinNonEqui.out| 266 
 6 files changed, 1582 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java
deleted file mode 100644
index dd2771ea7cf..000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for join. */
-class JoinJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@BeforeEach
-void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-tEnv = util.getTableEnv();
-
-String srcTableA =
-"CREATE TABLE A (\n"
-+ "  a1 int,\n"
-+ "  a2 bigint,\n"
-+ "  a3 bigint\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'bounded' = 'false')";
-String srcTableB =
-"CREATE TABLE B (\n"
-+ "  b1 int,\n"
-+ "  b2 bigint,\n"
-+ "  b3 bigint\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'bounded' = 'false')";
-String srcTableT =
-"CREATE TABLE t (\n"
-+ "  a int,\n"
-+ "  b bigint,\n"
-+ "  c varchar\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'bounded' = 'false')";
-String srcTableS =
-"CREATE TABLE s (\n"
-+ "  x bigint,\n"
-+ "  y varchar,\n"
-+ "  z int\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'bounded' = 'false')";
-tEnv.executeSql(srcTableA);
-tEnv.executeSql(srcTableB);
-tEnv.executeSql(srcTableT);
-tEnv.executeSql(srcTableS);
-}
-
-@Test
-void testInnerJoin() {
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ "  a1 int,\n"
-+ "  b1 int\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'table-sink-class' = 'DEFAULT')";
-tEnv.executeSql(sinkTableDdl);
-util.verifyJsonPlan("INSERT INTO MySink SELECT a1, b1 FROM A JOIN B ON 
a1 = b1");
-}
-
-@Test
-void testInnerJoinWithEqualPk() {
-String query1 = "SELECT SUM(a2) AS a2, a1 FR

(flink) branch master updated (87d7b4abd0a -> 5edc7d7b18e)

2023-12-01 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 87d7b4abd0a Revert "Revert "[FLINK-32878][Filesystems] Add entropy in 
temporary object name in GCS connector. This closes #23729""
 new e886dfdda6c [FLINK-33470] Implement restore tests for Join node
 new 5edc7d7b18e [FLINK-33470] Deleting JoinJsonPlanTest.java and 
JoinJsonPlanITCase.java

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/test/program/SinkTestStep.java |  24 +-
 .../factories/TestValuesRuntimeFunctions.java  |  13 +-
 .../planner/factories/TestValuesTableFactory.java  |   9 +
 .../plan/nodes/exec/stream/JoinJsonPlanTest.java   | 144 ---
 .../plan/nodes/exec/testutils/JoinRestoreTest.java |  51 +++
 .../nodes/exec/testutils/JoinTestPrograms.java | 450 +
 .../plan/nodes/exec/testutils/RestoreTestBase.java |  25 +-
 .../stream/jsonplan/JoinJsonPlanITCase.java| 168 
 .../anti-join/plan/anti-join.json} | 189 -
 .../anti-join/savepoint/_metadata  | Bin 0 -> 9813 bytes
 .../cross-join/plan/cross-join.json}   | 129 +++---
 .../cross-join/savepoint/_metadata | Bin 0 -> 12164 bytes
 .../plan/inner-join-with-duplicate-key.json}   | 156 +++
 .../savepoint/_metadata| Bin 0 -> 12351 bytes
 .../plan/inner-join-with-equal-pk.json}| 150 ---
 .../inner-join-with-equal-pk/savepoint/_metadata   | Bin 0 -> 18610 bytes
 .../plan/inner-join-with-non-equi-join.json}   | 147 +++
 .../savepoint/_metadata| Bin 0 -> 13383 bytes
 .../plan/inner-join-with-pk.json}  | 198 -
 .../inner-join-with-pk/savepoint/_metadata | Bin 0 -> 21984 bytes
 .../join-with-filter/plan/join-with-filter.json| 305 ++
 .../join-with-filter/savepoint/_metadata   | Bin 0 -> 12147 bytes
 .../left-join/plan/left-join.json} | 152 +++
 .../left-join/savepoint/_metadata  | Bin 0 -> 13348 bytes
 .../plan/non-window-inner-join-with-null-cond.json | 354 
 .../savepoint/_metadata| Bin 0 -> 15297 bytes
 .../plan/non-window-inner-join.json| 354 
 .../non-window-inner-join/savepoint/_metadata  | Bin 0 -> 15209 bytes
 .../outer-join/plan/outer-join.json}   | 154 +++
 .../outer-join/savepoint/_metadata | Bin 0 -> 13813 bytes
 .../right-join/plan/right-join.json}   | 154 +++
 .../right-join/savepoint/_metadata | Bin 0 -> 13096 bytes
 .../semi-join/plan/semi-join.json} | 141 ---
 .../semi-join/savepoint/_metadata  | Bin 0 -> 11260 bytes
 34 files changed, 2291 insertions(+), 1176 deletions(-)
 delete mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java
 create mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinRestoreTest.java
 create mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java
 delete mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
 copy 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
 => restore-tests/stream-exec-join_1/anti-join/plan/anti-join.json} (54%)
 create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/anti-join/savepoint/_metadata
 copy 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
 => restore-tests/stream-exec-join_1/cross-join/plan/cross-join.json} (60%)
 create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/cross-join/savepoint/_metadata
 copy 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
 => 
restore-tests/stream-exec-join_1/inner-join-with-duplicate-key/plan/inner-join-with-duplicate-key.json}
 (57%)
 create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/inner-join-with-duplicate-key/savepoint/_metadata
 rename 
flink-table/flink-table-planner/src/test/resources/{org/apache

(flink) branch master updated: Revert "Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729""

2023-12-01 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 87d7b4abd0a Revert "Revert "[FLINK-32878][Filesystems] Add entropy in 
temporary object name in GCS connector. This closes #23729""
87d7b4abd0a is described below

commit 87d7b4abd0a1ec92433603c83401cc8ad00fd500
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Fri Dec 1 16:37:21 2023 +0100

Revert "Revert "[FLINK-32878][Filesystems] Add entropy in temporary object 
name in GCS connector. This closes #23729""

This reverts commit c7da98f23c3f86de3a3b12355fa7a1289200f93d.
---
 docs/content.zh/docs/deployment/filesystems/gcs.md |  9 ++---
 docs/content/docs/deployment/filesystems/gcs.md|  9 ++---
 .../apache/flink/fs/gs/GSFileSystemOptions.java| 17 ++
 .../org/apache/flink/fs/gs/utils/BlobUtils.java| 21 +++-
 .../flink/fs/gs/writer/GSCommitRecoverable.java|  7 ++--
 .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 +
 .../apache/flink/fs/gs/utils/BlobUtilsTest.java| 15 +
 .../fs/gs/writer/GSCommitRecoverableTest.java  | 38 ++
 8 files changed, 127 insertions(+), 18 deletions(-)

diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md 
b/docs/content.zh/docs/deployment/filesystems/gcs.md
index 7edf78b2e61..f80e5b3af4a 100644
--- a/docs/content.zh/docs/deployment/filesystems/gcs.md
+++ b/docs/content.zh/docs/deployment/filesystems/gcs.md
@@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the 
Hadoop `core-site.xml`
 
 `flink-gs-fs-hadoop` can also be configured by setting the following options 
in `flink-conf.yaml`:
 
-| Key   | Description  





  [...]
-|---|---
 [...]
-| gs.writer.temporary.bucket.name   | Set this property to choose a 
bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. 
If this property is not set, temporary blobs will be written to same bucket as 
the final file being written. In either case, temporary blobs are written with 
the prefix `.inprogress/`.   It is recommended to choose a separate 
bucket in order to [assign it a 
TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...]
-| gs.writer.chunk.size  | Set this property to [set the 
chunk 
size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_)
 for writes via `RecoverableWriter`. If not set, a Google-determined 
default chunk size will be used.

[...]
+| Key | Description





  [...]
+|-|-
 [...]
+| gs.writer.temporary.bucket.name | Set this property to choose a bucket to 
hold temporary blobs for in-progress writes via `R

(flink) branch master updated: Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729"

2023-12-01 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c7da98f23c3 Revert "[FLINK-32878][Filesystems] Add entropy in 
temporary object name in GCS connector. This closes #23729"
c7da98f23c3 is described below

commit c7da98f23c3f86de3a3b12355fa7a1289200f93d
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Fri Dec 1 16:00:36 2023 +0100

Revert "[FLINK-32878][Filesystems] Add entropy in temporary object name in 
GCS connector. This closes #23729"

This reverts commit dc1db12137ecad921ae90969d7bfbf1ee7d3d2ef.
---
 docs/content.zh/docs/deployment/filesystems/gcs.md |  9 +++--
 docs/content/docs/deployment/filesystems/gcs.md|  9 +++--
 .../apache/flink/fs/gs/GSFileSystemOptions.java| 17 --
 .../org/apache/flink/fs/gs/utils/BlobUtils.java| 21 +---
 .../flink/fs/gs/writer/GSCommitRecoverable.java|  7 ++--
 .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 -
 .../apache/flink/fs/gs/utils/BlobUtilsTest.java| 15 -
 .../fs/gs/writer/GSCommitRecoverableTest.java  | 38 --
 8 files changed, 18 insertions(+), 127 deletions(-)

diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md 
b/docs/content.zh/docs/deployment/filesystems/gcs.md
index f80e5b3af4a..7edf78b2e61 100644
--- a/docs/content.zh/docs/deployment/filesystems/gcs.md
+++ b/docs/content.zh/docs/deployment/filesystems/gcs.md
@@ -76,11 +76,10 @@ You can also set `gcs-connector` options directly in the 
Hadoop `core-site.xml`
 
 `flink-gs-fs-hadoop` can also be configured by setting the following options 
in `flink-conf.yaml`:
 
-| Key | Description





  [...]
-|-|-
 [...]
-| gs.writer.temporary.bucket.name | Set this property to choose a bucket to 
hold temporary blobs for in-progress writes via `RecoverableWriter`. If this 
property is not set, temporary blobs will be written to same bucket as the 
final file being written. In either case, temporary blobs are written with the 
prefix `.inprogress/`.   It is recommended to choose a separate bucket 
in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), 
to provide a mechanism to  [...]
-| gs.writer.chunk.size| Set this property to [set the chunk 
size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_)
 for writes via `RecoverableWriter`. If not set, a Google-determined 
default chunk size will be used.

  [...]
-| gs.filesink.entropy.enabled | Set this property to improve performance 
due to hotspotting issues on GCS. This option defines whether to enable entropy 
injection in filesink gcs path. If this is enabled, entropy in the form of 
temporary object id will be injected in beginning of the gcs path of the 
temporary objects. The final object path remains unchanged. 

[...]
+| Key   | Description  





  [...]
+|---|

(flink) branch master updated: [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729

2023-12-01 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new dc1db12137e [FLINK-32878][Filesystems] Add entropy in temporary object 
name in GCS connector. This closes #23729
dc1db12137e is described below

commit dc1db12137ecad921ae90969d7bfbf1ee7d3d2ef
Author: Cheena Budhiraja <110803195+bche...@users.noreply.github.com>
AuthorDate: Fri Dec 1 18:52:43 2023 +0530

[FLINK-32878][Filesystems] Add entropy in temporary object name in GCS 
connector. This closes #23729
---
 docs/content.zh/docs/deployment/filesystems/gcs.md |  9 ++---
 docs/content/docs/deployment/filesystems/gcs.md|  9 ++---
 .../apache/flink/fs/gs/GSFileSystemOptions.java| 17 ++
 .../org/apache/flink/fs/gs/utils/BlobUtils.java| 21 +++-
 .../flink/fs/gs/writer/GSCommitRecoverable.java|  7 ++--
 .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 +
 .../apache/flink/fs/gs/utils/BlobUtilsTest.java| 15 +
 .../fs/gs/writer/GSCommitRecoverableTest.java  | 38 ++
 8 files changed, 127 insertions(+), 18 deletions(-)

diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md 
b/docs/content.zh/docs/deployment/filesystems/gcs.md
index 7edf78b2e61..f80e5b3af4a 100644
--- a/docs/content.zh/docs/deployment/filesystems/gcs.md
+++ b/docs/content.zh/docs/deployment/filesystems/gcs.md
@@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the 
Hadoop `core-site.xml`
 
 `flink-gs-fs-hadoop` can also be configured by setting the following options 
in `flink-conf.yaml`:
 
-| Key   | Description  





  [...]
-|---|---
 [...]
-| gs.writer.temporary.bucket.name   | Set this property to choose a 
bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. 
If this property is not set, temporary blobs will be written to same bucket as 
the final file being written. In either case, temporary blobs are written with 
the prefix `.inprogress/`.   It is recommended to choose a separate 
bucket in order to [assign it a 
TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...]
-| gs.writer.chunk.size  | Set this property to [set the 
chunk 
size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_)
 for writes via `RecoverableWriter`. If not set, a Google-determined 
default chunk size will be used.

[...]
+| Key | Description





  [...]
+|-|-
 [...]
+| gs.writer.temporary.bucket.name | Set this property to choose a bucket to 
hold temporary blobs for in-progress writes via `RecoverableWriter`. If this 
property is not set, temporary blobs will be written to same bucket as the 
final

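Conceptually, the entropy injection described above puts a random token (the temporary object id) at the beginning of the temporary object's path, spreading in-progress writes across the GCS keyspace to avoid hotspotting, while the final object path stays unchanged. A hypothetical sketch of the idea; the helper name and exact path layout below are illustrative, not the connector's actual BlobUtils API:

```java
import java.util.UUID;

public class GcsEntropySketch {
    // Hypothetical helper: builds the name of a temporary blob for an in-progress
    // write. With entropy enabled, the random temporary object id is injected at
    // the beginning of the path; the final object name is never affected.
    static String temporaryObjectName(String finalObjectName, boolean entropyEnabled) {
        String temporaryObjectId = UUID.randomUUID().toString();
        String base = ".inprogress/" + finalObjectName + "/" + temporaryObjectId;
        return entropyEnabled ? temporaryObjectId + "/" + base : base;
    }

    public static void main(String[] args) {
        System.out.println(temporaryObjectName("part-0001", false));
        System.out.println(temporaryObjectName("part-0001", true));
    }
}
```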
(flink-kubernetes-operator) branch main updated: [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720)

2023-12-01 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 158cbe29 [FLINK-33710] Prevent NOOP spec updates from the autoscaler 
parallelism override map (#720)
158cbe29 is described below

commit 158cbe29169cbfb7fa7ad676fb0273fd7ef6d25e
Author: Maximilian Michels 
AuthorDate: Fri Dec 1 11:56:42 2023 +0100

[FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism 
override map (#720)
---
 .../autoscaler/KubernetesScalingRealizer.java  |  4 +++
 .../autoscaler/KubernetesScalingRealizerTest.java} | 42 +-
 2 files changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
index ca3f4ef2..6bb7a949 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 
 import java.util.Map;
+import java.util.TreeMap;
 
 /** The Kubernetes implementation for applying parallelism overrides. */
 public class KubernetesScalingRealizer
@@ -32,6 +33,9 @@ public class KubernetesScalingRealizer
 @Override
 public void realize(
KubernetesJobAutoScalerContext context, Map<String, String> 
parallelismOverrides) {
+// Make sure the keys are sorted via TreeMap to prevent changing the 
spec when none of the
+// entries changed but the key order is different!
+parallelismOverrides = new TreeMap<>(parallelismOverrides);
 context.getResource()
 .getSpec()
 .getFlinkConfiguration()
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
similarity index 50%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
copy to 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index ca3f4ef2..dda7ba0a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -17,26 +17,36 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
-import org.apache.flink.autoscaler.realizer.ScalingRealizer;
-import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.PipelineOptions;
 
-import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.junit.jupiter.api.Test;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 
-/** The Kubernetes implementation for applying parallelism overrides. */
-public class KubernetesScalingRealizer
-implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {
-
-@Override
-public void realize(
-KubernetesJobAutoScalerContext context, Map<String, String> 
parallelismOverrides) {
-context.getResource()
-.getSpec()
-.getFlinkConfiguration()
-.put(
-PipelineOptions.PARALLELISM_OVERRIDES.key(),
-ConfigurationUtils.convertValue(parallelismOverrides, 
String.class));
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for KubernetesScalingRealizer. */
+public class KubernetesScalingRealizerTest {
+
+@Test
+public void testAutoscalerOverridesVertexIdsAreSorted() {
+
+KubernetesJobAutoScalerContext ctx =
+TestingKubernetesAutoscalerUtils.createContext("test", null);
+
+// Create map which returns keys unsorted
+Map<String, String> overrides = new LinkedHashMap<>();
+overrides.put("b", "2");
+overrides.put("a", "1");
+
+new KubernetesScalingRealizer().realize(ctx, overrides);
+
+assertThat(
+ctx.getResource()
+.getSpec()
+.getFlinkConfiguration()
+
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
+.isEqualTo("a:1,b:2");
 }
 }


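The comment in the hunk above explains the bug: two override maps with identical entries can serialize to different configuration strings when their iteration order differs, which looks like a spec change and triggers a NOOP update. A plain-Java sketch of that effect, independent of the operator classes (the rendering helper below is illustrative; the operator actually uses ConfigurationUtils.convertValue):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class OverrideOrderingSketch {
    // Illustrative stand-in for serializing the override map into a config string.
    static String render(Map<String, String> overrides) {
        StringBuilder sb = new StringBuilder();
        overrides.forEach((k, v) -> sb.append(k).append(':').append(v).append(','));
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("b", "2");
        first.put("a", "1");

        Map<String, String> second = new LinkedHashMap<>();
        second.put("a", "1");
        second.put("b", "2");

        // Same entries, different iteration order: rendered strings differ,
        // which would look like a spec change even though nothing changed.
        System.out.println(render(first).equals(render(second))); // false

        // Copying into a TreeMap sorts the keys, so equal maps render identically.
        System.out.println(render(new TreeMap<>(first)).equals(render(new TreeMap<>(second)))); // true
    }
}
```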

(flink-kubernetes-operator) branch main updated: [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716)

2023-12-01 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 51a91049 [FLINK-33522] Be ware of SerializedThrowable when checking 
for StopWithSavepointStoppingException (#716)
51a91049 is described below

commit 51a91049b5f17f8a0b21e11feceb4410a97c50c1
Author: Maximilian Michels 
AuthorDate: Fri Dec 1 10:47:40 2023 +0100

[FLINK-33522] Be ware of SerializedThrowable when checking for 
StopWithSavepointStoppingException (#716)

Turns out that the previous detection code in #706 may not always fire 
correctly
due to an encapsulated serialized throwable. This minor change fixes that.
---
 .../flink/kubernetes/operator/service/AbstractFlinkService.java | 6 --
 .../flink/kubernetes/operator/service/AbstractFlinkServiceTest.java | 5 +++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 0821b439..3c4fe4aa 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -331,8 +331,10 @@ public abstract class AbstractFlinkService implements 
FlinkService {
 exception);
 } catch (Exception e) {
 var stopWithSavepointException =
-ExceptionUtils.findThrowable(
-e, 
StopWithSavepointStoppingException.class);
+
ExceptionUtils.findThrowableSerializedAware(
+e,
+
StopWithSavepointStoppingException.class,
+getClass().getClassLoader());
 if (stopWithSavepointException.isPresent()) {
 // Handle edge case where the savepoint 
completes but the job fails
 // right afterward.
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index c12caf21..a108b8bb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -651,8 +651,9 @@ public class AbstractFlinkServiceTest {
 if (failAfterSavepointCompletes) {
 stopWithSavepointFuture.completeExceptionally(
 new CompletionException(
-new StopWithSavepointStoppingException(
-savepointPath, jobID)));
+new SerializedThrowable(
+new 
StopWithSavepointStoppingException(
+savepointPath, 
jobID;
 } else {
 stopWithSavepointFuture.complete(
 new Tuple3<>(id, formatType, savepointDir));
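The fix works because a SerializedThrowable in the cause chain hides the original exception type from a plain findThrowable search, while findThrowableSerializedAware deserializes it before matching. A sketch of the difference, using the same two utilities that appear in the diff (a standard JDK exception stands in for StopWithSavepointStoppingException):

```java
import java.util.Optional;
import java.util.concurrent.CompletionException;

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;

public class SerializedThrowableSketch {
    public static void main(String[] args) {
        Exception original = new IllegalStateException("job failed after savepoint");
        // Mirrors the test above: the real cause arrives wrapped in a SerializedThrowable.
        Throwable wrapped = new CompletionException(new SerializedThrowable(original));

        // Plain search walks the cause chain and only sees SerializedThrowable,
        // so the original type is not found.
        Optional<IllegalStateException> plain =
                ExceptionUtils.findThrowable(wrapped, IllegalStateException.class);

        // Serialized-aware search deserializes SerializedThrowable before matching.
        Optional<IllegalStateException> aware =
                ExceptionUtils.findThrowableSerializedAware(
                        wrapped,
                        IllegalStateException.class,
                        SerializedThrowableSketch.class.getClassLoader());

        System.out.println(plain.isPresent()); // expected: false
        System.out.println(aware.isPresent()); // expected: true
    }
}
```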