(flink) branch master updated: [FLINK-34072][scripts] Replace java command with JAVA_RUN in config.sh

2024-01-17 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e7c8cd1562e [FLINK-34072][scripts] Replace java command with JAVA_RUN in config.sh
e7c8cd1562e is described below

commit e7c8cd1562ebd45c1f7b48f519a11c6cd4fdf100
Author: Yu Chen 
AuthorDate: Sun Jan 14 20:57:05 2024 +0800

[FLINK-34072][scripts] Replace java command with JAVA_RUN in config.sh
---
 flink-dist/src/main/flink-bin/bin/config.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index dcd48a256f7..363e17a375f 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -335,7 +335,7 @@ if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
 # Remove leading and ending double quotes (if present) of value
 FLINK_ENV_JAVA_OPTS="-XX:+IgnoreUnrecognizedVMOptions $( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
 
-JAVA_SPEC_VERSION=`${JAVA_HOME}/bin/java -XshowSettings:properties 2>&1 | grep "java.specification.version" | cut -d "=" -f 2 | tr -d '[:space:]' | rev | cut -d "." -f 1 | rev`
+JAVA_SPEC_VERSION=`"${JAVA_RUN}" -XshowSettings:properties 2>&1 | grep "java.specification.version" | cut -d "=" -f 2 | tr -d '[:space:]' | rev | cut -d "." -f 1 | rev`
 if [[ $(( $JAVA_SPEC_VERSION > 17 )) == 1 ]]; then
   # set security manager property to allow calls to System.setSecurityManager() at runtime
   FLINK_ENV_JAVA_OPTS="$FLINK_ENV_JAVA_OPTS -Djava.security.manager=allow"
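For context: the hunk above swaps the hard-coded ${JAVA_HOME}/bin/java for the JAVA_RUN wrapper when deriving the Java specification version, and on JDK 18+ appends -Djava.security.manager=allow. Since JDK 18 the java.security.manager system property defaults to "disallow", so runtime calls to System.setSecurityManager(...) fail without that flag. A minimal, hedged Java probe of both facts (class name illustrative, not part of Flink):

    // JavaVersionProbe.java -- standalone sketch; compile with javac, run with java.
    public final class JavaVersionProbe {
        public static void main(String[] args) {
            // Same property the script greps out of `java -XshowSettings:properties`.
            String spec = System.getProperty("java.specification.version"); // e.g. "1.8", "17", "21"
            // Mirrors the script's `rev | cut -d "." -f 1 | rev`: keep the last dot-separated field.
            int major = Integer.parseInt(spec.substring(spec.lastIndexOf('.') + 1));
            System.out.println("java.specification.version = " + spec + " (major " + major + ")");
            if (major > 17) {
                try {
                    System.setSecurityManager(new SecurityManager()); // deprecated since JDK 17
                    System.out.println("security manager installed: JVM started with -Djava.security.manager=allow");
                } catch (UnsupportedOperationException e) {
                    System.out.println("rejected: start the JVM with -Djava.security.manager=allow");
                }
            }
        }
    }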



(flink) branch master updated: [FLINK-34119][doc] Improve description about changelog in document

2024-01-17 Thread hangxiang
This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2ec8f8157f9 [FLINK-34119][doc] Improve description about changelog in document
2ec8f8157f9 is described below

commit 2ec8f8157f95a79ee94d609657f9b08f8f0b6a26
Author: Hangxiang Yu 
AuthorDate: Sat Jan 13 14:50:36 2024 +0800

[FLINK-34119][doc] Improve description about changelog in document
---
 docs/content.zh/docs/deployment/config.md        |  3 +--
 docs/content.zh/docs/ops/state/state_backends.md | 13 +++--
 docs/content/docs/deployment/config.md           |  3 +--
 docs/content/docs/ops/state/state_backends.md    | 15 ---
 4 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 34d04f733e5..cf0740bf8de 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -370,8 +370,7 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
 ### State Changelog Options
 
 Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
-using State Changelog. {{< hint warning >}} The feature is in experimental status. {{< /hint >}} {{<
-generated/state_backend_changelog_section >}}
+using State Changelog.
 
 #### FileSystem-based Changelog options
 
diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index eda37dada7e..5d7d4f92b1c 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -349,10 +349,6 @@ Python API 中尚不支持该特性。
 
 ## 开启 Changelog
 
-{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
-
-{{< hint warning >}} 开启 Changelog 可能会给您的应用带来性能损失。(见下文) {{< /hint >}}
-
 
 
 ### 介绍
@@ -372,16 +368,21 @@ Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以
 
 开启 Changelog 功能之后,Flink 会不断上传状态变更并形成 changelog。创建 checkpoint 时,只有 changelog 中的相关部分需要上传。而配置的状态后端则会定期在后台进行快照,快照成功上传后,相关的changelog 将会被截断。
 
-基于此,异步阶段的持续时间减少(另外因为不需要将数据刷新到磁盘,同步阶段持续时间也减少了),特别是长尾延迟得到了改善。
+基于此,异步阶段的持续时间减少(另外因为不需要将数据刷新到磁盘,同步阶段持续时间也减少了),特别是长尾延迟得到了改善。同时,还可以获得一些其他好处:
+1. 更稳定、更低的端到端时延。
+2. Failover 后数据重放更少。
+3. 资源利用更加稳定。
 
 但是,资源使用会变得更高:
 
 - 将会在 DFS 上创建更多文件
-- 将可能在 DFS 上残留更多文件(这将在 FLINK-25511 和 FLINK-25512 之后的新版本中被解决)
 - 将使用更多的 IO 带宽用来上传状态变更
 - 将使用更多 CPU 资源来序列化状态变更
 - Task Managers 将会使用更多内存来缓存状态变更
 
+值得注意的是虽然 Changelog 增加了少量的日常 CPU 和网络带宽资源使用,
+但会降低峰值的 CPU 和网络带宽使用量。
+
 另一项需要考虑的事情是恢复时间。取决于 `state.backend.changelog.periodic-materialize.interval` 的设置,changelog 可能会变得冗长,因此重放会花费更多时间。即使这样,恢复时间加上 checkpoint 持续时间仍然可能低于不开启 changelog 功能的时间,从而在故障恢复的情况下也能提供更低的端到端延迟。当然,取决于上述时间的实际比例,有效恢复时间也有可能会增加。
 
 有关更多详细信息,请参阅 [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints)。
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index cbdc4f25a77..c4e70ba7235 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -372,8 +372,7 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
 ### State Changelog Options
 
 Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
-using State Changelog. {{< hint warning >}} The feature is in experimental status. {{< /hint >}} {{<
-generated/state_backend_changelog_section >}}
+using State Changelog.
 
 #### FileSystem-based Changelog options
 
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index b645eefcd8b..bd04491977f 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -346,10 +346,6 @@ Still not supported in Python API.
 
 ## Enabling Changelog
 
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
-
-{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}}
-
 ### Introduction
 
 Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode.
@@ -361,7 +357,7 @@ Most commonly, checkpoint duration is affected by:
 and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
 2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{< ref "#the-embeddedrocksdbstatebackend">}}))
-4. Snapshot upload time (asynchronous phase)
+3. Snapshot upload time (asynchronous phase)
 
 Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}).
 However, most incremental state backends perform some form of compaction periodically, which results 
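As a usage note on the feature these two pages document: changelog can be switched on per job as well as through the state.backend.changelog.enabled option. A minimal sketch (trivial pipeline, for illustration only):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class ChangelogEnabledJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Programmatic equivalent of `state.backend.changelog.enabled: true`: state changes
            // are uploaded continuously, so a checkpoint only ships the changelog tail while the
            // configured backend materializes snapshots in the background.
            env.enableChangelogStateBackend(true);

            env.fromSequence(0, 99).keyBy(v -> v % 10).print();
            env.execute("changelog-enabled-example");
        }
    }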

(flink-connector-kafka) branch main updated: [FLINK-32416] Fix flaky tests by ensuring test utilities produce records with consistency and cleanup notify no more splits to ensure it is sent. This closes #79

2024-01-17 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
 new cdfa328b [FLINK-32416] Fix flaky tests by ensuring test utilities produce records with consistency and cleanup notify no more splits to ensure it is sent. This closes #79
cdfa328b is described below

commit cdfa328b5ec34d711ae2c9e93de6de7565fd1db6
Author: Mason Chen 
AuthorDate: Wed Jan 17 12:55:29 2024 -0800

[FLINK-32416] Fix flaky tests by ensuring test utilities produce records with consistency and cleanup notify no more splits to ensure it is sent. This closes #79
---
 .../enumerator/DynamicKafkaSourceEnumerator.java   | 49 --
 .../enumerator/StoppableKafkaEnumContextProxy.java | 29 ++---
 .../source/reader/DynamicKafkaSourceReader.java    |  4 +-
 .../DynamicKafkaSourceEnumeratorTest.java          |  6 ++-
 .../StoppableKafkaEnumContextProxyTest.java        |  3 +-
 .../kafka/DynamicKafkaSourceTestHelper.java        |  4 +-
 6 files changed, 63 insertions(+), 32 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index cce8ab28..e14a36d9 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -96,6 +96,7 @@ public class DynamicKafkaSourceEnumerator
     private int kafkaMetadataServiceDiscoveryFailureCount;
     private Map<String, Set<String>> latestClusterTopicsMap;
     private Set<KafkaStream> latestKafkaStreams;
+    private boolean firstDiscoveryComplete;
 
     public DynamicKafkaSourceEnumerator(
             KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -151,6 +152,7 @@ public class DynamicKafkaSourceEnumerator
                         DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD,
                         Integer::parseInt);
         this.kafkaMetadataServiceDiscoveryFailureCount = 0;
+        this.firstDiscoveryComplete = false;
 
         this.kafkaMetadataService = kafkaMetadataService;
         this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory;
@@ -212,32 +214,27 @@ public class DynamicKafkaSourceEnumerator
 
     private void handleNoMoreSplits() {
         if (Boundedness.BOUNDED.equals(boundedness)) {
-            enumContext.runInCoordinatorThread(
-                    () -> {
-                        boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
-                        for (StoppableKafkaEnumContextProxy context :
-                                clusterEnumContextMap.values()) {
-                            allEnumeratorsHaveSignalledNoMoreSplits =
-                                    allEnumeratorsHaveSignalledNoMoreSplits
-                                            && context.isNoMoreSplits();
-                        }
-
-                        if (allEnumeratorsHaveSignalledNoMoreSplits) {
-                            logger.info(
-                                    "Signal no more splits to all readers: {}",
-                                    enumContext.registeredReaders().keySet());
-                            enumContext
-                                    .registeredReaders()
-                                    .keySet()
-                                    .forEach(enumContext::signalNoMoreSplits);
-                        }
-                    });
+            boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
+            for (StoppableKafkaEnumContextProxy context : clusterEnumContextMap.values()) {
+                allEnumeratorsHaveSignalledNoMoreSplits =
+                        allEnumeratorsHaveSignalledNoMoreSplits && context.isNoMoreSplits();
+            }
+
+            if (firstDiscoveryComplete && allEnumeratorsHaveSignalledNoMoreSplits) {
+                logger.info(
+                        "Signal no more splits to all readers: {}",
+                        enumContext.registeredReaders().keySet());
+                enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits);
+            } else {
+                logger.info("Not ready to notify no more splits to readers.");
+            }
         }
     }
 
 // --- private methods for metadata discovery ---
 
     private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams, Throwable t) {
+        firstDiscoveryComplete = true;
         Set<KafkaStream> handledFetchKafkaStreams =
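In outline, the fix gates the bounded-source shutdown on two conditions: the first metadata discovery must have completed, and every per-cluster enumerator must have reported no more splits; the check also now runs directly instead of being re-enqueued onto the coordinator thread. A self-contained sketch of that guard, with hypothetical names standing in for the connector's context proxies:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Hypothetical stand-in for StoppableKafkaEnumContextProxy's no-more-splits bookkeeping.
    final class SubEnumerator {
        private volatile boolean noMoreSplits;
        void markNoMoreSplits() { noMoreSplits = true; }
        boolean isNoMoreSplits() { return noMoreSplits; }
    }

    public final class NoMoreSplitsGuard {
        private final List<SubEnumerator> subEnumerators = new CopyOnWriteArrayList<>();
        private volatile boolean firstDiscoveryComplete;

        void onFirstDiscoveryComplete() { firstDiscoveryComplete = true; }
        void register(SubEnumerator e) { subEnumerators.add(e); }

        // Mirrors the patched handleNoMoreSplits(): only signal readers once discovery
        // has run at least once AND every sub-enumerator is drained.
        boolean readyToSignalNoMoreSplits() {
            boolean all = true;
            for (SubEnumerator e : subEnumerators) {
                all = all && e.isNoMoreSplits();
            }
            return firstDiscoveryComplete && all;
        }
    }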
 

(flink) branch master updated: [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available.

2024-01-17 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 024ed963ed8 [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available.
024ed963ed8 is described below

commit 024ed963ed8358ae78e728108ac6c95046924f67
Author: Matthias Pohl 
AuthorDate: Fri Jan 12 16:57:40 2024 +0100

[FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available.
---
 .../runtime/minicluster/MiniClusterITCase.java | 81 +++---
 1 file changed, 57 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index ba3ca9b5237..e527dda14c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
 
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -100,43 +102,66 @@ class MiniClusterITCase {
     }
 
     @Test
-    void testHandleStreamingJobsWhenNotEnoughSlot() {
-        final JobVertex vertex1 = new JobVertex("Test Vertex1");
-        vertex1.setParallelism(1);
-        vertex1.setMaxParallelism(1);
-        vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+    void testHandlingNotEnoughSlotsThroughTimeout() throws Exception {
+        final Configuration config = new Configuration();
 
-        final JobVertex vertex2 = new JobVertex("Test Vertex2");
-        vertex2.setParallelism(1);
-        vertex2.setMaxParallelism(1);
-        vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+        // the slot timeout needs to be high enough to avoid causing TimeoutException
+        final Duration slotRequestTimeout = Duration.ofMillis(100);
 
-        vertex2.connectNewDataSetAsInput(
-                vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        // this triggers the failure for the default scheduler
+        config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis());
+        // this triggers the failure for the adaptive scheduler
+        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout);
 
-        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, vertex2);
+        // we have to disable sending the slot-unavailable request to allow for the timeout to kick
+        // in
+        config.set(
+                ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ofNanos(Long.MAX_VALUE));
 
-        assertThatThrownBy(() -> runHandleJobsWhenNotEnoughSlots(jobGraph))
-                .isInstanceOf(JobExecutionException.class)
-                .hasRootCauseInstanceOf(NoResourceAvailableException.class)
-                .hasMessageContaining("Job execution failed");
+        tryRunningJobWithoutEnoughSlots(config);
     }
 
-    private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exception {
-        final Configuration configuration = new Configuration();
+    @Test
+    // The AdaptiveScheduler is supposed to work with the resources that are available.
+    // That is why there is no resource allocation abort request supported.
+    @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
+    void testHandlingNotEnoughSlotsThroughEarlyAbortRequest() throws Exception {
+        final Configuration config = new Configuration();
 
         // the slot timeout needs to be high enough to avoid causing TimeoutException
-        Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE);
+        final Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE);
 
         // this triggers the failure for the default scheduler
-        configuration.setLong(
-                JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis());
+
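Condensed from the diff above: both new tests build a configuration that makes a slot shortage surface quickly, for the default scheduler via the slot-request timeout and for the adaptive scheduler via the resource wait timeout, while pushing the resource manager's requirements check far out so it cannot pre-empt the timeout. A hedged sketch of just that recipe (helper class illustrative):

    import java.time.Duration;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.flink.configuration.ResourceManagerOptions;

    public final class NotEnoughSlotsConfig {
        static Configuration buildConfig() {
            Configuration config = new Configuration();
            Duration slotRequestTimeout = Duration.ofMillis(100);
            // default scheduler: the pending slot request times out
            config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis());
            // adaptive scheduler: stop waiting for the missing resources
            config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout);
            // keep the requirements check from aborting the allocation before the timeout fires
            config.set(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ofNanos(Long.MAX_VALUE));
            return config;
        }
    }
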
(flink) branch master updated: [FLINK-34097] Remove JobMasterGateway#requestJobDetails

2024-01-17 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1ffb48f [FLINK-34097] Remove JobMasterGateway#requestJobDetails
1ffb48f is described below

commit 1ffb48f658b699702357921a48e914d13caf
Author: Chesnay Schepler 
AuthorDate: Mon Jan 15 16:20:55 2024 +0100

[FLINK-34097] Remove JobMasterGateway#requestJobDetails
---
 .../java/org/apache/flink/runtime/jobmaster/JobMaster.java |  6 --
 .../apache/flink/runtime/jobmaster/JobMasterGateway.java   |  9 -
 .../org/apache/flink/runtime/scheduler/SchedulerBase.java  |  7 ---
 .../org/apache/flink/runtime/scheduler/SchedulerNG.java    |  3 ---
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java      |  6 --
 .../runtime/jobmaster/utils/TestingJobMasterGateway.java   | 14 --
 .../jobmaster/utils/TestingJobMasterGatewayBuilder.java    | 12 ++--
 .../apache/flink/runtime/scheduler/TestingSchedulerNG.java |  7 ---
 8 files changed, 10 insertions(+), 54 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index bfea710db66..59455b787a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -71,7 +71,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -861,11 +860,6 @@ public class JobMaster extends FencedRpcEndpoint
         return resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
     }
 
-    @Override
-    public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
-        return CompletableFuture.completedFuture(schedulerNG.requestJobDetails());
-    }
-
     @Override
     public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
         return CompletableFuture.completedFuture(schedulerNG.requestJobStatus());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 6c1b79568a8..02c3c7d501a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -183,14 +182,6 @@ public interface JobMasterGateway
      */
     CompletableFuture<Void> heartbeatFromResourceManager(final ResourceID resourceID);
 
-    /**
-     * Request the details of the executed job.
-     *
-     * @param timeout for the rpc call
-     * @return Future details of the executed job
-     */
-    CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);
-
     /**
      * Requests the current job status.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index c11d7b2ca86..7f4ba383e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -78,7 +78,6 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import 
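With requestJobDetails gone from the gateway, callers fall back to the coarser status RPC that the diff leaves in place; a minimal sketch of a call site (gateway wiring elided, names as in the diff above):

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobStatus;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.runtime.jobmaster.JobMasterGateway;

    final class JobStatusPoller {
        static CompletableFuture<JobStatus> pollStatus(JobMasterGateway gateway) {
            return gateway.requestJobStatus(Time.seconds(10));
        }
    }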

(flink) branch master updated: [FLINK-34084][config] Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat

2024-01-17 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4559b851b22 [FLINK-34084][config] Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat
4559b851b22 is described below

commit 4559b851b22d6ffa197aa311adbea15b21a43c66
Author: sxnan 
AuthorDate: Tue Jan 9 17:37:55 2024 +0800

[FLINK-34084][config] Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat

This closes #24090
---
 .../src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java  | 2 +-
 .../main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java     | 2 +-
 .../src/main/java/org/apache/flink/api/common/io/FileInputFormat.java    | 1 +
 .../src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java   | 2 +-
 4 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 75f4bb2a7fa..6d7c7fcdbb9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -61,7 +61,7 @@ public abstract class BinaryInputFormat extends FileInputFormat
     private static final Logger LOG = LoggerFactory.getLogger(BinaryInputFormat.class);
 
     /** The config parameter which defines the fixed length of a record. */
-    public static final String BLOCK_SIZE_PARAMETER_KEY = "input.block_size";
+    @Deprecated public static final String BLOCK_SIZE_PARAMETER_KEY = "input.block_size";
 
     public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index 4a58be3aa28..f2dc9ca8883 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -33,7 +33,7 @@ public abstract class BinaryOutputFormat extends FileOutputFormat {
     private static final long serialVersionUID = 1L;
 
     /** The config parameter which defines the fixed length of a record. */
-    public static final String BLOCK_SIZE_PARAMETER_KEY = "output.block_size";
+    @Deprecated public static final String BLOCK_SIZE_PARAMETER_KEY = "output.block_size";
 
     public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 04844512001..b8c57eb6615 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -1140,5 +1140,6 @@ public abstract class FileInputFormat extends RichInputFormat
[...]
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ public abstract class FileOutputFormat extends RichOutputFormat
     private static final Logger LOG = LoggerFactory.getLogger(FileOutputFormat.class);
 
     /** The key under which the name of the target path is stored in the configuration. */
-    public static final String FILE_PARAMETER_KEY = "flink.output.file";
+    @Deprecated public static final String FILE_PARAMETER_KEY = "flink.output.file";
 
     /** The path of the file to be written. */
     protected Path outputFilePath;
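A small sketch of what these annotations surface at call sites: code that still writes or reads the unused keys keeps compiling but now draws a deprecation warning, steering users away from settings the formats no longer honor.

    import org.apache.flink.api.common.io.BinaryInputFormat;
    import org.apache.flink.configuration.Configuration;

    public final class DeprecatedKeyUsage {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024); // deprecation warning
            System.out.println(
                    conf.getLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, BinaryInputFormat.NATIVE_BLOCK_SIZE));
        }
    }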