(flink) branch master updated: [hotfix] Prepare StatefulSequenceSource for removal in 2.0

2023-10-30 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 329e3c18350 [hotfix] Prepare StatefulSequenceSource for removal in 2.0
329e3c18350 is described below

commit 329e3c183501fe99be37d663b54f1c1e43bab573
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Sat Oct 28 15:07:32 2023 +0200

[hotfix] Prepare StatefulSequenceSource for removal in 2.0
---
 .../streaming/api/functions/source/StatefulSequenceSource.java  | 3 +++
 .../apache/flink/connector/datagen/table/DataGenTableSource.java| 6 +-
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index a9f4d1e7023..983f49c7eca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -42,6 +42,9 @@ import java.util.Deque;
  * This strategy guarantees that each element will be emitted exactly-once, but elements will not
  * necessarily be emitted in ascending order, even for the same tasks.
  *
+ * NOTE: this source will be removed together with the deprecated
+ * StreamExecutionEnvironment#generateSequence() method.
+ *
  * @deprecated This class is based on the {@link
  *     org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be
  *     removed. Use the new {@link org.apache.flink.api.connector.source.Source} API instead.
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
index f9c1bf4deee..9163c2a63f2 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.datagen.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.datagen.table.types.RowDataGenerator;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -33,10 +32,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 
-/**
- * A {@link StreamTableSource} that emits each number from a given interval exactly once, possibly
- * in parallel. See {@link StatefulSequenceSource}.
- */
+/** A {@link StreamTableSource} that emits generated data rows. */
 @Internal
 public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {
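
For readers tracking the removal: the replacement path already exists in the FLIP-27 Source API. A minimal migration sketch, assuming a job that previously used env.generateSequence()/StatefulSequenceSource (the class and method names below come from the stable Flink APIs, not from this commit):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SequenceMigration {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Before (deprecated, to be removed in 2.0):
            // DataStream<Long> numbers = env.generateSequence(0, 999);

            // After: NumberSequenceSource covers the same use case, emitting each
            // number in the range exactly once, possibly in parallel.
            DataStream<Long> numbers =
                    env.fromSource(
                            new NumberSequenceSource(0, 999),
                            WatermarkStrategy.noWatermarks(),
                            "sequence");

            numbers.print();
            env.execute("sequence-migration");
        }
    }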
 



(flink) branch master updated: [FLINK-33084][runtime] Migrate globalJobParameter to configuration.

2023-10-30 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b946ecc6683 [FLINK-33084][runtime] Migrate globalJobParameter to configuration.
b946ecc6683 is described below

commit b946ecc668342d48c2c0193ad4eff1897c75b68f
Author: JunRuiLee 
AuthorDate: Wed Sep 13 18:51:27 2023 +0800

[FLINK-33084][runtime] Migrate globalJobParameter to configuration.

This closes #23409.
---
 .../org/apache/flink/api/common/ExecutionConfig.java | 20 ++--
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index f3fae490cf7..22aee331513 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -154,8 +154,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
     public void setGlobalJobParameters(GlobalJobParameters parameters) {
+        configuration.set(PipelineOptions.GLOBAL_JOB_PARAMETERS, parameters);
     }
 
     // ------------------------------------------------------------------------
 
@@ -985,7 +990,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig>
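
The user-facing contract is unchanged; global job parameters are now simply backed by PipelineOptions.GLOBAL_JOB_PARAMETERS. A short sketch of the two equivalent ways to set them (the key/value strings are illustrative only):

    import java.util.Map;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GlobalJobParams {
        public static void main(String[] args) {
            // Configuration-first route, matching the new backing store:
            Configuration conf = new Configuration();
            conf.set(PipelineOptions.GLOBAL_JOB_PARAMETERS, Map.of("source.retries", "3"));
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);

            // Classic route; after this commit it writes into the same option:
            // env.getConfig().setGlobalJobParameters(...);

            System.out.println(env.getConfig().getGlobalJobParameters().toMap());
        }
    }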

(flink) branch master updated: [FLINK-33360][connector/common] Clean up finishedReaders after switch to next Enumerator

2023-10-30 Thread leonard
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 8d21c321dda [FLINK-33360][connector/common] Clean up finishedReaders after switch to next Enumerator
8d21c321dda is described below

commit 8d21c321dda18ad20022532c861d57ee38b70fda
Author: fengjiajie 
AuthorDate: Mon Oct 30 16:24:40 2023 +0800

[FLINK-33360][connector/common] Clean up finishedReaders after switch to next Enumerator

This closes #23593.
---
 .../source/hybrid/HybridSourceSplitEnumerator.java |  1 +
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 36 ++
 2 files changed, 37 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index c8b00b6a1f4..17c70ebbdc4 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -259,6 +259,7 @@ public class HybridSourceSplitEnumerator
         if (currentEnumerator != null) {
             try {
                 currentEnumerator.close();
+                finishedReaders.clear();
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index fcde32811f4..8b068d645b6 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -252,6 +252,42 @@ public class HybridSourceSplitEnumeratorTest {
         assertThat(context.hasNoMoreSplits(0)).isTrue();
     }
 
+    @Test
+    public void testMultiSubtaskSwitchEnumerator() {
+        context = new MockSplitEnumeratorContext<>(2);
+        source =
+                HybridSource.builder(MOCK_SOURCE)
+                        .addSource(MOCK_SOURCE)
+                        .addSource(MOCK_SOURCE)
+                        .build();
+
+        enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
+        enumerator.start();
+
+        registerReader(context, enumerator, SUBTASK0);
+        registerReader(context, enumerator, SUBTASK1);
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
+
+        assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0);
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
+        assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0);
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
+        assertThat(getCurrentSourceIndex(enumerator))
+                .as("all readers finished source-0")
+                .isEqualTo(1);
+
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(1));
+        assertThat(getCurrentSourceIndex(enumerator))
+                .as(
+                        "only reader-0 has finished reading, reader-1 is not yet done, so do not switch to the next source")
+                .isEqualTo(1);
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(1));
+        assertThat(getCurrentSourceIndex(enumerator))
+                .as("all readers finished source-1")
+                .isEqualTo(2);
+    }
+
     private static class UnderlyingEnumeratorWrapper
             implements SplitEnumerator {
         private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);
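
For context, finishedReaders counts the subtasks that have sent a SourceReaderFinishedEvent for the source currently being read; without the clear() added above, entries carried over from the previous source could let the enumerator switch again before every reader had finished, which is the scenario the new test pins down. A minimal sketch of a chained source of this kind (the two bounded sources are illustrative placeholders, not from the commit):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.connector.source.Source;
    import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
    import org.apache.flink.connector.base.source.hybrid.HybridSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HybridSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Source<Long, ?, ?> first = new NumberSequenceSource(0, 499);
            Source<Long, ?, ?> second = new NumberSequenceSource(500, 999);

            // The enumerator switches from 'first' to 'second' only after every
            // reader has reported that it finished the current source.
            HybridSource<Long> hybrid =
                    HybridSource.<Long>builder(first)
                            .addSource(second)
                            .build();

            env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "hybrid-source").print();
            env.execute("hybrid-source-example");
        }
    }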



(flink-kubernetes-operator) branch main updated: [FLINK-33306] Always use busy tpr when observed is nan

2023-10-30 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 582c07ae [FLINK-33306] Always use busy tpr when observed is nan
582c07ae is described below

commit 582c07aeec6b4bbcc68e0a85d31000d89dbd1958
Author: Gyula Fora 
AuthorDate: Sat Oct 28 10:35:42 2023 +0200

[FLINK-33306] Always use busy tpr when observed is nan
---
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  8 +++
 .../autoscaler/ScalingMetricEvaluatorTest.java | 26 ++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index a97ac6f1..61aa6f2f 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -179,14 +179,14 @@ public class ScalingMetricEvaluator {
             double busyTimeTprAvg,
             double observedTprAvg) {
 
-        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
-            return OBSERVED_TPR;
-        }
-
         if (Double.isNaN(observedTprAvg)) {
             return TRUE_PROCESSING_RATE;
         }
 
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
         double switchThreshold =
                 conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD);
         // If we could measure the observed tpr we decide whether to switch to using it
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index ce7c5cb5..839bda2e 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -433,6 +433,32 @@ public class ScalingMetricEvaluatorTest {
                         .evaluate(conf, new CollectedMetricHistory(topology, metricHistory))
                         .get(source)
                         .get(ScalingMetric.TRUE_PROCESSING_RATE));
+
+        metricHistory.put(
+                Instant.ofEpochMilli(100),
+                new CollectedMetrics(
+                        Map.of(
+                                source,
+                                Map.of(
+                                        ScalingMetric.LAG,
+                                        0.,
+                                        ScalingMetric.TRUE_PROCESSING_RATE,
+                                        Double.POSITIVE_INFINITY,
+                                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                                        100.,
+                                        ScalingMetric.SOURCE_DATA_RATE,
+                                        50.,
+                                        ScalingMetric.LOAD,
+                                        10.)),
+                        Map.of()));
+
+        // Test that we used busy time based TPR even when infinity
+        assertEquals(
+                new EvaluatedScalingMetric(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY),
+                evaluator
+                        .evaluate(conf, new CollectedMetricHistory(topology, metricHistory))
+                        .get(source)
+                        .get(ScalingMetric.TRUE_PROCESSING_RATE));
     }
 
 private Tuple2 getThresholds(
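
The reorder matters because both guards can be true at once: when neither rate was measurable, the old order returned OBSERVED_TPR and the evaluator ended up propagating NaN, whereas the new order always falls back to the busy-time-based rate when the observed rate is missing. A standalone sketch of the corrected precedence (the names mirror the quoted diff; the enclosing class and the threshold handling are simplified assumptions):

    public class TprSelection {
        enum Metric { TRUE_PROCESSING_RATE, OBSERVED_TPR }

        static Metric selectTprMetric(double busyTimeTprAvg, double observedTprAvg) {
            // Observed TPR missing -> always use the busy-time-based TPR.
            if (Double.isNaN(observedTprAvg)) {
                return Metric.TRUE_PROCESSING_RATE;
            }
            // Busy-time TPR unusable -> use the observed rate.
            if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
                return Metric.OBSERVED_TPR;
            }
            // The real evaluator compares the two rates against a configured
            // switch threshold at this point.
            return Metric.TRUE_PROCESSING_RATE;
        }

        public static void main(String[] args) {
            // Before the fix, busy == +Infinity and observed == NaN selected
            // OBSERVED_TPR, i.e. a NaN metric; now it keeps the busy-time TPR.
            System.out.println(selectTprMetric(Double.POSITIVE_INFINITY, Double.NaN));
        }
    }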



(flink) branch master updated: [FLINK-32181][docs] Enforce Maven 3.8.6 as required version

2023-10-30 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 957eaeda496 [FLINK-32181][docs] Enforce Maven 3.8.6 as required version
957eaeda496 is described below

commit 957eaeda496a5a0bc80c86601217a3d643671317
Author: Chesnay Schepler 
AuthorDate: Wed Oct 25 16:05:18 2023 +0200

[FLINK-32181][docs] Enforce Maven 3.8.6 as required version
---
 README.md | 5 +
 pom.xml   | 6 +-
 2 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index b2691132ffa..a510cc89962 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Prerequisites for building Flink:
 
 * Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL)
 * Git
-* Maven (we recommend version 3.8.6 and require at least 3.1.1)
+* Maven (we require version 3.8.6)
 * Java 8 or 11 (Java 9 or 10 may work)
 
 ```
@@ -80,9 +80,6 @@ cd flink
 
 Flink is now installed in `build-target`.
 
-*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.1.1 creates the libraries properly.
-To build unit tests with Java 8, use Java 8u51 or above to prevent failures in unit tests that use the PowerMock runner.*
-
 ## Developing Flink
 
 The Flink committers use IntelliJ IDEA to develop the Flink codebase.
diff --git a/pom.xml b/pom.xml
index 3de7cb078b5..9d3176551dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1396,9 +1393,6 @@ under the License.
-							<requireMavenVersion>
-								<version>[3.8.6]</version>
-							</requireMavenVersion>
 							<requireJavaVersion>
 								<version>[1.8.0,1.8.1)</version>
 							</requireJavaVersion>
@@ -1799,8 +1796,7 @@ under the License.
 							<requireMavenVersion>
-								<version>[3.1.1,)</version>
+								<version>[3.8.6]</version>
 							</requireMavenVersion>
 							<requireJavaVersion>
 								<version>${target.java.version}</version>


(flink) branch master updated: [FLINK-33393][doc] Fix typo in function documentation

2023-10-30 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 806147c3233 [FLINK-33393][doc] Fix typo in function documentation
806147c3233 is described below

commit 806147c3233a47eacaa630dca5fdfef83397ab31
Author: caicancai <2356672...@qq.com>
AuthorDate: Mon Oct 30 16:59:17 2023 +0800

[FLINK-33393][doc] Fix typo in function documentation
---
 docs/data/sql_functions_zh.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 69eeccc64b3..dea5b499bc1 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -339,7 +339,7 @@ string:
 table: STRING.rtrim()
 description: |
   返回从 STRING 中删除右边空格的字符串。
-  例如 `'This is a test String. '.ltrim()` 返回 `'This is a test String.'`。
+  例如 `'This is a test String. '.rtrim()` 返回 `'This is a test String.'`。
   - sql: REPEAT(string, int)
 table: STRING.repeat(INT)
 description: |



(flink) branch master updated: [hotfix][docs] Add SQL Gateway doc reference in the Chinese version of table overview

2023-10-30 Thread guoyangze
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 701e937415f [hotfix][docs] Add SQL Gateway doc reference in the Chinese version of table overview
701e937415f is described below

commit 701e937415fbbfd5024d9f771e61e4c11ede01c5
Author: Xiangyu Feng 
AuthorDate: Mon Oct 30 19:40:07 2023 +0800

[hotfix][docs] Add SQL Gateway doc reference in the Chinese version of table overview
---
 docs/content.zh/docs/dev/table/overview.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/content.zh/docs/dev/table/overview.md b/docs/content.zh/docs/dev/table/overview.md
index 05f758ac98c..ce62e434986 100644
--- a/docs/content.zh/docs/dev/table/overview.md
+++ b/docs/content.zh/docs/dev/table/overview.md
@@ -52,6 +52,7 @@ and later use the DataStream API to build alerting based on the matched patterns
 * [SQL]({{< ref "docs/dev/table/sql/overview" >}}): SQL 支持的操作和语法。
 * [内置函数]({{< ref "docs/dev/table/functions/systemFunctions" >}}): Table API 和 SQL 中的内置函数。
 * [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}): 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上。
+* [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}): SQL 提交服务,支持多个客户端从远端并发提交 SQL 任务。
 * [SQL Jdbc Driver]({{< ref "docs/dev/table/jdbcDriver" >}}): 标准JDBC Driver,可以提交Flink SQL作业到Sql Gateway。
 * [OLAP Quickstart]({{< ref "docs/dev/table/olap_quickstart" >}}): Flink OLAP服务搭建指南.
 



(flink-web) 01/02: [FLINK-30768] Moves version configuration for quickstart scripts entirely into docs/config.toml

2023-10-30 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 6b3d1b55fc0516087e92bce403a7e2fd06b7f233
Author: yu <13485876233>
AuthorDate: Sun Oct 8 10:28:03 2023 +0800

[FLINK-30768] Moves version configuration for quickstart scripts entirely into docs/config.toml
---
 _include/q/{quickstart.sh => _utils.sh} | 43 +++--
 _include/q/gradle-quickstart.sh | 21 +++-
 _include/q/quickstart-SNAPSHOT.sh   |  5 +++-
 _include/q/quickstart.sh|  5 +++-
 _include/q/sbt-quickstart.sh| 16 ++--
 docs/config.toml|  3 +++
 6 files changed, 54 insertions(+), 39 deletions(-)

diff --git a/_include/q/quickstart.sh b/_include/q/_utils.sh
similarity index 52%
copy from _include/q/quickstart.sh
copy to _include/q/_utils.sh
index ce14e8f91..0989fced6 100644
--- a/_include/q/quickstart.sh
+++ b/_include/q/_utils.sh
@@ -1,5 +1,4 @@
 #!/usr/bin/env bash
-
 ################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -18,29 +17,25 @@
 # limitations under the License.
 ################################################################################
 
+SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+CONFIG_DIR="${SCRIPT_DIR}/../../docs/config.toml"
 
-PACKAGE=quickstart
+# fatal error handling
+export PROCESS_ID=$$
+trap 'exit 1' TERM
 
-mvn archetype:generate                 \
-  -DarchetypeGroupId=org.apache.flink  \
-  -DarchetypeArtifactId=flink-quickstart-java  \
-  -DarchetypeVersion=${1:-1.17.0}      \
-  -DgroupId=org.myorg.quickstart       \
-  -DartifactId=$PACKAGE                \
-  -Dversion=0.1                        \
-  -Dpackage=org.myorg.quickstart       \
-  -DinteractiveMode=false
+function extract_parameter() {
+  if [[ "$#" != 1 ]]; then
+    trigger_fatal_error "Fatal error: No parameter or too many parameters passed: $@"
+  fi
+  parameter_value="$(awk -F'"' '/'$1'[ ]*=[ ]*"/{print $2}' $CONFIG_DIR)"
+  if [ "$parameter_value" = "" ]; then
+    trigger_fatal_error "Fatal error: no valid value found for parameter $1."
+  fi
+  echo ${parameter_value}
+}
 
-#
-# Give some guidance
-#
-echo -e "\\n\\n"
-echo -e "\\tA sample quickstart Flink Job has been created."
-echo -e "\\tSwitch into the directory using"
-echo -e "\\t\\t cd $PACKAGE"
-echo -e "\\tImport the project there using your favorite IDE (Import it as a maven project)"
-echo -e "\\tBuild a jar inside the directory using"
-echo -e "\\t\\t mvn clean package"
-echo -e "\\tYou will find the runnable jar in $PACKAGE/target"
-echo -e "\\tConsult our website if you have any troubles: http://flink.apache.org/community.html#mailing-lists"
-echo -e "\\n\\n"
+function trigger_fatal_error() {
+  echo $1 >&2
+  kill -s TERM $PROCESS_ID
+}
\ No newline at end of file
diff --git a/_include/q/gradle-quickstart.sh b/_include/q/gradle-quickstart.sh
index b8cec12cb..b7a6209fd 100644
--- a/_include/q/gradle-quickstart.sh
+++ b/_include/q/gradle-quickstart.sh
@@ -18,6 +18,8 @@
 # limitations under the License.
 ################################################################################
 
+source "$(dirname "$0")"/_utils.sh
+
 declare -r TRUE=0
 declare -r FALSE=1
 
@@ -41,11 +43,15 @@ function mkPackage() {
 defaultProjectName="quickstart"
 defaultOrganization="org.myorg.quickstart"
 defaultVersion="0.1-SNAPSHOT"
-defaultFlinkVersion="${1:-1.17.0}"
+defaultFlinkVersion="$(extract_parameter 'FlinkStableVersion')"
+defaultFlinkShortVersion="$(extract_parameter 'FlinkStableShortVersion')"
+flinkVersionFromParameter="${1:-$defaultFlinkVersion}"
 # flink-docs-master/docs/dev/datastream/project-configuration/#gradle
 # passes the scala version prefixed with a _, e.g.: _2.12
+defaultScalaShortVersion="$(extract_parameter 'ScalaShortVersion')"
 scalaBinaryVersionFromCmdArg="${2/_/}"
-defaultScalaBinaryVersion="${scalaBinaryVersionFromCmdArg:-2.12}"
+defaultScalaBinaryVersion="${scalaBinaryVersionFromCmdArg:-$defaultScalaShortVersion}"
+defaultJavaVersion="$(extract_parameter 'JavaVersion')"
 
 echo "This script creates a Flink project using Java and Gradle."
 
@@ -58,10 +64,12 @@ while [ $TRUE ]; do
   organization=${organization:-$defaultOrganization}
   read -p "Version ($defaultVersion): " version
   version=${version:-$defaultVersion}
-  read -p "Flink version ($defaultFlinkVersion): " flinkVersion
-  flinkVersion=${flinkVersion:-$defaultFlinkVersion}
+  read -p "Flink version ($flinkVersionFromParameter): " flinkVersion
+  flinkVersion=${flinkVersion:-$flinkVersionFromParameter}

(flink-web) branch asf-site updated (86311d4a0 -> 95a7e0907)

2023-10-30 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


from 86311d4a0 Rebuild website
 new 6b3d1b55f [FLINK-30768] Moves version configuration for quickstart scripts entirely into docs/config.toml
 new 95a7e0907 Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 _utils.sh => _include/q/_utils.sh | 28 ++--
 _include/q/gradle-quickstart.sh   | 21 +++--
 _include/q/quickstart-SNAPSHOT.sh |  5 -
 _include/q/quickstart.sh  |  5 -
 _include/q/sbt-quickstart.sh  | 16 +---
 _utils.sh => content/q/_utils.sh  | 28 ++--
 content/q/gradle-quickstart.sh| 21 +++--
 content/q/quickstart-SNAPSHOT.sh  |  5 -
 content/q/quickstart.sh   |  5 -
 content/q/sbt-quickstart.sh   | 16 +---
 docs/config.toml  |  3 +++
 11 files changed, 103 insertions(+), 50 deletions(-)
 copy _utils.sh => _include/q/_utils.sh (61%)
 copy _utils.sh => content/q/_utils.sh (61%)



(flink-web) 02/02: Rebuild website

2023-10-30 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 95a7e090795e51f92bd5b073746462b837f5ac1b
Author: Matthias Pohl 
AuthorDate: Mon Oct 30 14:48:44 2023 +0100

Rebuild website
---
 content/q/{quickstart.sh => _utils.sh} | 43 +++---
 content/q/gradle-quickstart.sh | 21 -
 content/q/quickstart-SNAPSHOT.sh   |  5 +++-
 content/q/quickstart.sh|  5 +++-
 content/q/sbt-quickstart.sh| 16 +++--
 5 files changed, 51 insertions(+), 39 deletions(-)

diff --git a/content/q/quickstart.sh b/content/q/_utils.sh
similarity index 52%
copy from content/q/quickstart.sh
copy to content/q/_utils.sh
index ce14e8f91..0989fced6 100644
--- a/content/q/quickstart.sh
+++ b/content/q/_utils.sh
@@ -1,5 +1,4 @@
 #!/usr/bin/env bash
-
 ################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -18,29 +17,25 @@
 # limitations under the License.
 ################################################################################
 
+SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+CONFIG_DIR="${SCRIPT_DIR}/../../docs/config.toml"
 
-PACKAGE=quickstart
+# fatal error handling
+export PROCESS_ID=$$
+trap 'exit 1' TERM
 
-mvn archetype:generate                 \
-  -DarchetypeGroupId=org.apache.flink  \
-  -DarchetypeArtifactId=flink-quickstart-java  \
-  -DarchetypeVersion=${1:-1.17.0}      \
-  -DgroupId=org.myorg.quickstart       \
-  -DartifactId=$PACKAGE                \
-  -Dversion=0.1                        \
-  -Dpackage=org.myorg.quickstart       \
-  -DinteractiveMode=false
+function extract_parameter() {
+  if [[ "$#" != 1 ]]; then
+    trigger_fatal_error "Fatal error: No parameter or too many parameters passed: $@"
+  fi
+  parameter_value="$(awk -F'"' '/'$1'[ ]*=[ ]*"/{print $2}' $CONFIG_DIR)"
+  if [ "$parameter_value" = "" ]; then
+    trigger_fatal_error "Fatal error: no valid value found for parameter $1."
+  fi
+  echo ${parameter_value}
+}
 
-#
-# Give some guidance
-#
-echo -e "\\n\\n"
-echo -e "\\tA sample quickstart Flink Job has been created."
-echo -e "\\tSwitch into the directory using"
-echo -e "\\t\\t cd $PACKAGE"
-echo -e "\\tImport the project there using your favorite IDE (Import it as a maven project)"
-echo -e "\\tBuild a jar inside the directory using"
-echo -e "\\t\\t mvn clean package"
-echo -e "\\tYou will find the runnable jar in $PACKAGE/target"
-echo -e "\\tConsult our website if you have any troubles: http://flink.apache.org/community.html#mailing-lists"
-echo -e "\\n\\n"
+function trigger_fatal_error() {
+  echo $1 >&2
+  kill -s TERM $PROCESS_ID
+}
\ No newline at end of file
diff --git a/content/q/gradle-quickstart.sh b/content/q/gradle-quickstart.sh
index b8cec12cb..b7a6209fd 100644
--- a/content/q/gradle-quickstart.sh
+++ b/content/q/gradle-quickstart.sh
@@ -18,6 +18,8 @@
 # limitations under the License.
 ################################################################################
 
+source "$(dirname "$0")"/_utils.sh
+
 declare -r TRUE=0
 declare -r FALSE=1
 
@@ -41,11 +43,15 @@ function mkPackage() {
 defaultProjectName="quickstart"
 defaultOrganization="org.myorg.quickstart"
 defaultVersion="0.1-SNAPSHOT"
-defaultFlinkVersion="${1:-1.17.0}"
+defaultFlinkVersion="$(extract_parameter 'FlinkStableVersion')"
+defaultFlinkShortVersion="$(extract_parameter 'FlinkStableShortVersion')"
+flinkVersionFromParameter="${1:-$defaultFlinkVersion}"
 # flink-docs-master/docs/dev/datastream/project-configuration/#gradle
 # passes the scala version prefixed with a _, e.g.: _2.12
+defaultScalaShortVersion="$(extract_parameter 'ScalaShortVersion')"
 scalaBinaryVersionFromCmdArg="${2/_/}"
-defaultScalaBinaryVersion="${scalaBinaryVersionFromCmdArg:-2.12}"
+defaultScalaBinaryVersion="${scalaBinaryVersionFromCmdArg:-$defaultScalaShortVersion}"
+defaultJavaVersion="$(extract_parameter 'JavaVersion')"
 
 echo "This script creates a Flink project using Java and Gradle."
 
@@ -58,10 +64,12 @@ while [ $TRUE ]; do
   organization=${organization:-$defaultOrganization}
   read -p "Version ($defaultVersion): " version
   version=${version:-$defaultVersion}
-  read -p "Flink version ($defaultFlinkVersion): " flinkVersion
-  flinkVersion=${flinkVersion:-$defaultFlinkVersion}
+  read -p "Flink version ($flinkVersionFromParameter): " flinkVersion
+  flinkVersion=${flinkVersion:-$flinkVersionFromParameter}
   read -p "Scala version ($defaultScalaBinaryVersion): " scalaBinaryVersion

(flink) branch master updated (701e937415f -> cc62044efc0)

2023-10-30 Thread jchan
This is an automated email from the ASF dual-hosted git repository.

jchan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 701e937415f [hotfix][docs] Add SQL Gateway doc reference in the Chinese version of table overview
 add cc62044efc0 [FLINK-4][table] Make map entries sorted by keys in json plan to have it stable for java21

No new revisions were added by this update.

Summary of changes:
 .../plan/nodes/exec/serde/JsonSerdeUtil.java   |   2 +
 .../test/resources/jsonplan/testGetJsonPlan.out|   4 +-
 .../CalcJsonPlanTest_jsonplan/testComplexCalc.out  |   4 +-
 .../testProjectPushDown.out|   4 +-
 .../stream/CalcJsonPlanTest_jsonplan/testSarg.out  |   8 +-
 .../CalcJsonPlanTest_jsonplan/testSimpleFilter.out |   4 +-
 .../testSimpleProject.out  |   4 +-
 .../testChangelogSource.out|   8 +-
 .../testUpsertSource.out   |   8 +-
 .../testCrossJoin.out  |   4 +-
 .../testCrossJoinOverrideParameters.out|   4 +-
 .../testJoinWithFilter.out |   4 +-
 .../testLeftOuterJoinWithLiteralTrue.out   |   4 +-
 .../testDeduplication.out  |   4 +-
 .../ExpandJsonPlanTest_jsonplan/testExpand.out |   8 +-
 ...tDistinctAggCalls[isMiniBatchEnabled=false].out |   8 +-
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out |   8 +-
 ...gCallsWithGroupBy[isMiniBatchEnabled=false].out |   8 +-
 ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out |   8 +-
 ...AggWithoutGroupBy[isMiniBatchEnabled=false].out |   8 +-
 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out |   8 +-
 ...erDefinedAggCalls[isMiniBatchEnabled=false].out |   8 +-
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out |   8 +-
 .../testIncrementalAggregate.out   |   8 +-
 ...lAggregateWithSumCountDistinctAndRetraction.out |   8 +-
 .../testProcessingTimeInnerJoinWithOnClause.out|   8 +-
 .../testRowTimeInnerJoinWithOnClause.out   |   8 +-
 .../JoinJsonPlanTest_jsonplan/testInnerJoin.out|   8 +-
 .../testInnerJoinWithEqualPk.out   |  12 +--
 .../testInnerJoinWithPk.out|  12 +--
 .../testLeftJoinNonEqui.out|  12 +--
 .../LimitJsonPlanTest_jsonplan/testLimit.out   |   8 +-
 .../testAggAndLeftJoinWithTryResolveMode.out   |  12 +--
 .../testJoinTemporalTable.out  |   8 +-
 .../testJoinTemporalTableWithAsyncHint.out |   8 +-
 .../testJoinTemporalTableWithAsyncHint2.out|   8 +-
 .../testJoinTemporalTableWithAsyncRetryHint.out|   8 +-
 .../testJoinTemporalTableWithAsyncRetryHint2.out   |   8 +-
 ...testJoinTemporalTableWithProjectionPushDown.out |   8 +-
 .../testJoinTemporalTableWithRetryHint.out |   8 +-
 ...eftJoinTemporalTableWithMultiJoinConditions.out |  10 +--
 .../testLeftJoinTemporalTableWithPostFilter.out|  10 +--
 .../testLeftJoinTemporalTableWithPreFilter.out |  10 +--
 .../testMatch.out  |  16 ++--
 .../testSkipPastLastRow.out| 100 ++---
 .../testSkipToFirst.out| 100 ++---
 .../testSkipToLast.out | 100 ++---
 .../testSkipToNextRow.out  | 100 ++---
 .../testProcTimeBoundedNonPartitionedRangeOver.out |   8 +-
 .../testProcTimeBoundedPartitionedRangeOver.out|   8 +-
 ...undedPartitionedRowsOverWithBuiltinProctime.out |   8 +-
 .../testProcTimeUnboundedPartitionedRangeOver.out  |   8 +-
 ...stProctimeBoundedDistinctPartitionedRowOver.out |   8 +-
 ...edDistinctWithNonDistinctPartitionedRowOver.out |   8 +-
 .../testRowTimeBoundedPartitionedRowsOver.out  |   8 +-
 .../testPythonCalc.out |   4 +-
 .../testPythonFunctionInWhereClause.out|   4 +-
 .../testJoinWithFilter.out |   4 +-
 .../testPythonTableFunction.out|   4 +-
 .../tesPythonAggCallsWithGroupBy.out   |   8 +-
 .../testProcTimeBoundedNonPartitionedRangeOver.out |   8 +-
 .../testProcTimeBoundedPartitionedRangeOver.out|   8 +-
 ...undedPartitionedRowsOverWithBuiltinProctime.out |   8 +-
 .../testProcTimeUnboundedPartitionedRangeOver.out  |   8 +-
 .../testRowTimeBoundedPartitionedRowsOver.out  |   8 +-
 .../stream/RankJsonPlanTest_jsonplan/testRank.out  |   8 +-
 .../stream/SortJsonPlanTest_jsonplan/testSort.out  |  10 +--
 .../testSortLimit.out  |   8 +-
 ...WithNonDeterministicFuncSinkWithDifferentPk.out |   8 +-
 .../testOverwrite.out  |   8 +-
 .../testPartialInsert.out  |   8 +-
 .../testPartitioning.out   |   8 +-
 .../testWritingMetadata.out|   4 +-
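
The digest above omits the two-line body of the JsonSerdeUtil change, but the standard Jackson switch for key-stable map output, which would explain the wholesale regeneration of the .out plan files, looks like the following sketch (an assumption about the mechanism, not the verbatim patch):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;

    public class StablePlanJson {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Emit map entries sorted by key so the serialized JSON plan does not
            // depend on hash iteration order, which changed behavior on Java 21.
            mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

            Map<String, Integer> plan = new LinkedHashMap<>();
            plan.put("b", 2);
            plan.put("a", 1);
            System.out.println(mapper.writeValueAsString(plan)); // {"a":1,"b":2}
        }
    }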

(flink-connector-elasticsearch) branch dependabot/maven/flink-connector-elasticsearch-base/org.elasticsearch-elasticsearch-7.17.13 created (now 24cd74a)

2023-10-30 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/flink-connector-elasticsearch-base/org.elasticsearch-elasticsearch-7.17.13
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


  at 24cd74a  Bump org.elasticsearch:elasticsearch

No new revisions were added by this update.



(flink) branch master updated (cc62044efc0 -> 530ebd2f4ef)

2023-10-30 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from cc62044efc0 [FLINK-4][table] Make map entries sorted by keys in json plan to have it stable for java21
 add 530ebd2f4ef [FLINK-32182][build] Use original japicmp plugin

No new revisions were added by this update.

Summary of changes:
 flink-scala/pom.xml  | 2 +-
 flink-streaming-scala/pom.xml| 4 ++--
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 2 +-
 pom.xml  | 8 
 4 files changed, 8 insertions(+), 8 deletions(-)



(flink-connector-kafka) branch main updated: [hotfix] refer to sql_connector_download_table shortcode in the docs to adhere to new connector versioning format

2023-10-30 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
 new 979791c4 [hotfix] refer to sql_connector_download_table shortcode in the docs to adhere to new connector versioning format
979791c4 is described below

commit 979791c4c71e944c16c51419cf9a84aa1f8fea4c
Author: mas-chen 
AuthorDate: Mon Oct 30 12:29:27 2023 -0700

[hotfix] refer to sql_connector_download_table shortcode in the docs to adhere to new connector versioning format
---
 docs/content/docs/connectors/table/kafka.md| 2 +-
 docs/content/docs/connectors/table/upsert-kafka.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index 18578569..0256301b 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -35,7 +35,7 @@ The Kafka connector allows for reading data from and writing data into Kafka top
 Dependencies
 
 
-{{< sql_download_table "kafka" >}}
+{{< sql_connector_download_table "kafka" >}}
 
 The Kafka connector is not part of the binary distribution.
 See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
diff --git a/docs/content/docs/connectors/table/upsert-kafka.md b/docs/content/docs/connectors/table/upsert-kafka.md
index c3537a76..61237640 100644
--- a/docs/content/docs/connectors/table/upsert-kafka.md
+++ b/docs/content/docs/connectors/table/upsert-kafka.md
@@ -47,7 +47,7 @@ key will fall into the same partition.
 Dependencies
 
 
-{{< sql_download_table "upsert-kafka" >}}
+{{< sql_connector_download_table "upsert-kafka" >}}
 
 The Upsert Kafka connector is not part of the binary distribution.
 See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).



(flink-connector-kafka) branch v3.0 updated: [hotfix] refer to sql_connector_download_table shortcode in the docs to adhere to new connector versioning format

2023-10-30 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/v3.0 by this push:
 new 5ee821b7 [hotfix] refer to sql_connector_download_table shortcode in the docs to adhere to new connector versioning format
5ee821b7 is described below

commit 5ee821b7d1f631c4db99ea20ae80a0e0b9814a77
Author: mas-chen 
AuthorDate: Mon Oct 30 12:29:27 2023 -0700

[hotfix] refer to sql_connector_download_table shortcode in the docs to adhere to new connector versioning format
---
 docs/content/docs/connectors/table/kafka.md| 2 +-
 docs/content/docs/connectors/table/upsert-kafka.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index 3c9e739c..323a2874 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -35,7 +35,7 @@ The Kafka connector allows for reading data from and writing data into Kafka top
 Dependencies
 
 
-{{< sql_download_table "kafka" >}}
+{{< sql_connector_download_table "kafka" >}}
 
 The Kafka connector is not part of the binary distribution.
 See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
diff --git a/docs/content/docs/connectors/table/upsert-kafka.md b/docs/content/docs/connectors/table/upsert-kafka.md
index 12a23c5c..16983790 100644
--- a/docs/content/docs/connectors/table/upsert-kafka.md
+++ b/docs/content/docs/connectors/table/upsert-kafka.md
@@ -47,7 +47,7 @@ key will fall into the same partition.
 Dependencies
 
 
-{{< sql_download_table "upsert-kafka" >}}
+{{< sql_connector_download_table "upsert-kafka" >}}
 
 The Upsert Kafka connector is not part of the binary distribution.
 See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).



svn commit: r64942 - /dev/flink/flink-connector-kafka-3.0.1-rc1/ /release/flink/flink-connector-kafka-3.0.1/

2023-10-30 Thread tzulitai
Author: tzulitai
Date: Mon Oct 30 22:44:09 2023
New Revision: 64942

Log:
Release flink-connector-kafka 3.0.1

Added:
release/flink/flink-connector-kafka-3.0.1/
  - copied from r64941, dev/flink/flink-connector-kafka-3.0.1-rc1/
Removed:
dev/flink/flink-connector-kafka-3.0.1-rc1/



(flink-connector-kafka) annotated tag v3.0.1 updated (ea4fac39 -> 946ad296)

2023-10-30 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to annotated tag v3.0.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


*** WARNING: tag v3.0.1 was modified! ***

from ea4fac39 (commit)
  to 946ad296 (tag)
 tagging ea4fac3966c84f4cae8b80d70873254f03b1c333 (commit)
  by Tzu-Li (Gordon) Tai
  on Mon Oct 30 15:47:38 2023 -0700

- Log -
v3.0.1
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiEEHB4jlNMZThlEYTSI8yCYbTXDPWoFAmVAMooACgkQ8yCYbTXD
PWr1HA//eRM7aYQfoS5psRN0OOgcK2kpSiabKsSmLcP1b2+8LM7+8CEpbmEXhA7/
WXmgS/XFADnl4PR+sNk06B3g5T1Gjf/tnPX+f+Hc/Yarv3NxCw89ql12aiukTgJQ
3lHOoNYEG1ypGdh3PCak/Ig9rFVIL84DtrZbGopI3lDow1yxJ6wKD4tCuMOZ8tdH
d1qRSVHw7iFIErj6ECteYtLydUxWT2zdmKEIMpwxvAKsZRshliaeYahwIWhUm8aJ
qLM1TaDf0V3TToPtUVwHmvOD/OoDuVrNbUiTO+ZoSEKSKN2vLsxQzGCLFG32vUVv
FqA15Cp5EQ1fPt775OvRuoZ27OmsW0GRRpt+Q17Rk6T/MTsFCOsDXdNTFEPJnZ7Q
2jn0fNMuiu/6nBLrLeZmSJPqHTbBtwA/PPI6W1QDc8f5DVuIkc0DGF0NG1AZIr+f
fkWVnTwwm7PpIsOGXuJP8GyyTMq4Cc1zPaRp8W27dKPNaCrHcC2TDuqYoL8+Yy25
IehUBieUzqIKspftED8zo9x79xEZ3pP0A9jg6VBQZaejS1WW9g6AmFZ1zFdwHBlL
kMrBjVcBzE1mPO1NdAHG/yTRO+5fNJxGA9IrCHtTnT3CJlZvKG1bGAqrnsvFJeV8
0etyufT+JUzhGiMoj9hUWL9Pt+0T1xSMMq1gmcU57fMYHFOWr78=
=xGtk
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



(flink) branch master updated: [FLINK-33080][runtime] Fix ‘state.checkpoint-storage’ not taking effect in job generation stage

2023-10-30 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 25697476095 [FLINK-33080][runtime] Fix ‘state.checkpoint-storage’ not taking effect in job generation stage
25697476095 is described below

commit 25697476095a5b9cf38dc3b61c684d0e912b1353
Author: JunRuiLee 
AuthorDate: Wed Sep 13 16:00:52 2023 +0800

[FLINK-33080][runtime] Fix ‘state.checkpoint-storage’ not taking effect in job generation stage

This closes #23408.
---
 .../client/program/StreamContextEnvironment.java   |  1 +
 .../program/StreamContextEnvironmentTest.java  | 30 ++
 .../environment/StreamExecutionEnvironment.java| 18 +
 .../StreamExecutionEnvironmentTest.java| 19 ++
 4 files changed, 68 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 21ddca61694..07f47573666 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -303,6 +303,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
     private void checkCheckpointConfig(Configuration clusterConfigMap, List<String> errors) {
         CheckpointConfig expectedCheckpointConfig = new CheckpointConfig();
         expectedCheckpointConfig.configure(clusterConfigMap);
+        configureCheckpointStorage(clusterConfigMap, expectedCheckpointConfig);
         checkConfigurationObject(
                 expectedCheckpointConfig.toConfiguration(),
                 checkpointCfg.toConfiguration(),
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index 85f6279fc58..1af115a402a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -121,6 +121,36 @@ class StreamContextEnvironmentTest {
                 CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");
     }
 
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testDisallowCheckpointStorageByConfiguration(
+            ThrowingConsumer executor) {
+        final Configuration clusterConfig = new Configuration();
+
+        Configuration jobConfig = new Configuration();
+        String disallowedPath = "file:///flink/disallowed/modification";
+        jobConfig.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
+        jobConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, disallowedPath);
+        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        final StreamContextEnvironment environment =
+                new StreamContextEnvironment(
+                        new MockExecutorServiceLoader(),
+                        clusterConfig,
+                        jobConfig,
+                        classLoader,
+                        true,
+                        true,
+                        false,
+                        Collections.emptyList());
+
+        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(MutatedConfigurationException.class)
+                .hasMessageContainingAll(
+                        CheckpointingOptions.CHECKPOINT_STORAGE.key(),
+                        CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
+    }
+
     @ParameterizedTest
     @MethodSource("provideExecutors")
     void testAllowCheckpointStorage(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 891aabc52f6..343308edcf2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -73,6 +73,8 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLoader;
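
The new test above shows the user-visible effect: checkpoint storage chosen purely through configuration is now honored (and checked against the cluster configuration) during job generation. A minimal sketch of driving checkpoint storage from configuration alone (the directory path is illustrative):

    import org.apache.flink.configuration.CheckpointingOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointStorageByConfig {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Equivalent to 'state.checkpoint-storage: filesystem' and
            // 'state.checkpoints.dir: ...' in flink-conf.yaml.
            conf.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
            conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/flink-checkpoints");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.enableCheckpointing(10_000);

            // With FLINK-33080, the storage configured above takes effect when the
            // job graph is generated instead of being silently dropped.
            env.fromSequence(0, 99).print();
            env.execute("checkpoint-storage-by-config");
        }
    }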