(flink) branch master updated: [FLINK-34072][scripts] Replace java command to JAVA_RUN in config.sh
This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e7c8cd1562e [FLINK-34072][scripts] Replace java command to JAVA_RUN in config.sh e7c8cd1562e is described below commit e7c8cd1562ebd45c1f7b48f519a11c6cd4fdf100 Author: Yu Chen AuthorDate: Sun Jan 14 20:57:05 2024 +0800 [FLINK-34072][scripts] Replace java command to JAVA_RUN in config.sh --- flink-dist/src/main/flink-bin/bin/config.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index dcd48a256f7..363e17a375f 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -335,7 +335,7 @@ if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then # Remove leading and ending double quotes (if present) of value FLINK_ENV_JAVA_OPTS="-XX:+IgnoreUnrecognizedVMOptions $( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )" -JAVA_SPEC_VERSION=`${JAVA_HOME}/bin/java -XshowSettings:properties 2>&1 | grep "java.specification.version" | cut -d "=" -f 2 | tr -d '[:space:]' | rev | cut -d "." -f 1 | rev` +JAVA_SPEC_VERSION=`"${JAVA_RUN}" -XshowSettings:properties 2>&1 | grep "java.specification.version" | cut -d "=" -f 2 | tr -d '[:space:]' | rev | cut -d "." -f 1 | rev` if [[ $(( $JAVA_SPEC_VERSION > 17 )) == 1 ]]; then # set security manager property to allow calls to System.setSecurityManager() at runtime FLINK_ENV_JAVA_OPTS="$FLINK_ENV_JAVA_OPTS -Djava.security.manager=allow"
(flink) branch master updated: [FLINK-34119][doc] Improve description about changelog in document
This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2ec8f8157f9 [FLINK-34119][doc] Improve description about changelog in document 2ec8f8157f9 is described below commit 2ec8f8157f95a79ee94d609657f9b08f8f0b6a26 Author: Hangxiang Yu AuthorDate: Sat Jan 13 14:50:36 2024 +0800 [FLINK-34119][doc] Improve description about changelog in document --- docs/content.zh/docs/deployment/config.md| 3 +-- docs/content.zh/docs/ops/state/state_backends.md | 13 +++-- docs/content/docs/deployment/config.md | 3 +-- docs/content/docs/ops/state/state_backends.md| 15 --- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index 34d04f733e5..cf0740bf8de 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -370,8 +370,7 @@ Advanced options to tune RocksDB and RocksDB checkpoints. ### State Changelog Options Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on -using State Changelog. {{< hint warning >}} The feature is in experimental status. {{< /hint >}} {{< -generated/state_backend_changelog_section >}} +using State Changelog. FileSystem-based Changelog options diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md index eda37dada7e..5d7d4f92b1c 100644 --- a/docs/content.zh/docs/ops/state/state_backends.md +++ b/docs/content.zh/docs/ops/state/state_backends.md @@ -349,10 +349,6 @@ Python API 中尚不支持该特性。 ## 开启 Changelog -{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}} - -{{< hint warning >}} 开启 Changelog 可能会给您的应用带来性能损失。(见下文) {{< /hint >}} - ### 介绍 @@ -372,16 +368,21 @@ Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以 开启 Changelog 功能之后,Flink 会不断上传状态变更并形成 changelog。创建 checkpoint 时,只有 changelog 中的相关部分需要上传。而配置的状态后端则会定期在后台进行快照,快照成功上传后,相关的changelog 将会被截断。 -基于此,异步阶段的持续时间减少(另外因为不需要将数据刷新到磁盘,同步阶段持续时间也减少了),特别是长尾延迟得到了改善。 +基于此,异步阶段的持续时间减少(另外因为不需要将数据刷新到磁盘,同步阶段持续时间也减少了),特别是长尾延迟得到了改善。同时,还可以获得一些其他好处: +1. 更稳定、更低的端到端时延。 +2. Failover 后数据重放更少。 +3. 资源利用更加稳定。 但是,资源使用会变得更高: - 将会在 DFS 上创建更多文件 -- 将可能在 DFS 上残留更多文件(这将在 FLINK-25511 和 FLINK-25512 之后的新版本中被解决) - 将使用更多的 IO 带宽用来上传状态变更 - 将使用更多 CPU 资源来序列化状态变更 - Task Managers 将会使用更多内存来缓存状态变更 +值得注意的是虽然 Changelog 增加了少量的日常 CPU 和网络带宽资源使用, +但会降低峰值的 CPU 和网络带宽使用量。 + 另一项需要考虑的事情是恢复时间。取决于 `state.backend.changelog.periodic-materialize.interval` 的设置,changelog 可能会变得冗长,因此重放会花费更多时间。即使这样,恢复时间加上 checkpoint 持续时间仍然可能低于不开启 changelog 功能的时间,从而在故障恢复的情况下也能提供更低的端到端延迟。当然,取决于上述时间的实际比例,有效恢复时间也有可能会增加。 有关更多详细信息,请参阅 [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints)。 diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md index cbdc4f25a77..c4e70ba7235 100644 --- a/docs/content/docs/deployment/config.md +++ b/docs/content/docs/deployment/config.md @@ -372,8 +372,7 @@ Advanced options to tune RocksDB and RocksDB checkpoints. ### State Changelog Options Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on -using State Changelog. {{< hint warning >}} The feature is in experimental status. {{< /hint >}} {{< -generated/state_backend_changelog_section >}} +using State Changelog. FileSystem-based Changelog options diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md index b645eefcd8b..bd04491977f 100644 --- a/docs/content/docs/ops/state/state_backends.md +++ b/docs/content/docs/ops/state/state_backends.md @@ -346,10 +346,6 @@ Still not supported in Python API. ## Enabling Changelog -{{< hint warning >}} This feature is in experimental status. {{< /hint >}} - -{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} - ### Introduction Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode. @@ -361,7 +357,7 @@ Most commonly, checkpoint duration is affected by: and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{< ref "#the-embeddedrocksdbstatebackend">}})) -4. Snapshot upload time (asynchronous phase) +3. Snapshot upload time (asynchronous phase) Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}). However, most incremental state backends perform some form of compaction periodically, which results
(flink-connector-kafka) branch main updated: [FLINK-32416] Fix flaky tests by ensuring test utilities produce records with consistency and cleanup notify no more splits to ensure it is sent. This clos
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git The following commit(s) were added to refs/heads/main by this push: new cdfa328b [FLINK-32416] Fix flaky tests by ensuring test utilities produce records with consistency and cleanup notify no more splits to ensure it is sent. This closes #79 cdfa328b is described below commit cdfa328b5ec34d711ae2c9e93de6de7565fd1db6 Author: Mason Chen AuthorDate: Wed Jan 17 12:55:29 2024 -0800 [FLINK-32416] Fix flaky tests by ensuring test utilities produce records with consistency and cleanup notify no more splits to ensure it is sent. This closes #79 --- .../enumerator/DynamicKafkaSourceEnumerator.java | 49 -- .../enumerator/StoppableKafkaEnumContextProxy.java | 29 ++--- .../source/reader/DynamicKafkaSourceReader.java| 4 +- .../DynamicKafkaSourceEnumeratorTest.java | 6 ++- .../StoppableKafkaEnumContextProxyTest.java| 3 +- .../kafka/DynamicKafkaSourceTestHelper.java| 4 +- 6 files changed, 63 insertions(+), 32 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java index cce8ab28..e14a36d9 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java @@ -96,6 +96,7 @@ public class DynamicKafkaSourceEnumerator private int kafkaMetadataServiceDiscoveryFailureCount; private Map> latestClusterTopicsMap; private Set latestKafkaStreams; +private boolean firstDiscoveryComplete; public DynamicKafkaSourceEnumerator( KafkaStreamSubscriber kafkaStreamSubscriber, @@ -151,6 +152,7 @@ public class DynamicKafkaSourceEnumerator DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD, Integer::parseInt); this.kafkaMetadataServiceDiscoveryFailureCount = 0; +this.firstDiscoveryComplete = false; this.kafkaMetadataService = kafkaMetadataService; this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory; @@ -212,32 +214,27 @@ public class DynamicKafkaSourceEnumerator private void handleNoMoreSplits() { if (Boundedness.BOUNDED.equals(boundedness)) { -enumContext.runInCoordinatorThread( -() -> { -boolean allEnumeratorsHaveSignalledNoMoreSplits = true; -for (StoppableKafkaEnumContextProxy context : -clusterEnumContextMap.values()) { -allEnumeratorsHaveSignalledNoMoreSplits = -allEnumeratorsHaveSignalledNoMoreSplits -&& context.isNoMoreSplits(); -} - -if (allEnumeratorsHaveSignalledNoMoreSplits) { -logger.info( -"Signal no more splits to all readers: {}", -enumContext.registeredReaders().keySet()); -enumContext -.registeredReaders() -.keySet() -.forEach(enumContext::signalNoMoreSplits); -} -}); +boolean allEnumeratorsHaveSignalledNoMoreSplits = true; +for (StoppableKafkaEnumContextProxy context : clusterEnumContextMap.values()) { +allEnumeratorsHaveSignalledNoMoreSplits = +allEnumeratorsHaveSignalledNoMoreSplits && context.isNoMoreSplits(); +} + +if (firstDiscoveryComplete && allEnumeratorsHaveSignalledNoMoreSplits) { +logger.info( +"Signal no more splits to all readers: {}", +enumContext.registeredReaders().keySet()); + enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits); +} else { +logger.info("Not ready to notify no more splits to readers."); +} } } // --- private methods for metadata discovery --- private void onHandleSubscribedStreamsFetch(Set fetchedKafkaStreams, Throwable t) { +firstDiscoveryComplete = true; Set handledFetchKafkaStreams =
(flink) branch master updated: [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 024ed963ed8 [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available. 024ed963ed8 is described below commit 024ed963ed8358ae78e728108ac6c95046924f67 Author: Matthias Pohl AuthorDate: Fri Jan 12 16:57:40 2024 +0100 [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available. --- .../runtime/minicluster/MiniClusterITCase.java | 81 +++--- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index ba3ca9b5237..e527dda14c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -100,43 +102,66 @@ class MiniClusterITCase { } @Test -void testHandleStreamingJobsWhenNotEnoughSlot() { -final JobVertex vertex1 = new JobVertex("Test Vertex1"); -vertex1.setParallelism(1); -vertex1.setMaxParallelism(1); -vertex1.setInvokableClass(BlockingNoOpInvokable.class); +void testHandlingNotEnoughSlotsThroughTimeout() throws Exception { +final Configuration config = new Configuration(); -final JobVertex vertex2 = new JobVertex("Test Vertex2"); -vertex2.setParallelism(1); -vertex2.setMaxParallelism(1); -vertex2.setInvokableClass(BlockingNoOpInvokable.class); +// the slot timeout needs to be high enough to avoid causing TimeoutException +final Duration slotRequestTimeout = Duration.ofMillis(100); -vertex2.connectNewDataSetAsInput( -vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); +// this triggers the failure for the default scheduler +config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis()); +// this triggers the failure for the adaptive scheduler +config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout); -final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, vertex2); +// we have to disable sending the slot-unavailable request to allow for the timeout to kick +// in +config.set( +ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ofNanos(Long.MAX_VALUE)); -assertThatThrownBy(() -> runHandleJobsWhenNotEnoughSlots(jobGraph)) -.isInstanceOf(JobExecutionException.class) -.hasRootCauseInstanceOf(NoResourceAvailableException.class) -.hasMessageContaining("Job execution failed"); +tryRunningJobWithoutEnoughSlots(config); } -private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exception { -final Configuration configuration = new Configuration(); +@Test +// The AdaptiveScheduler is supposed to work with the resources that are available. +// That is why there is no resource allocation abort request supported. +@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") +void testHandlingNotEnoughSlotsThroughEarlyAbortRequest() throws Exception { +final Configuration config = new Configuration(); // the slot timeout needs to be high enough to avoid causing TimeoutException -Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE); +final Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE); // this triggers the failure for the default scheduler -configuration.setLong( -JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis()); +
(flink) branch master updated: [FLINK-34097] Remove JobMasterGateway#requestJobDetails
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1ffb48f [FLINK-34097] Remove JobMasterGateway#requestJobDetails 1ffb48f is described below commit 1ffb48f658b699702357921a48e914d13caf Author: Chesnay Schepler AuthorDate: Mon Jan 15 16:20:55 2024 +0100 [FLINK-34097] Remove JobMasterGateway#requestJobDetails --- .../java/org/apache/flink/runtime/jobmaster/JobMaster.java | 6 -- .../apache/flink/runtime/jobmaster/JobMasterGateway.java | 9 - .../org/apache/flink/runtime/scheduler/SchedulerBase.java | 7 --- .../org/apache/flink/runtime/scheduler/SchedulerNG.java| 3 --- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 6 -- .../runtime/jobmaster/utils/TestingJobMasterGateway.java | 14 -- .../jobmaster/utils/TestingJobMasterGatewayBuilder.java| 12 ++-- .../apache/flink/runtime/scheduler/TestingSchedulerNG.java | 7 --- 8 files changed, 10 insertions(+), 54 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index bfea710db66..59455b787a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -71,7 +71,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -861,11 +860,6 @@ public class JobMaster extends FencedRpcEndpoint return resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); } -@Override -public CompletableFuture requestJobDetails(Time timeout) { -return CompletableFuture.completedFuture(schedulerNG.requestJobDetails()); -} - @Override public CompletableFuture requestJobStatus(Time timeout) { return CompletableFuture.completedFuture(schedulerNG.requestJobStatus()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 6c1b79568a8..02c3c7d501a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -183,14 +182,6 @@ public interface JobMasterGateway */ CompletableFuture heartbeatFromResourceManager(final ResourceID resourceID); -/** - * Request the details of the executed job. - * - * @param timeout for the rpc call - * @return Future details of the executed job - */ -CompletableFuture requestJobDetails(@RpcTimeout Time timeout); - /** * Requests the current job status. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index c11d7b2ca86..7f4ba383e43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -78,7 +78,6 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import
(flink) branch master updated: [FLINK-34084][config] Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4559b851b22 [FLINK-34084][config] Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat 4559b851b22 is described below commit 4559b851b22d6ffa197aa311adbea15b21a43c66 Author: sxnan AuthorDate: Tue Jan 9 17:37:55 2024 +0800 [FLINK-34084][config] Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat This closes #24090 --- .../src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java | 2 +- .../main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java| 2 +- .../src/main/java/org/apache/flink/api/common/io/FileInputFormat.java | 1 + .../src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index 75f4bb2a7fa..6d7c7fcdbb9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -61,7 +61,7 @@ public abstract class BinaryInputFormat extends FileInputFormat private static final Logger LOG = LoggerFactory.getLogger(BinaryInputFormat.class); /** The config parameter which defines the fixed length of a record. */ -public static final String BLOCK_SIZE_PARAMETER_KEY = "input.block_size"; +@Deprecated public static final String BLOCK_SIZE_PARAMETER_KEY = "input.block_size"; public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java index 4a58be3aa28..f2dc9ca8883 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java @@ -33,7 +33,7 @@ public abstract class BinaryOutputFormat extends FileOutputFormat { private static final long serialVersionUID = 1L; /** The config parameter which defines the fixed length of a record. */ -public static final String BLOCK_SIZE_PARAMETER_KEY = "output.block_size"; +@Deprecated public static final String BLOCK_SIZE_PARAMETER_KEY = "output.block_size"; public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 04844512001..b8c57eb6615 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -1140,5 +1140,6 @@ public abstract class FileInputFormat extends RichInputFormat extends RichOutputFormat private static final Logger LOG = LoggerFactory.getLogger(FileOutputFormat.class); /** The key under which the name of the target path is stored in the configuration. */ -public static final String FILE_PARAMETER_KEY = "flink.output.file"; +@Deprecated public static final String FILE_PARAMETER_KEY = "flink.output.file"; /** The path of the file to be written. */ protected Path outputFilePath;