(flink) branch release-1.19 updated: [FLINK-35786] Fix NPE BlobServer / shutdownHook
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new 3693d07db27 [FLINK-35786] Fix NPE BlobServer / shutdownHook 3693d07db27 is described below commit 3693d07db27ba2977b575f9d1c3ecfda5f5613f1 Author: Roman Khachatryan AuthorDate: Mon Jul 8 15:54:58 2024 +0200 [FLINK-35786] Fix NPE BlobServer / shutdownHook --- .../org/apache/flink/runtime/blob/BlobServer.java | 25 ++ 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 2a3c53ab1c1..56df2fb05cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -97,7 +97,8 @@ public class BlobServer extends Thread private final AtomicLong tempFileCounter = new AtomicLong(0); /** The server socket listening for incoming connections. */ -private final ServerSocket serverSocket; +// can be null if BlobServer is shut down before constructor completion +@Nullable private final ServerSocket serverSocket; /** Blob Server configuration. 
*/ private final Configuration blobServiceConfiguration; @@ -354,10 +355,12 @@ public class BlobServer extends Thread if (shutdownRequested.compareAndSet(false, true)) { Exception exception = null; -try { -this.serverSocket.close(); -} catch (IOException ioe) { -exception = ioe; +if (serverSocket != null) { +try { +this.serverSocket.close(); +} catch (IOException ioe) { +exception = ioe; +} } // wake the thread up, in case it is waiting on some operation @@ -394,10 +397,14 @@ public class BlobServer extends Thread ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); if (LOG.isInfoEnabled()) { -LOG.info( -"Stopped BLOB server at {}:{}", -serverSocket.getInetAddress().getHostAddress(), -getPort()); +if (serverSocket != null) { +LOG.info( +"Stopped BLOB server at {}:{}", +serverSocket.getInetAddress().getHostAddress(), +getPort()); +} else { +LOG.info("Stopped BLOB server before initializing the socket"); +} } ExceptionUtils.tryRethrowIOException(exception);
(flink) branch master updated: [FLINK-35786] Fix NPE BlobServer / shutdownHook
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 050767cda9d [FLINK-35786] Fix NPE BlobServer / shutdownHook 050767cda9d is described below commit 050767cda9de4f41748b0169b28679a87b5c5a11 Author: Roman Khachatryan AuthorDate: Mon Jul 8 15:54:58 2024 +0200 [FLINK-35786] Fix NPE BlobServer / shutdownHook --- .../org/apache/flink/runtime/blob/BlobServer.java | 25 ++ 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 2a3c53ab1c1..56df2fb05cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -97,7 +97,8 @@ public class BlobServer extends Thread private final AtomicLong tempFileCounter = new AtomicLong(0); /** The server socket listening for incoming connections. */ -private final ServerSocket serverSocket; +// can be null if BlobServer is shut down before constructor completion +@Nullable private final ServerSocket serverSocket; /** Blob Server configuration. 
*/ private final Configuration blobServiceConfiguration; @@ -354,10 +355,12 @@ public class BlobServer extends Thread if (shutdownRequested.compareAndSet(false, true)) { Exception exception = null; -try { -this.serverSocket.close(); -} catch (IOException ioe) { -exception = ioe; +if (serverSocket != null) { +try { +this.serverSocket.close(); +} catch (IOException ioe) { +exception = ioe; +} } // wake the thread up, in case it is waiting on some operation @@ -394,10 +397,14 @@ public class BlobServer extends Thread ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); if (LOG.isInfoEnabled()) { -LOG.info( -"Stopped BLOB server at {}:{}", -serverSocket.getInetAddress().getHostAddress(), -getPort()); +if (serverSocket != null) { +LOG.info( +"Stopped BLOB server at {}:{}", +serverSocket.getInetAddress().getHostAddress(), +getPort()); +} else { +LOG.info("Stopped BLOB server before initializing the socket"); +} } ExceptionUtils.tryRethrowIOException(exception);
(flink) branch master updated (482969aa24d -> 0b0158b4df9)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 482969aa24d [FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed. new 320d1a5fc5b [FLINK-35742] Don't create RocksDB Column Families if task cancellation is in progress new 0b0158b4df9 [hotfix] Propagate cancellation to RocksDBWriteBatchWrapper during recovery The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/flink/core/fs/CloseableRegistry.java| 14 +-- .../apache/flink/core/fs/ICloseableRegistry.java | 109 + .../streaming/state/RocksDBKeyedStateBackend.java | 6 +- .../state/RocksDBKeyedStateBackendBuilder.java | 6 +- .../streaming/state/RocksDBOperationUtils.java | 21 +++- .../state/RocksDBPriorityQueueSetFactory.java | 6 +- .../state/restore/RocksDBFullRestoreOperation.java | 17 +++- .../streaming/state/restore/RocksDBHandle.java | 21 ++-- .../RocksDBHeapTimersFullRestoreOperation.java | 16 ++- .../RocksDBIncrementalRestoreOperation.java| 21 +++- 10 files changed, 201 insertions(+), 36 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java
(flink) 01/02: [FLINK-35742] Don't create RocksDB Column Families if task cancellation is in progress
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 320d1a5fc5b2e2f5b8f55110f4bb0a3b44f1d80d Author: Roman Khachatryan AuthorDate: Tue Mar 26 21:43:14 2024 +0100 [FLINK-35742] Don't create RocksDB Column Families if task cancellation is in progress We observe a lot of TMs stuck for > 30s in RocksDBHandle.registerStateColumnFamilyHandleWithImport which boil down to native calls to create Column Family. This change registers prevents Column Family creation if task cancellation is in progress. --- .../apache/flink/core/fs/CloseableRegistry.java| 14 +--- .../apache/flink/core/fs/ICloseableRegistry.java | 94 ++ .../streaming/state/RocksDBKeyedStateBackend.java | 6 +- .../state/RocksDBKeyedStateBackendBuilder.java | 6 +- .../streaming/state/RocksDBOperationUtils.java | 21 - .../state/RocksDBPriorityQueueSetFactory.java | 6 +- .../state/restore/RocksDBFullRestoreOperation.java | 9 ++- .../streaming/state/restore/RocksDBHandle.java | 21 +++-- .../RocksDBHeapTimersFullRestoreOperation.java | 8 +- .../RocksDBIncrementalRestoreOperation.java| 14 +++- 10 files changed, 166 insertions(+), 33 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java index fbe285d636e..60904328b21 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java @@ -33,21 +33,11 @@ import java.util.Map; import static org.apache.flink.shaded.guava31.com.google.common.collect.Lists.reverse; -/** - * This class allows to register instances of {@link Closeable}, which are all closed if this - * registry is closed. 
- * - * Registering to an already closed registry will throw an exception and close the provided - * {@link Closeable} - * - * All methods in this class are thread-safe. - * - * This class closes all registered {@link Closeable}s in the reverse registration order. - */ +/** {@link ICloseableRegistry} implementation. */ @Internal public class CloseableRegistry extends AbstractAutoCloseableRegistry -implements Closeable { +implements ICloseableRegistry { private static final Object DUMMY = new Object(); diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java new file mode 100644 index 000..90fd201f341 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Internal; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This class allows to register instances of {@link Closeable}, which are all closed if this + * registry is closed. 
+ * + * Registering to an already closed registry will throw an exception and close the provided + * {@link Closeable} + * + * All methods in this class are thread-safe. + * + * This class closes all registered {@link Closeable}s in the reverse registration order. + */ +@Internal +public interface ICloseableRegistry extends Closeable { + +/** + * Registers a {@link Closeable} with the registry. In case the registry is already closed, this + * method throws an {@link IllegalStateException} and closes the passed {@link Closeable}. + * + * @param closeable Closeable to register. + * @throws IOException exception when the registry was closed before. + */ +void registerCloseable(Closeable closeable) throws IOException; + +/** + * Same as {@link #registerCloseable(Closeable)} but allows to {@link + * #unregisterCloseable(Closeable) unregister} the passed closeable by closing the returned + * closeable. + * + * @param closeable Closeable to register. + * @return another Closeable that
(flink) 02/02: [hotfix] Propagate cancellation to RocksDBWriteBatchWrapper during recovery
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0b0158b4df97a0a0ae44d43f6c7b6947e8ed433f Author: Roman Khachatryan AuthorDate: Thu Apr 4 19:24:50 2024 +0200 [hotfix] Propagate cancellation to RocksDBWriteBatchWrapper during recovery This change prevents TM shutdown in case when the task is cancelled but recovery takes longer than 30s. --- .../java/org/apache/flink/core/fs/ICloseableRegistry.java | 15 +++ .../state/restore/RocksDBFullRestoreOperation.java| 8 +++- .../restore/RocksDBHeapTimersFullRestoreOperation.java| 8 +++- .../state/restore/RocksDBIncrementalRestoreOperation.java | 11 --- 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java index 90fd201f341..ebdb0f13692 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ICloseableRegistry.java @@ -19,9 +19,11 @@ package org.apache.flink.core.fs; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.ExceptionUtils; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class allows to register instances of {@link Closeable}, which are all closed if this @@ -37,6 +39,19 @@ import java.io.IOException; @Internal public interface ICloseableRegistry extends Closeable { +static Closeable asCloseable(AutoCloseable autoCloseable) { +AtomicBoolean closed = new AtomicBoolean(false); +return () -> { +if (closed.compareAndSet(false, true)) { +try { +autoCloseable.close(); +} catch (Exception e) { +ExceptionUtils.rethrowIOException(e); +} +} +}; +} + /** * Registers a {@link Closeable} with the registry. 
In case the registry is already closed, this * method throws an {@link IllegalStateException} and closes the passed {@link Closeable}. diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java index 26414f1469b..fdaf21fb180 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java @@ -43,6 +43,7 @@ import org.rocksdb.RocksDBException; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Collection; @@ -51,6 +52,8 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import static org.apache.flink.core.fs.ICloseableRegistry.asCloseable; + /** Encapsulates the process of restoring a RocksDB instance from a full snapshot. */ public class RocksDBFullRestoreOperation implements RocksDBRestoreOperation { private final FullSnapshotRestoreOperation savepointRestoreOperation; @@ -143,7 +146,10 @@ public class RocksDBFullRestoreOperation implements RocksDBRestoreOperation { throws IOException, RocksDBException, StateMigrationException { // for all key-groups in the current state handle... 
try (RocksDBWriteBatchWrapper writeBatchWrapper = -new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) { +new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize); +Closeable ignored = + cancelStreamRegistryForRestore.registerCloseableTemporarily( +asCloseable(writeBatchWrapper))) { ColumnFamilyHandle handle = null; while (keyGroups.hasNext()) { KeyGroup keyGroup = keyGroups.next(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java index ef7483fbf43..c4eed5ff3d4 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java +++ b/flink-state-backe
(flink) 01/02: [FLINK-35501] Replace inheritance with encapsulation for RocksDBStateDataTransfer*
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5855f5354985cabea6f01b6b2effaaa8cfcbee55 Author: Roman Khachatryan AuthorDate: Wed May 29 21:46:11 2024 +0200 [FLINK-35501] Replace inheritance with encapsulation for RocksDBStateDataTransfer* --- .../state/RocksDBKeyedStateBackendBuilder.java | 4 +- .../streaming/state/RocksDBStateDataTransfer.java | 47 .../state/RocksDBStateDataTransferHelper.java | 63 ++ .../streaming/state/RocksDBStateDownloader.java| 22 ++-- .../streaming/state/RocksDBStateUploader.java | 20 +-- .../RocksDBIncrementalRestoreOperation.java| 4 +- 6 files changed, 105 insertions(+), 55 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 1a63950305d..e0b1359060b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -585,7 +585,9 @@ public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBacken RocksDBSnapshotStrategyBase checkpointSnapshotStrategy; RocksDBStateUploader stateUploader = injectRocksDBStateUploader == null -? new RocksDBStateUploader(numberOfTransferingThreads) +? 
new RocksDBStateUploader( +RocksDBStateDataTransferHelper.forThreadNum( +numberOfTransferingThreads)) : injectRocksDBStateUploader; if (enableIncrementalCheckpointing) { checkpointSnapshotStrategy = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java deleted file mode 100644 index 246332671ce..000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.util.concurrent.ExecutorThreadFactory; - -import java.io.Closeable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.apache.flink.util.concurrent.Executors.newDirectExecutorService; - -/** Data transfer base class for {@link RocksDBKeyedStateBackend}. 
*/ -class RocksDBStateDataTransfer implements Closeable { - -protected final ExecutorService executorService; - -RocksDBStateDataTransfer(int threadNum) { -if (threadNum > 1) { -executorService = -Executors.newFixedThreadPool( -threadNum, new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer")); -} else { -executorService = newDirectExecutorService(); -} -} - -@Override -public void close() { -executorService.shutdownNow(); -} -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferHelper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferHelper.java new file mode 100644 index 000..70f31b52616 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferHelper.java @@ -0,0 +1,63
(flink) branch master updated (ceb4db4d88a -> 9708f9fd657)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from ceb4db4d88a [FLINK-35532][Runtime/Web Frontend] Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard new 5855f535498 [FLINK-35501] Replace inheritance with encapsulation for RocksDBStateDataTransfer* new 9708f9fd657 [FLINK-35501] Use common IO thread pool for RocksDB data transfer The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../generated/expert_rocksdb_section.html | 2 +- .../generated/rocksdb_configuration.html | 2 +- .../state/RocksDBKeyedStateBackendBuilder.java | 7 ++- .../contrib/streaming/state/RocksDBOptions.java| 6 +- .../streaming/state/RocksDBStateDataTransfer.java | 47 -- .../state/RocksDBStateDataTransferHelper.java | 72 ++ .../streaming/state/RocksDBStateDownloader.java| 22 ++- .../streaming/state/RocksDBStateUploader.java | 20 +- .../RocksDBIncrementalRestoreOperation.java| 11 +++- .../snapshot/RocksDBSnapshotStrategyBase.java | 2 +- .../snapshot/RocksIncrementalSnapshotStrategy.java | 3 +- .../snapshot/RocksNativeFullSnapshotStrategy.java | 3 +- .../state/EmbeddedRocksDBStateBackendTest.java | 2 +- 13 files changed, 135 insertions(+), 64 deletions(-) delete mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferHelper.java
(flink) 02/02: [FLINK-35501] Use common IO thread pool for RocksDB data transfer
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9708f9fd65751296b3b0377c964207b630958259 Author: Roman Khachatryan AuthorDate: Wed May 29 22:30:50 2024 +0200 [FLINK-35501] Use common IO thread pool for RocksDB data transfer Currently, each RocksDB state backend creates an executor backed by a thread pool. This makes it difficult to control the total number of threads per TM because it might have at least one task per slot and theoretically, many state backends per task (because of chaining). Additionally, using a common thread pool allows to indirectly control the load on the underlying DFS (e.g. the total number of requests to S3 from a TM). This change allows to control the total number of data transfers per TM. --- docs/layouts/shortcodes/generated/expert_rocksdb_section.html| 2 +- docs/layouts/shortcodes/generated/rocksdb_configuration.html | 2 +- .../contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java | 7 --- .../org/apache/flink/contrib/streaming/state/RocksDBOptions.java | 6 +- .../contrib/streaming/state/RocksDBStateDataTransferHelper.java | 9 + .../state/restore/RocksDBIncrementalRestoreOperation.java| 9 +++-- .../streaming/state/snapshot/RocksDBSnapshotStrategyBase.java| 2 +- .../state/snapshot/RocksIncrementalSnapshotStrategy.java | 3 ++- .../state/snapshot/RocksNativeFullSnapshotStrategy.java | 3 ++- .../contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java | 2 +- 10 files changed, 33 insertions(+), 12 deletions(-) diff --git a/docs/layouts/shortcodes/generated/expert_rocksdb_section.html b/docs/layouts/shortcodes/generated/expert_rocksdb_section.html index 47e4edce3ce..eac1d574e26 100644 --- a/docs/layouts/shortcodes/generated/expert_rocksdb_section.html +++ b/docs/layouts/shortcodes/generated/expert_rocksdb_section.html @@ -12,7 +12,7 @@ state.backend.rocksdb.checkpoint.transfer.thread.num 4 Integer -The number 
of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend. +The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.If negative, the common (TM) IO thread pool is used (see cluster.io-pool.size) state.backend.rocksdb.localdir diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html index b6ad67234e0..e9cca6e415a 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html @@ -12,7 +12,7 @@ state.backend.rocksdb.checkpoint.transfer.thread.num 4 Integer -The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend. +The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.If negative, the common (TM) IO thread pool is used (see cluster.io-pool.size) state.backend.rocksdb.localdir diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index e0b1359060b..bcdb32f4e79 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -536,7 +536,8 @@ public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBacken overlapFractionThreshold, useIngestDbRestoreMode, incrementalRestoreAsyncCompactAfterRescale, -rescalingUseDeleteFilesInRange); +rescalingUseDeleteFilesInRange, +ioExecutor); } else if 
(priorityQueueConfig.getPriorityQueueStateType() == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) { return new RocksDBHeapTimersFullRestoreOperation<>( @@ -586,8 +587,8 @@ public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBacken RocksDBStateUploader stateUploader = injectRocksDBStateUploader == null ? new RocksDBStateUploader( -RocksDBStateDataTransferHelper.forThr
(flink) branch master updated: [FLINK-26050] Manually compact small SST files
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new cd722033fb3 [FLINK-26050] Manually compact small SST files cd722033fb3 is described below commit cd722033fb326837a80a6233603d12ad176da15c Author: Roman Khachatryan AuthorDate: Fri Feb 16 19:42:43 2024 +0100 [FLINK-26050] Manually compact small SST files In some cases, the number of files produced by RocksDB state backend grows indefinitely. This might cause task state info (TDD and checkpoint ACK) to exceed RPC message size and fail recovery/checkpoint in addition to having lots of small files. With this change, such files are merged in the background using RocksDB API. --- .../state/api/runtime/SavepointEnvironment.java| 6 +- .../flink/runtime/io/disk/iomanager/IOManager.java | 10 +- .../runtime/io/disk/iomanager/IOManagerAsync.java | 31 ++- .../runtime/taskexecutor/TaskManagerServices.java | 2 +- .../runtime/io/disk/iomanager/IOManagerTest.java | 3 +- .../flink-statebackend-rocksdb/pom.xml | 5 + .../state/EmbeddedRocksDBStateBackend.java | 20 +- .../streaming/state/RocksDBKeyedStateBackend.java | 14 +- .../state/RocksDBKeyedStateBackendBuilder.java | 33 ++- .../state/RocksDBPriorityQueueSetFactory.java | 7 +- .../state/sstmerge/ColumnFamilyLookup.java | 77 ++ .../state/sstmerge/CompactionScheduler.java| 149 +++ .../streaming/state/sstmerge/CompactionTask.java | 73 ++ .../state/sstmerge/CompactionTaskProducer.java | 234 + .../state/sstmerge/CompactionTracker.java | 131 ++ .../streaming/state/sstmerge/Compactor.java| 80 ++ .../sstmerge/RocksDBManualCompactionConfig.java| 167 + .../sstmerge/RocksDBManualCompactionManager.java | 70 ++ .../RocksDBManualCompactionManagerImpl.java| 84 +++ .../sstmerge/RocksDBManualCompactionOptions.java | 86 +++ .../state/EmbeddedRocksDBStateBackendTest.java | 53 
.../state/sstmerge/CompactionSchedulerTest.java| 50 .../state/sstmerge/CompactionTaskProducerTest.java | 278 + .../ManuallyTriggeredScheduledExecutorService.java | 4 + 24 files changed, 1652 insertions(+), 15 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java index 00f471f1a7b..42ffb7e6037 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java @@ -68,6 +68,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.UserCodeClassLoader; +import org.apache.flink.util.concurrent.Executors; import java.util.Collections; import java.util.Map; @@ -141,7 +142,10 @@ public class SavepointEnvironment implements Environment { this.registry = new KvStateRegistry().createTaskRegistry(jobID, vertexID); this.taskStateManager = new SavepointTaskStateManager(prioritizedOperatorSubtaskState); -this.ioManager = new IOManagerAsync(ConfigurationUtils.parseTempDirectories(configuration)); +this.ioManager = +new IOManagerAsync( +ConfigurationUtils.parseTempDirectories(configuration), +Executors.newDirectExecutorService()); this.memoryManager = MemoryManager.create(64 * 1024 * 1024, DEFAULT_PAGE_SIZE); this.sharedResources = new SharedResources(); this.accumulatorRegistry = new AccumulatorRegistry(jobID, attemptID); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 9c1e9d9c245..d56b122f25c 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -33,6 +33,7 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -44,6 +45,8 @@ public abstract class IOManager implements AutoCloseable { private final FileChannelManager fileChannelManager; +protected final ExecutorService executorService
(flink) branch master updated (d301839dfe2 -> f86c0804121)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from d301839dfe2 [FLINK-35000][build] Updates link to test code convention in pull request template new 875683082a5 [FLINK-34994][tests] Ignore unknown task checkpoint confirmation log in JobIDLoggingITCase new f86c0804121 [hotfix] Log JobID in checkpoint abort message if available The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/runtime/taskexecutor/TaskExecutor.java | 18 ++ .../org/apache/flink/test/misc/JobIDLoggingITCase.java | 2 ++ 2 files changed, 12 insertions(+), 8 deletions(-)
(flink) 02/02: [hotfix] Log JobID in checkpoint abort message if available
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f86c08041211bbeddf36c9ff0fbe6ae4abaa3b9d Author: Roman Khachatryan AuthorDate: Wed Apr 3 10:23:14 2024 +0200 [hotfix] Log JobID in checkpoint abort message if available --- .../flink/runtime/taskexecutor/TaskExecutor.java | 18 ++ 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 99b4ca7d370..f26e9ef7610 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1116,18 +1116,20 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { long checkpointId, long latestCompletedCheckpointId, long checkpointTimestamp) { -log.debug( -"Abort checkpoint {}@{} for {}.", -checkpointId, -checkpointTimestamp, -executionAttemptID); - final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { -task.notifyCheckpointAborted(checkpointId, latestCompletedCheckpointId); +try (MdcCloseable ignored = + MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) { +log.debug( +"Abort checkpoint {}@{} for {}.", +checkpointId, +checkpointTimestamp, +executionAttemptID); +task.notifyCheckpointAborted(checkpointId, latestCompletedCheckpointId); -return CompletableFuture.completedFuture(Acknowledge.get()); +return CompletableFuture.completedFuture(Acknowledge.get()); +} } else { final String message = "TaskManager received an aborted checkpoint for unknown task "
(flink) 01/02: [FLINK-34994][tests] Ignore unknown task checkpoint confirmation log in JobIDLoggingITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 875683082a58636b377bbb0a82bac4d273455e6e Author: Roman Khachatryan AuthorDate: Wed Apr 3 10:19:34 2024 +0200 [FLINK-34994][tests] Ignore unknown task checkpoint confirmation log in JobIDLoggingITCase --- .../src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java| 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java index fb4e27604fe..0d99a9fce80 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java @@ -153,6 +153,8 @@ class JobIDLoggingITCase { "Received task .*", "Trigger checkpoint .*", "Confirm completed checkpoint .*"), +"TaskManager received a checkpoint confirmation for unknown task.*", +"TaskManager received an aborted checkpoint for unknown task.*", "Un-registering task.*", "Successful registration.*", "Establish JobManager connection.*",
(flink) 02/02: [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit febdaeeda5117f54dcb7fa8f8e9802eb9fd2898e Author: Roman Khachatryan AuthorDate: Tue Mar 26 18:53:00 2024 +0100 [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout --- .../java/org/apache/flink/changelog/fs/RetryingExecutorTest.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java index eb204b0ac98..14cd00b3875 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.ThrowingConsumer; -import org.assertj.core.data.Percentage; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -69,7 +68,7 @@ class RetryingExecutorTest { @Test void testDiscardOnTimeout() throws Exception { -int timeoutMs = 5; +int timeoutMs = 50; // should be long enough for the last attempt to succeed int numAttempts = 7; int successfulAttempt = numAttempts - 1; List completed = new CopyOnWriteArrayList<>(); @@ -123,7 +122,11 @@ class RetryingExecutorTest { Thread.sleep(10); } } -assertThat(unexpectedException).hasValue(null); +if (unexpectedException.get() != null) { +// the last attempt might still timeout if the worker node is overloaded +// and the thread is unscheduled for more than timeoutMs + assertThat(unexpectedException).isInstanceOf(TimeoutException.class); +} 
assertThat(singletonList(successfulAttempt)).isEqualTo(completed); assertThat(IntStream.range(0, successfulAttempt).boxed().collect(toList())) .isEqualTo(discarded.stream().sorted().collect(toList()));
(flink) 02/02: [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit b4767aeeebcb482584ddd70dfcd329a2a2c4ff03 Author: Roman Khachatryan AuthorDate: Tue Mar 26 18:53:00 2024 +0100 [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout --- .../java/org/apache/flink/changelog/fs/RetryingExecutorTest.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java index eb204b0ac98..14cd00b3875 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.ThrowingConsumer; -import org.assertj.core.data.Percentage; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -69,7 +68,7 @@ class RetryingExecutorTest { @Test void testDiscardOnTimeout() throws Exception { -int timeoutMs = 5; +int timeoutMs = 50; // should be long enough for the last attempt to succeed int numAttempts = 7; int successfulAttempt = numAttempts - 1; List completed = new CopyOnWriteArrayList<>(); @@ -123,7 +122,11 @@ class RetryingExecutorTest { Thread.sleep(10); } } -assertThat(unexpectedException).hasValue(null); +if (unexpectedException.get() != null) { +// the last attempt might still timeout if the worker node is overloaded +// and the thread is unscheduled for more than timeoutMs + assertThat(unexpectedException).isInstanceOf(TimeoutException.class); +} 
assertThat(singletonList(successfulAttempt)).isEqualTo(completed); assertThat(IntStream.range(0, successfulAttempt).boxed().collect(toList())) .isEqualTo(discarded.stream().sorted().collect(toList()));
(flink) 01/02: [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit 4069d87c3b5e7281923d8d7fddeb83703c9b Author: Roman Khachatryan AuthorDate: Tue Mar 26 18:00:03 2024 +0100 [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout --- .../test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java index 614c4284413..eb204b0ac98 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java @@ -219,7 +219,7 @@ class RetryingExecutorTest { Executors.newScheduledThreadPool(2)); /* future completion can be delayed arbitrarily causing start delta be less than timeout */ assertThat(((double) secondStart.get() - firstStart.get()) / 1_000_000) -.isCloseTo(timeout, Percentage.withPercentage(75)); +.isGreaterThanOrEqualTo(timeout * .75); } private void testPolicy(
(flink) branch release-1.19 updated (f82ff7c656d -> b4767aeeebc)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git from f82ff7c656d [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler new 4069d87c333 [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout new b4767aeeebc [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/flink/changelog/fs/RetryingExecutorTest.java | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-)
(flink) 01/02: [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit f3625bc87faa8ff9020e95d8083b22f197391feb Author: Roman Khachatryan AuthorDate: Tue Mar 26 18:00:03 2024 +0100 [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout --- .../test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java index 614c4284413..eb204b0ac98 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java @@ -219,7 +219,7 @@ class RetryingExecutorTest { Executors.newScheduledThreadPool(2)); /* future completion can be delayed arbitrarily causing start delta be less than timeout */ assertThat(((double) secondStart.get() - firstStart.get()) / 1_000_000) -.isCloseTo(timeout, Percentage.withPercentage(75)); +.isGreaterThanOrEqualTo(timeout * .75); } private void testPolicy(
(flink) branch release-1.18 updated (f2a6ff5a97b -> febdaeeda51)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git from f2a6ff5a97b [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler new f3625bc87fa [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout new febdaeeda51 [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/flink/changelog/fs/RetryingExecutorTest.java | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-)
(flink) branch master updated: [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d4c1a0a1ba4 [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase d4c1a0a1ba4 is described below commit d4c1a0a1ba4a1f1919f5ecccd3baa3d2cd44cef6 Author: Roman Khachatryan AuthorDate: Mon Mar 25 11:12:43 2024 +0100 [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase --- .../apache/flink/test/misc/JobIDLoggingITCase.java | 35 +- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java index 162d57db6cd..fb4e27604fe 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java @@ -22,11 +22,14 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -55,6 +58,7 @@ import java.util.regex.Pattern; import static java.util.Arrays.asList; import static 
java.util.stream.Collectors.toList; +import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER; import static org.apache.flink.util.MdcUtils.JOB_ID; import static org.assertj.core.api.Assertions.assertThat; import static org.slf4j.event.Level.DEBUG; @@ -86,6 +90,10 @@ class JobIDLoggingITCase { public final LoggerAuditingExtension jobMasterLogging = new LoggerAuditingExtension(JobMaster.class, DEBUG); +@RegisterExtension +public final LoggerAuditingExtension adaptiveSchedulerLogging = +new LoggerAuditingExtension(AdaptiveScheduler.class, DEBUG); + @RegisterExtension public final LoggerAuditingExtension asyncCheckpointRunnableLogging = // this class is private @@ -96,10 +104,17 @@ class JobIDLoggingITCase { public static MiniClusterExtension miniClusterResource = new MiniClusterExtension( new MiniClusterResourceConfiguration.Builder() +.setConfiguration(getConfiguration()) .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) .build()); +private static Configuration getConfiguration() { +Configuration configuration = new Configuration(); +configuration.set(SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); +return configuration; +} + @Test void testJobIDLogging(@InjectClusterClient ClusterClient clusterClient) throws Exception { JobID jobID = runJob(clusterClient); @@ -170,18 +185,22 @@ class JobIDLoggingITCase { assertJobIDPresent( jobID, -jobMasterLogging, +adaptiveSchedulerLogging, asList( "Checkpoint storage is set to .*", -"Initializing job .*", "Running initialization on master for job .*", -"Starting execution of job .*", -"Starting scheduling.*", -"State backend is set to .*", "Successfully created execution graph from job graph .*", -"Successfully ran initialization on master.*", -"Triggering a manual checkpoint for job .*.", -"Using failover strategy .*", +"Successfully ran initialization on master.*"), +"Registration at ResourceManager.*", +"Registration with ResourceManager.*", +"Resolved ResourceManager address.*"); + 
+assertJobIDPresent( +jobID, +jobMasterLogging, +asList( +&q
(flink) branch master updated (7ea5bcce6a5 -> 7d6cdc5e5b9)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 7ea5bcce6a5 [FLINK-34938] Fix incorrect behaviour for comparison functions (#24566) new fc60d176e3a [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout new 7d6cdc5e5b9 [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/flink/changelog/fs/RetryingExecutorTest.java | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-)
(flink) 01/02: [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit fc60d176e3ad7f06a93ebb37da491737f196fbe6 Author: Roman Khachatryan AuthorDate: Tue Mar 26 18:00:03 2024 +0100 [FLINK-26615][tests] Assert only the minimum delay in RetryingExecutorTest.testTimeout --- .../test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java index 614c4284413..eb204b0ac98 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java @@ -219,7 +219,7 @@ class RetryingExecutorTest { Executors.newScheduledThreadPool(2)); /* future completion can be delayed arbitrarily causing start delta be less than timeout */ assertThat(((double) secondStart.get() - firstStart.get()) / 1_000_000) -.isCloseTo(timeout, Percentage.withPercentage(75)); +.isGreaterThanOrEqualTo(timeout * .75); } private void testPolicy(
(flink) 02/02: [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7d6cdc5e5b97db6ac9daf8636fe1873765479246 Author: Roman Khachatryan AuthorDate: Tue Mar 26 18:53:00 2024 +0100 [FLINK-26515][tests] Tolerate TimeoutException in RetryingExecutorTest.testDiscardOnTimeout --- .../java/org/apache/flink/changelog/fs/RetryingExecutorTest.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java index eb204b0ac98..14cd00b3875 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.ThrowingConsumer; -import org.assertj.core.data.Percentage; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -69,7 +68,7 @@ class RetryingExecutorTest { @Test void testDiscardOnTimeout() throws Exception { -int timeoutMs = 5; +int timeoutMs = 50; // should be long enough for the last attempt to succeed int numAttempts = 7; int successfulAttempt = numAttempts - 1; List completed = new CopyOnWriteArrayList<>(); @@ -123,7 +122,11 @@ class RetryingExecutorTest { Thread.sleep(10); } } -assertThat(unexpectedException).hasValue(null); +if (unexpectedException.get() != null) { +// the last attempt might still timeout if the worker node is overloaded +// and the thread is unscheduled for more than timeoutMs + assertThat(unexpectedException).isInstanceOf(TimeoutException.class); +} 
assertThat(singletonList(successfulAttempt)).isEqualTo(completed); assertThat(IntStream.range(0, successfulAttempt).boxed().collect(toList())) .isEqualTo(discarded.stream().sorted().collect(toList()));
(flink) 01/02: [FLINK-34643] Fix concurrency issue in LoggerAuditingExtension
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ed4d6f091f27ffc778cbb6de6a3fa19251277bdc Author: Roman Khachatryan AuthorDate: Thu Mar 21 14:26:44 2024 +0100 [FLINK-34643] Fix concurrency issue in LoggerAuditingExtension --- .../org/apache/flink/testutils/logging/LoggerAuditingExtension.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java index a1e7367713d..b70611a621a 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java @@ -48,7 +48,7 @@ public class LoggerAuditingExtension implements BeforeEachCallback, AfterEachCal private final String loggerName; private final org.slf4j.event.Level level; -private ConcurrentLinkedQueue loggingEvents; +private volatile ConcurrentLinkedQueue loggingEvents; public LoggerAuditingExtension(Class clazz, org.slf4j.event.Level level) { this(clazz.getCanonicalName(), level); @@ -77,11 +77,12 @@ public class LoggerAuditingExtension implements BeforeEachCallback, AfterEachCal public void beforeEach(ExtensionContext context) throws Exception { loggingEvents = new ConcurrentLinkedQueue<>(); +final ConcurrentLinkedQueue loggingEventsLocal = loggingEvents; Appender testAppender = new AbstractAppender("test-appender", null, null, false, Property.EMPTY_ARRAY) { @Override public void append(LogEvent event) { -loggingEvents.add(event.toImmutable()); +loggingEventsLocal.add(event.toImmutable()); } }; testAppender.start();
(flink) 02/02: [FLINK-34643][tests] Ignore 'lost leadership' log messages
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 4edafcc8b0b96920036a1afaaa37ae87b77668ed Author: Roman Khachatryan AuthorDate: Fri Mar 22 16:10:07 2024 +0100 [FLINK-34643][tests] Ignore 'lost leadership' log messages --- .../src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java index e13bfce16e3..162d57db6cd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java @@ -145,7 +145,8 @@ class JobIDLoggingITCase { ".*ResourceManager.*", "Operator event.*", "Recovered slot allocation snapshots.*", -".*heartbeat.*"); +".*heartbeat.*", +".*leadership.*"); assertJobIDPresent( jobID,
(flink) branch master updated (8dcb0ae9063 -> 4edafcc8b0b)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 8dcb0ae9063 [FLINK-32513][core] Add predecessor caching new ed4d6f091f2 [FLINK-34643] Fix concurrency issue in LoggerAuditingExtension new 4edafcc8b0b [FLINK-34643][tests] Ignore 'lost leadership' log messages The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/flink/testutils/logging/LoggerAuditingExtension.java | 5 +++-- .../src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-)
(flink) branch master updated: [FLINK-34643][tests] Fix JobIDLoggingITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 6b5ae445724 [FLINK-34643][tests] Fix JobIDLoggingITCase 6b5ae445724 is described below commit 6b5ae445724b68db05a3f9687cff6dd68e2129d7 Author: Roman Khachatryan AuthorDate: Mon Mar 11 16:22:42 2024 +0100 [FLINK-34643][tests] Fix JobIDLoggingITCase --- .../apache/flink/test/misc/JobIDLoggingITCase.java | 134 +++-- 1 file changed, 98 insertions(+), 36 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java index 3380698feb7..e13bfce16e3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java @@ -37,9 +37,9 @@ import org.apache.flink.test.junit5.InjectClusterClient; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.testutils.logging.LoggerAuditingExtension; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.MdcUtils; import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.util.ReadOnlyStringMap; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; @@ -52,17 +52,14 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import static org.apache.flink.util.Preconditions.checkState; +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toList; +import static org.apache.flink.util.MdcUtils.JOB_ID; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; 
import static org.slf4j.event.Level.DEBUG; -/** - * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the most important cases. - */ -public class JobIDLoggingITCase { +class JobIDLoggingITCase { private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class); @RegisterExtension @@ -104,8 +101,7 @@ public class JobIDLoggingITCase { .build()); @Test -public void testJobIDLogging(@InjectClusterClient ClusterClient clusterClient) -throws Exception { +void testJobIDLogging(@InjectClusterClient ClusterClient clusterClient) throws Exception { JobID jobID = runJob(clusterClient); clusterClient.cancel(jobID).get(); @@ -114,53 +110,113 @@ public class JobIDLoggingITCase { // - how many messages to expect // - which log patterns to ignore -assertJobIDPresent(jobID, 3, checkpointCoordinatorLogging); -assertJobIDPresent(jobID, 6, streamTaskLogging); assertJobIDPresent( jobID, -9, +checkpointCoordinatorLogging, +asList( +"No checkpoint found during restore.", +"Resetting the master hooks.", +"Triggering checkpoint .*", +"Received acknowledge message for checkpoint .*", +"Completed checkpoint .*", +"Checkpoint state: .*")); + +assertJobIDPresent( +jobID, +streamTaskLogging, +asList( +"State backend is set to .*", +"Initializing Source: .*", +"Invoking Source: .*", +"Starting checkpoint .*", +"Notify checkpoint \\d+ complete .*")); + +assertJobIDPresent( +jobID, taskExecutorLogging, +asList( +"Received task .*", +"Trigger checkpoint .*", +"Confirm completed checkpoint .*"), "Un-registering task.*", "Successful registration.*", "Establish JobManager connection.*", "Offer reserved slots.*", ".*ResourceManager.*", -"Operator event.*"); +"Operator event.*", +"Recovered slot allocation snapshots.*", +".*heartbeat.*"); + +assertJobIDPresent( +jobID, +taskLogging, +asList( +"Sou
(flink) branch master updated: [FLINK-34417] Log Job ID via MDC
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d6a4eb966fb [FLINK-34417] Log Job ID via MDC d6a4eb966fb is described below commit d6a4eb966fbc47277e07b79e7c64939a62eb1d54 Author: Roman Khachatryan AuthorDate: Sat Feb 3 13:17:36 2024 +0100 [FLINK-34417] Log Job ID via MDC --- .../content.zh/docs/deployment/advanced/logging.md | 15 ++ docs/content/docs/deployment/advanced/logging.md | 14 ++ .../org/apache/flink/util/MdcAwareExecutor.java| 39 .../apache/flink/util/MdcAwareExecutorService.java | 114 +++ .../util/MdcAwareScheduledExecutorService.java | 61 ++ .../main/java/org/apache/flink/util/MdcUtils.java | 112 +++ .../java/org/apache/flink/util/MdcUtilsTest.java | 148 ++ .../runtime/rpc/pekko/FencedPekkoRpcActor.java | 7 +- .../flink/runtime/rpc/pekko/PekkoRpcActor.java | 75 +++ .../flink/runtime/rpc/pekko/PekkoRpcService.java | 11 +- .../flink/runtime/rpc/FencedRpcEndpoint.java | 14 +- .../org/apache/flink/runtime/rpc/RpcEndpoint.java | 33 +++- .../org/apache/flink/runtime/rpc/RpcService.java | 7 +- .../runtime/checkpoint/CheckpointCoordinator.java | 47 ++--- .../ChannelStateWriteRequestExecutorFactory.java | 3 +- .../ChannelStateWriteRequestExecutorImpl.java | 60 +++--- .../flink/runtime/dispatcher/Dispatcher.java | 41 ++-- .../JobMasterServiceLeadershipRunnerFactory.java | 4 +- .../executiongraph/DefaultExecutionGraph.java | 10 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 21 +- .../runtime/resourcemanager/ResourceManager.java | 48 +++-- .../flink/runtime/taskexecutor/TaskExecutor.java | 183 + .../org/apache/flink/runtime/taskmanager/Task.java | 28 ++- .../ChannelStateWriteRequestExecutorImplTest.java | 29 +-- .../flink/runtime/rpc/TestingRpcService.java | 7 +- .../flink/streaming/runtime/tasks/StreamTask.java | 24 ++- 
.../testutils/logging/LoggerAuditingExtension.java | 21 +- flink-tests/pom.xml| 2 +- .../OperatorEventSendingCheckpointITCase.java | 6 +- .../apache/flink/test/misc/JobIDLoggingITCase.java | 220 + .../src/test/resources/log4j2-test.properties | 2 +- 31 files changed, 1148 insertions(+), 258 deletions(-) diff --git a/docs/content.zh/docs/deployment/advanced/logging.md b/docs/content.zh/docs/deployment/advanced/logging.md index abb4b1025f0..432336946de 100644 --- a/docs/content.zh/docs/deployment/advanced/logging.md +++ b/docs/content.zh/docs/deployment/advanced/logging.md @@ -40,6 +40,21 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实 +### Structured logging + +Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature): +- Job ID +- key: `flink-job-id` +- format: string +- length 32 + +This is most useful in environments with structured logging and allows you to quickly filter the relevant logs. + +The MDC is propagated by slf4j to the logging backend which usually adds it to the log records automatically (e.g. in [log4j json layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)). +Alternatively, it can be configured explicitly - [log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this: + +`[%-32X{flink-job-id}] %c{0} %m%n`. + ## 配置 Log4j 2 Log4j 2 是通过 property 配置文件进行配置的。 diff --git a/docs/content/docs/deployment/advanced/logging.md b/docs/content/docs/deployment/advanced/logging.md index 6c01e1ddff1..cc2d0201e17 100644 --- a/docs/content/docs/deployment/advanced/logging.md +++ b/docs/content/docs/deployment/advanced/logging.md @@ -38,6 +38,20 @@ This allows you to use any logging framework that supports SLF4J, without having By default, [Log4j 2](https://logging.apache.org/log4j/2.x/index.html) is used as the underlying logging framework. 
+### Structured logging + +Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature): +- Job ID + - key: `flink-job-id` + - format: string + - length 32 + +This is most useful in environments with structured logging and allows you to quickly filter the relevant logs. + +The MDC is propagated by slf4j to the logging backend which usually adds it to the log records automatically (e.g. in [log4j json layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)). +Alternatively, it can be configured explicitly - [log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this: + +`[%-32X{flink-job-id
(flink) branch release-1.19 updated: [FLINK-34344] Pass JobID to CheckpointStatsTracker
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new 37756561d99 [FLINK-34344] Pass JobID to CheckpointStatsTracker 37756561d99 is described below commit 37756561d99ff73ba8cbf445c57f57fe11250867 Author: Roman Khachatryan AuthorDate: Fri Feb 2 16:02:14 2024 +0100 [FLINK-34344] Pass JobID to CheckpointStatsTracker --- .../runtime/checkpoint/CheckpointStatsTracker.java | 6 -- .../scheduler/DefaultExecutionGraphFactory.java | 3 ++- .../checkpoint/CheckpointCoordinatorFailureTest.java | 4 +++- .../CheckpointCoordinatorMasterHooksTest.java| 2 +- .../checkpoint/CheckpointCoordinatorTest.java| 20 +--- .../CheckpointCoordinatorTestingUtils.java | 2 +- .../checkpoint/CheckpointStatsTrackerTest.java | 12 ++-- .../flink/runtime/dispatcher/DispatcherTest.java | 4 +++- .../TestingDefaultExecutionGraphBuilder.java | 3 ++- .../AbstractCheckpointStatsHandlerTest.java | 4 +++- 10 files changed, 38 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index cf66341fc06..ea04211d6f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -111,9 +111,11 @@ public class CheckpointStatsTracker { * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in * progress ones. 
* @param metricGroup Metric group for exposed metrics + * @param jobID ID of the job being checkpointed */ -public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) { -this(numRememberedCheckpoints, metricGroup, new JobID(), Integer.MAX_VALUE); +public CheckpointStatsTracker( +int numRememberedCheckpoints, MetricGroup metricGroup, JobID jobID) { +this(numRememberedCheckpoints, metricGroup, jobID, Integer.MAX_VALUE); } CheckpointStatsTracker( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index aaeb8b6d4c7..67e91a887a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { () -> new CheckpointStatsTracker( configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), -jobManagerJobMetricGroup)); +jobManagerJobMetricGroup, +jobManagerJobMetricGroup.jobId())); this.isDynamicGraph = isDynamicGraph; this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory); this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 8873b938f1a..6e6bcffe762 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import 
org.apache.flink.api.common.JobStatus; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; @@ -212,7 +213,8 @@ class CheckpointCoordinatorFailureTest { new FailingCompletedCheckpointStore(failure); CheckpointStatsTracker statsTracker = -new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); +new CheckpointStatsTracker( +Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); final AtomicInteger cleanupCallCount = new AtomicInteger(0); final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuil
(flink) branch release-1.18 updated: [FLINK-34344] Pass JobID to CheckpointStatsTracker
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 33fb37aac5f [FLINK-34344] Pass JobID to CheckpointStatsTracker 33fb37aac5f is described below commit 33fb37aac5fbc709a62d35445879c75a6ba48086 Author: Roman Khachatryan AuthorDate: Fri Feb 2 16:02:14 2024 +0100 [FLINK-34344] Pass JobID to CheckpointStatsTracker --- .../runtime/checkpoint/CheckpointStatsTracker.java | 11 +-- .../scheduler/DefaultExecutionGraphFactory.java| 3 ++- .../checkpoint/CheckpointCoordinatorFailureTest.java | 4 +++- .../CheckpointCoordinatorMasterHooksTest.java | 2 +- .../runtime/checkpoint/CheckpointCoordinatorTest.java | 18 -- .../checkpoint/CheckpointCoordinatorTestingUtils.java | 2 +- .../runtime/checkpoint/CheckpointStatsTrackerTest.java | 11 ++- .../flink/runtime/dispatcher/DispatcherTest.java | 4 +++- .../TestingDefaultExecutionGraphBuilder.java | 3 ++- .../AbstractCheckpointStatsHandlerTest.java| 4 +++- 10 files changed, 34 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index f868f3fb4ba..5a8f72f0805 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -25,7 +25,6 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import 
org.apache.flink.runtime.rest.util.RestMapperUtils; @@ -106,17 +105,9 @@ public class CheckpointStatsTracker { * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in * progress ones. * @param metricGroup Metric group for exposed metrics + * @param jobID ID of the job being checkpointed */ public CheckpointStatsTracker( -int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup) { -this(numRememberedCheckpoints, metricGroup, metricGroup.jobId()); -} - -public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) { -this(numRememberedCheckpoints, metricGroup, new JobID()); -} - -private CheckpointStatsTracker( int numRememberedCheckpoints, MetricGroup metricGroup, JobID jobID) { checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints"); this.history = new CheckpointStatsHistory(numRememberedCheckpoints); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index 29eb7222d95..0c5279f11fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { new CheckpointStatsTracker( configuration.getInteger( WebOptions.CHECKPOINTS_HISTORY_SIZE), -jobManagerJobMetricGroup)); +jobManagerJobMetricGroup, +jobManagerJobMetricGroup.jobId())); this.isDynamicGraph = isDynamicGraph; this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory); this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 2538072c516..54e06728fe8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSt
(flink) branch master updated: [FLINK-34344] Pass JobID to CheckpointStatsTracker
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c84f42c1a7e [FLINK-34344] Pass JobID to CheckpointStatsTracker c84f42c1a7e is described below commit c84f42c1a7e752eaf8b9c3beb23fb9b01d39443d Author: Roman Khachatryan AuthorDate: Fri Feb 2 16:02:14 2024 +0100 [FLINK-34344] Pass JobID to CheckpointStatsTracker --- .../runtime/checkpoint/CheckpointStatsTracker.java | 6 -- .../scheduler/DefaultExecutionGraphFactory.java | 3 ++- .../checkpoint/CheckpointCoordinatorFailureTest.java | 4 +++- .../CheckpointCoordinatorMasterHooksTest.java| 2 +- .../checkpoint/CheckpointCoordinatorTest.java| 20 +--- .../CheckpointCoordinatorTestingUtils.java | 2 +- .../checkpoint/CheckpointStatsTrackerTest.java | 12 ++-- .../flink/runtime/dispatcher/DispatcherTest.java | 4 +++- .../TestingDefaultExecutionGraphBuilder.java | 3 ++- .../AbstractCheckpointStatsHandlerTest.java | 4 +++- 10 files changed, 38 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index cf66341fc06..ea04211d6f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -111,9 +111,11 @@ public class CheckpointStatsTracker { * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in * progress ones. 
* @param metricGroup Metric group for exposed metrics + * @param jobID ID of the job being checkpointed */ -public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) { -this(numRememberedCheckpoints, metricGroup, new JobID(), Integer.MAX_VALUE); +public CheckpointStatsTracker( +int numRememberedCheckpoints, MetricGroup metricGroup, JobID jobID) { +this(numRememberedCheckpoints, metricGroup, jobID, Integer.MAX_VALUE); } CheckpointStatsTracker( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index aaeb8b6d4c7..67e91a887a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { () -> new CheckpointStatsTracker( configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), -jobManagerJobMetricGroup)); +jobManagerJobMetricGroup, +jobManagerJobMetricGroup.jobId())); this.isDynamicGraph = isDynamicGraph; this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory); this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 8873b938f1a..6e6bcffe762 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import 
org.apache.flink.api.common.JobStatus; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; @@ -212,7 +213,8 @@ class CheckpointCoordinatorFailureTest { new FailingCompletedCheckpointStore(failure); CheckpointStatsTracker statsTracker = -new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); +new CheckpointStatsTracker( +Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID()); final AtomicInteger cleanupCallCount = new AtomicInteger(0); final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() d
(flink) branch release-1.17 updated: [FLINK-33442] Copy local state collection preventively
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 9f067c4a825 [FLINK-33442] Copy local state collection preventively 9f067c4a825 is described below commit 9f067c4a825c51d5856fa77ff66d6cbcb9a62336 Author: Roman Khachatryan AuthorDate: Thu Nov 2 21:46:14 2023 + [FLINK-33442] Copy local state collection preventively --- .../streaming/state/restore/RocksDBIncrementalRestoreOperation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 5a89403617e..6fbae63c432 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -164,7 +164,7 @@ public class RocksDBIncrementalRestoreOperation implements RocksDBRestoreOper || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange)); if (isRescaling) { -restoreWithRescaling(restoreStateHandles); +restoreWithRescaling(new ArrayList<>(restoreStateHandles)); } else { restoreWithoutRescaling(theFirstStateHandle); }
[flink] branch master updated: [FLINK-33077][runtime] Minimize the risk of hard back-pressure with buffer debloating enabled
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c7f6470bb8c [FLINK-33077][runtime] Minimize the risk of hard back-pressure with buffer debloating enabled c7f6470bb8c is described below commit c7f6470bb8cc314e7651b03e171af057f4edec1e Author: Roman Khachatryan AuthorDate: Fri Aug 11 13:51:07 2023 +0200 [FLINK-33077][runtime] Minimize the risk of hard back-pressure with buffer debloating enabled Problem: Buffer debloating sets buffer size to 256 bytes because of back-pressure. Such small buffers might not be enough to emit the processing results of a single record. The task thread would request new buffers, and often block. That results in significant checkpoint delays (up to minutes instead of seconds). Adding more overdraft buffers helps, but depends on the job DoP Raising taskmanager.memory.min-segment-size from 256 helps, but depends on the multiplication factor of the operator. Solution: - Ignore Buffer Debloater hints and extend the buffer if possible - when this prevents emitting an output record fully AND this is the last available buffer. 
- Prevent the subsequent flush of the buffer so that more output records can be emitted (flatMap-like and join operators) --- .../partition/BufferWritingResultPartition.java| 31 +++--- .../io/network/partition/ResultPartitionTest.java | 21 +++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index c4ced3d7c25..ddfe117cab7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -295,11 +295,34 @@ public abstract class BufferWritingResultPartition extends ResultPartition { addToSubpartition(buffer, targetSubpartition, 0, record.remaining()); } -buffer.appendAndCommit(record); +append(record, buffer); return buffer; } +private int append(ByteBuffer record, BufferBuilder buffer) { +// Try to avoid hard back-pressure in the subsequent calls to request buffers +// by ignoring Buffer Debloater hints and extending the buffer if possible (trim). +// This decreases the probability of hard back-pressure in cases when +// the output size varies significantly and BD suggests too small values. +// The hint will be re-applied on the next iteration. +if (record.remaining() >= buffer.getWritableBytes()) { +// This 2nd check is expensive, so it shouldn't be re-ordered. +// However, it has the same cost as the subsequent call to request buffer, so it doesn't +// affect the performance much. 
+if (!bufferPool.isAvailable()) { +// add 1 byte to prevent immediately flushing the buffer and potentially fit the +// next record +int newSize = +buffer.getMaxCapacity() ++ (record.remaining() - buffer.getWritableBytes()) ++ 1; +buffer.trim(Math.max(buffer.getMaxCapacity(), newSize)); +} +} +return buffer.appendAndCommit(record); +} + private void addToSubpartition( BufferBuilder buffer, int targetSubpartition, @@ -339,7 +362,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { // starting // with a complete record. // !! The next two lines can not change order. -final int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes); +final int partialRecordBytes = append(remainingRecordBytes, buffer); addToSubpartition(buffer, targetSubpartition, partialRecordBytes, partialRecordBytes); return buffer; @@ -354,7 +377,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { createBroadcastBufferConsumers(buffer, 0, record.remaining()); } -buffer.appendAndCommit(record); +append(record, buffer); return buffer; } @@ -368,7 +391,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { // starting // with a complete record. // !! The next two lines can not change order. -final int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes); +final
[flink] branch master updated (3ef71327177 -> 5f9de3eda3e)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 3ef71327177 [FLINK-32835][runtime] Migrate unit tests in "accumulators" and "blob" packages to JUnit5 add 5f9de3eda3e [FLINK-32761][state/backends] Use UUID based on PhysicalStateHandleID as SharedStateRegistryKey ChangelogStateHandleStreamImpl No new revisions were added by this update. Summary of changes: .../state/IncrementalRemoteKeyedStateHandle.java | 13 ++--- .../runtime/state/SharedStateRegistryKey.java | 11 +++ .../changelog/ChangelogStateHandleStreamImpl.java | 22 +++--- .../IncrementalRemoteKeyedStateHandleTest.java | 6 -- .../state/EmbeddedRocksDBStateBackendTest.java | 3 ++- 5 files changed, 22 insertions(+), 33 deletions(-)
[flink] branch release-1.17 updated: [FLINK-31139][state/changelog] not upload empty state changelog file
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 8798fde04e4 [FLINK-31139][state/changelog] not upload empty state changelog file 8798fde04e4 is described below commit 8798fde04e4a28af8bee7f2641b4aa89f2e0995f Author: wangfeifan AuthorDate: Mon Feb 20 20:26:58 2023 +0800 [FLINK-31139][state/changelog] not upload empty state changelog file --- .../flink/changelog/fs/BatchingStateChangeUploadScheduler.java | 5 + 1 file changed, 5 insertions(+) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java index 3495c6bc525..39023bf96d1 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java @@ -223,6 +223,11 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler { scheduledBytesCounter = 0; scheduledFuture = null; } + +if (tasks.size() == 0) { +return; +} + try { Throwable error = getErrorSafe(); if (error != null) {
[flink] branch release-1.16 updated: [FLINK-31139][state/changelog] not upload empty state changelog file
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.16 by this push: new 5157ac5921d [FLINK-31139][state/changelog] not upload empty state changelog file 5157ac5921d is described below commit 5157ac5921d406f577c83a4fe57b373d8ae0bf79 Author: wangfeifan AuthorDate: Mon Feb 20 20:26:58 2023 +0800 [FLINK-31139][state/changelog] not upload empty state changelog file --- .../flink/changelog/fs/BatchingStateChangeUploadScheduler.java | 5 + 1 file changed, 5 insertions(+) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java index 3495c6bc525..39023bf96d1 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java @@ -223,6 +223,11 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler { scheduledBytesCounter = 0; scheduledFuture = null; } + +if (tasks.size() == 0) { +return; +} + try { Throwable error = getErrorSafe(); if (error != null) {
[flink] branch release-1.17 updated: [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new e213267ebf6 [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913 e213267ebf6 is described below commit e213267ebf691f1e959be6f5154a790c2ec6a2e9 Author: wangfeifan AuthorDate: Sat Aug 5 19:14:55 2023 +0800 [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913 Co-authored-by: Roman --- .../apache/flink/runtime/state/SharedStateRegistry.java | 15 --- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 097e7f12da8..53beb00bb35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -54,19 +54,20 @@ public interface SharedStateRegistry extends AutoCloseable { } /** - * Register a reference to the given shared state in the registry. If there is already a state - * handle registered under the given key, the "new" state handle is disposed . + * Register a reference to the given shared state in the registry. The registry key should be + * based on the physical identifier of the state. If there is already a state handle registered + * under the same key and the 'new' state is not equal to the old one, an exception will be + * thrown. * - * IMPORTANT: caller should check the state handle returned by the result, because the - * registry is performing de-duplication and could potentially return a handle that is supposed - * to replace the one from the registration request. 
+ * IMPORTANT: the caller must use the returned state handle instead of the passed one because + * the registry might replace or update it. * * @param state the shared state for which we register a reference. * @param checkpointID which uses the state * @param preventDiscardingCreatedCheckpoint as long as this state is still in use. The * "checkpoint that created the state" is recorded on the first state registration. - * @return the result of this registration request, consisting of the state handle that is - * registered under the key by the end of the operation and its current reference count. + * @return the state handle registered under the given key. It might differ from the passed + * state handle, e.g. if it was a placeholder. */ StreamStateHandle registerReference( SharedStateRegistryKey registrationKey,
[flink] branch release-1.16 updated: [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.16 by this push: new f2d3ca0bf4d [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913 f2d3ca0bf4d is described below commit f2d3ca0bf4d49621450c7e71c1bd7c9de2deb89b Author: wangfeifan AuthorDate: Sat Aug 5 19:14:55 2023 +0800 [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913 Co-authored-by: Roman --- .../apache/flink/runtime/state/SharedStateRegistry.java | 15 --- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 097e7f12da8..53beb00bb35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -54,19 +54,20 @@ public interface SharedStateRegistry extends AutoCloseable { } /** - * Register a reference to the given shared state in the registry. If there is already a state - * handle registered under the given key, the "new" state handle is disposed . + * Register a reference to the given shared state in the registry. The registry key should be + * based on the physical identifier of the state. If there is already a state handle registered + * under the same key and the 'new' state is not equal to the old one, an exception will be + * thrown. * - * IMPORTANT: caller should check the state handle returned by the result, because the - * registry is performing de-duplication and could potentially return a handle that is supposed - * to replace the one from the registration request. 
+ * IMPORTANT: the caller must use the returned state handle instead of the passed one because + * the registry might replace or update it. * * @param state the shared state for which we register a reference. * @param checkpointID which uses the state * @param preventDiscardingCreatedCheckpoint as long as this state is still in use. The * "checkpoint that created the state" is recorded on the first state registration. - * @return the result of this registration request, consisting of the state handle that is - * registered under the key by the end of the operation and its current reference count. + * @return the state handle registered under the given key. It might differ from the passed + * state handle, e.g. if it was a placeholder. */ StreamStateHandle registerReference( SharedStateRegistryKey registrationKey,
[flink] branch master updated: [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c7d3e360530 [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913 c7d3e360530 is described below commit c7d3e36053091b7d7b10e89c5f9dba3df46ec0bd Author: wangfeifan AuthorDate: Sat Aug 5 19:14:55 2023 +0800 [hotfix][checkpointing] Fix outdated java doc in SharedStateRegistry after FLINK-29913 Co-authored-by: Roman --- .../apache/flink/runtime/state/SharedStateRegistry.java | 15 --- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 097e7f12da8..53beb00bb35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -54,19 +54,20 @@ public interface SharedStateRegistry extends AutoCloseable { } /** - * Register a reference to the given shared state in the registry. If there is already a state - * handle registered under the given key, the "new" state handle is disposed . + * Register a reference to the given shared state in the registry. The registry key should be + * based on the physical identifier of the state. If there is already a state handle registered + * under the same key and the 'new' state is not equal to the old one, an exception will be + * thrown. * - * IMPORTANT: caller should check the state handle returned by the result, because the - * registry is performing de-duplication and could potentially return a handle that is supposed - * to replace the one from the registration request. 
+ * IMPORTANT: the caller must use the returned state handle instead of the passed one because + * the registry might replace or update it. * * @param state the shared state for which we register a reference. * @param checkpointID which uses the state * @param preventDiscardingCreatedCheckpoint as long as this state is still in use. The * "checkpoint that created the state" is recorded on the first state registration. - * @return the result of this registration request, consisting of the state handle that is - * registered under the key by the end of the operation and its current reference count. + * @return the state handle registered under the given key. It might differ from the passed + * state handle, e.g. if it was a placeholder. */ StreamStateHandle registerReference( SharedStateRegistryKey registrationKey,
[flink] 02/04: [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit 46fa6a03062b90c460141aa275fa06347bc876da Author: wangfeifan AuthorDate: Mon Jul 24 14:44:02 2023 +0800 [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles --- .../checkpoint/CheckpointCoordinatorTest.java | 47 -- .../checkpoint/metadata/CheckpointTestUtils.java | 13 +++--- ...Handle.java => DiscardRecordedStateObject.java} | 36 +++-- .../IncrementalRemoteKeyedStateHandleTest.java | 36 ++--- ...le.java => TestingRelativeFileStateHandle.java} | 22 +- .../runtime/state/TestingStreamStateHandle.java| 8 +++- 6 files changed, 83 insertions(+), 79 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index baf4527ba88..83c82d6acfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.DiscardRecordedStateObject; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -74,6 +75,7 @@ import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.testutils.junit.utils.TempDirUtils; import 
org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TernaryBoolean; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; @@ -135,7 +137,6 @@ import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -2940,13 +2941,15 @@ class CheckpointCoordinatorTest extends TestLogger { streamStateHandle instanceof PlaceholderStreamStateHandle) .isFalse(); -verify(streamStateHandle, never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +streamStateHandle, TernaryBoolean.FALSE); ++sharedHandleCount; } for (HandleAndLocalPath handleAndLocalPath : incrementalKeyedStateHandle.getPrivateState()) { -verify(handleAndLocalPath.getHandle(), never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +handleAndLocalPath.getHandle(), TernaryBoolean.FALSE); } verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()) @@ -2969,7 +2972,8 @@ class CheckpointCoordinatorTest extends TestLogger { // by CP1 for (List cpList : sharedHandlesByCheckpoint) { for (HandleAndLocalPath handleAndLocalPath : cpList) { -verify(handleAndLocalPath.getHandle(), never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +handleAndLocalPath.getHandle(), TernaryBoolean.FALSE); } } @@ -3028,14 +3032,19 @@ class CheckpointCoordinatorTest extends TestLogger { // references the state from CP1, so we expect they are not discarded. verifyDiscard( sharedHandlesByCheckpoint, -cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? times(1) : never()); +cpId -> +restoreMode == RestoreMode.CLAIM && cpId == 0 +? TernaryBoolean.TRUE +: TernaryBoolean.FALSE
[flink] 01/04: [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit f3d212956660a77d58ab99f4de6611760e5f0b9a Author: wangfeifan AuthorDate: Mon Jul 24 15:39:51 2023 +0800 [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle Before this pr, IncrementalRemoteKeyedStateHandle use local file name of shared states as SharedStateRegistryKey, which cause the handles under the same key are not equal (file may be re-upload when maxConcurrentCheckpoint > 1 or using no-claim mode when recovering from a retained checkpoint). The collision of registry key causes SharedStateRegistry to delete the state handle wrongly. Co-authored-by: Roman --- .../metadata/MetadataV2V3SerializerBase.java | 31 ++--- .../runtime/state/IncrementalKeyedStateHandle.java | 69 - .../state/IncrementalLocalKeyedStateHandle.java| 16 +-- .../state/IncrementalRemoteKeyedStateHandle.java | 86 ++-- .../state/PlaceholderStreamStateHandle.java| 7 +- .../changelog/ChangelogStateBackendHandle.java | 31 +++-- .../checkpoint/CheckpointCoordinatorTest.java | 92 ++-- .../checkpoint/metadata/CheckpointTestUtils.java | 14 +- .../runtime/scheduler/SchedulerUtilsTest.java | 34 +++-- .../IncrementalRemoteKeyedStateHandleTest.java | 155 + .../runtime/state/SharedStateRegistryTest.java | 4 +- .../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../streaming/state/RocksDBStateDownloader.java| 23 ++- .../streaming/state/RocksDBStateUploader.java | 64 - .../RocksDBIncrementalRestoreOperation.java| 4 +- .../state/restore/RocksDBRestoreResult.java| 11 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 38 +++-- .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 .../snapshot/RocksNativeFullSnapshotStrategy.java | 44 +++--- .../state/snapshot/RocksSnapshotUtil.java | 13 -- .../state/EmbeddedRocksDBStateBackendTest.java | 24 ++-- 
.../state/RocksDBStateDownloaderTest.java | 19 ++- .../streaming/state/RocksDBStateUploaderTest.java | 36 ++--- .../RocksIncrementalSnapshotStrategyTest.java | 22 +-- .../test/checkpointing/StateHandleReuseITCase.java | 6 +- 25 files changed, 542 insertions(+), 429 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 8f5be0f46a7..e04240fc9de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -342,8 +343,8 @@ public abstract class MetadataV2V3SerializerBase { serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), dos); writeStateHandleId(incrementalKeyedStateHandle, dos); } else if (stateHandle instanceof ChangelogStateBackendHandle) { @@ -553,10 +554,8 @@ public abstract class MetadataV2V3SerializerBase { 
KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, context); -Map sharedStates = -deserializeStreamStateHandleMap(dis, context); -Map privateStates = -deserializeStreamStateHandleMap(dis, context); +List sharedStates = deserializeHandleAndLocalPathList(dis, context); +List privateStates = deserializeHandleAndLocalPathList(dis, context); UUID uuid; @@ -809,26 +808,26 @@ public abstract class MetadataV2V3SerializerBase { return
[flink] branch release-1.17 updated (96a6d96451a -> 65e7004dd32)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git from 96a6d96451a [hotfix][doc] Fix typo in JobManagerOptions (#22992) new f3d21295666 [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle new 46fa6a03062 [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles new 9217bd9f4dc [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference() new 65e7004dd32 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../metadata/MetadataV2V3SerializerBase.java | 31 ++-- .../runtime/state/IncrementalKeyedStateHandle.java | 69 - .../state/IncrementalLocalKeyedStateHandle.java| 16 +- .../state/IncrementalRemoteKeyedStateHandle.java | 86 +-- .../state/PlaceholderStreamStateHandle.java| 7 +- .../runtime/state/SharedStateRegistryImpl.java | 51 ++- .../changelog/ChangelogStateBackendHandle.java | 31 ++-- .../checkpoint/CheckpointCoordinatorTest.java | 107 -- .../checkpoint/metadata/CheckpointTestUtils.java | 27 ++-- .../runtime/scheduler/SchedulerUtilsTest.java | 34 +++-- .../runtime/state/DiscardRecordedStateObject.java | 30 ++-- .../IncrementalRemoteKeyedStateHandleTest.java | 161 + .../runtime/state/SharedStateRegistryTest.java | 48 ++ ...le.java => TestingRelativeFileStateHandle.java} | 22 ++- .../runtime/state/TestingStreamStateHandle.java| 8 +- .../changelog/ChangelogKeyedStateBackend.java | 28 .../changelog/ChangelogKeyedStateBackendTest.java | 56 +++ .../common/PeriodicMaterializationManager.java | 33 - 
.../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../streaming/state/RocksDBStateDownloader.java| 23 ++- .../streaming/state/RocksDBStateUploader.java | 64 .../RocksDBIncrementalRestoreOperation.java| 4 +- .../state/restore/RocksDBRestoreResult.java| 11 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 38 +++-- .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 --- .../snapshot/RocksNativeFullSnapshotStrategy.java | 44 +++--- .../state/snapshot/RocksSnapshotUtil.java | 13 -- .../state/EmbeddedRocksDBStateBackendTest.java | 24 +-- .../state/RocksDBStateDownloaderTest.java | 19 ++- .../streaming/state/RocksDBStateUploaderTest.java | 36 +++-- .../RocksIncrementalSnapshotStrategyTest.java | 22 +-- .../test/checkpointing/StateHandleReuseITCase.java | 6 +- 32 files changed, 729 insertions(+), 548 deletions(-) copy flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java => flink-runtime/src/test/java/org/apache/flink/runtime/state/DiscardRecordedStateObject.java (53%) copy flink-runtime/src/test/java/org/apache/flink/runtime/state/{TestingStreamStateHandle.java => TestingRelativeFileStateHandle.java} (66%)
[flink] 03/04: [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference()
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit 9217bd9f4dc937f1aff55fe5696e33954445970f Author: wangfeifan AuthorDate: Mon Jul 24 15:12:38 2023 +0800 [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference() Since FLINK-29913, we eliminate the possibility of register key collision by using unique keys, such a collision would mean a bug; so we remove that resolution logic and raise an error instead. --- .../runtime/state/SharedStateRegistryImpl.java | 51 ++ .../runtime/state/SharedStateRegistryTest.java | 44 +-- 2 files changed, 24 insertions(+), 71 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java index e2ebcc19108..9e5e8b7b303 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java @@ -86,7 +86,6 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { checkNotNull(newHandle, "State handle should not be null."); -StreamStateHandle scheduledStateDeletion = null; SharedStateEntry entry; synchronized (registeredStates) { @@ -122,37 +121,23 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { LOG.trace( "Duplicated registration under key {} with a placeholder (normal case)", registrationKey); -scheduledStateDeletion = newHandle; -} else if (entry.confirmed) { -LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. 
" -+ "Discarding the new state and keeping the old one which is included into a completed checkpoint", -registrationKey, -newHandle); -scheduledStateDeletion = newHandle; } else { -// Old entry is not in a confirmed checkpoint yet, and the new one differs. -// This might result from (omitted KG range here for simplicity): -// 1. Flink recovers from a failure using a checkpoint 1 -// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } -// 3. JM triggers checkpoint 2 -// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" -// 5. TM crashes; everything is repeated from (2) -// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } -// 7. JM triggers checkpoint 3 -// 8. TM sends NEW state "xyz-002.sst" -// 9. JM discards it as duplicate -// 10. checkpoint completes, but a wrong SST file is used -// So we use a new entry and discard the old one: +// might be a bug expect the StreamStateHandleWrapper used by +// ChangelogStateBackendHandleImpl LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. " -+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint", +"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle); -scheduledStateDeletion = entry.stateHandle; -entry.stateHandle = newHandle; +if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) { +// This situation means that newHandle is a StreamStateHandleWrapper registered +// by ChangelogStateBackendHandleImpl, keep the new one for discard the +// underlying handle while it was useless. Refactor this once FLINK-25862 is +// resolved. +entry.stateHandle = newHandle; +} else { +throw new IllegalStateException( +"StateObjects underlying same key sho
[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit 65e7004dd32633f9dfc87b0808ffcb587daf525c Author: wangfeifan AuthorDate: Fri Jun 16 11:48:58 2023 +0800 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled Co-authored-by: Roman --- .../changelog/ChangelogKeyedStateBackend.java | 28 +++ .../changelog/ChangelogKeyedStateBackendTest.java | 56 ++ .../common/PeriodicMaterializationManager.java | 33 ++--- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f286018eb31..f224098bb8e 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend private long lastConfirmedMaterializationId = -1L; +/** last failed or cancelled materialization. 
*/ +private long lastFailedMaterializationId = -1L; + private final ChangelogTruncateHelper changelogTruncateHelper; public ChangelogKeyedStateBackend( @@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastConfirmedMaterializationId = materializationId; this.materializedId = materializationId + 1; if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { @@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend */ @Override public Optional initMaterialization() throws Exception { +if (lastConfirmedMaterializationId < materializedId - 1 +&& lastFailedMaterializationId < materializedId - 1) { +// SharedStateRegistry potentially requires that the checkpoint's dependency on the +// shared file be continuous, it will be broken if we trigger a new materialization +// before the previous one has either confirmed or failed. See discussion in +// https://github.com/apache/flink/pull/22669#issuecomment-1593370772 . 
+LOG.info( +"materialization:{} not confirmed or failed or cancelled, skip trigger new one.", +materializedId - 1); +return Optional.empty(); +} + SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber(); SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); @@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend changelogTruncateHelper.materialized(upTo); } +@Override +public void handleMaterializationFailureOrCancellation( +long materializationID, SequenceNumber upTo, Throwable cause) { + +LOG.info( +"Task {} failed or cancelled materialization:{} which is upTo:{}", +subtaskName, +materializationID, +upTo); +lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID); +} + // TODO: this method may change after the ownership PR private List getMaterializedResult( @Nonnull SnapshotResult materializedSnapshot) { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index 03366b62be9..7581b2720f6 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -38,16 +38,21 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder; import org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess; +import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable; import org.junit.Test; import org.junit.runner.RunWith; 
import org.ju
[flink] 02/04: [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit d00e2eb73aa5f2d2506370b5fbe9c8867cbec987 Author: wangfeifan AuthorDate: Mon Jul 24 14:44:02 2023 +0800 [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles --- .../checkpoint/CheckpointCoordinatorTest.java | 47 -- .../checkpoint/metadata/CheckpointTestUtils.java | 13 +++--- ...Handle.java => DiscardRecordedStateObject.java} | 36 +++-- .../IncrementalRemoteKeyedStateHandleTest.java | 36 ++--- ...le.java => TestingRelativeFileStateHandle.java} | 22 +- .../runtime/state/TestingStreamStateHandle.java| 8 +++- 6 files changed, 83 insertions(+), 79 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 6fa95fb5380..4eedbc60134 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.DiscardRecordedStateObject; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -73,6 +74,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource; import 
org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TernaryBoolean; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; @@ -141,7 +143,6 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -2940,13 +2941,15 @@ public class CheckpointCoordinatorTest extends TestLogger { streamStateHandle instanceof PlaceholderStreamStateHandle) .isFalse(); -verify(streamStateHandle, never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +streamStateHandle, TernaryBoolean.FALSE); ++sharedHandleCount; } for (HandleAndLocalPath handleAndLocalPath : incrementalKeyedStateHandle.getPrivateState()) { -verify(handleAndLocalPath.getHandle(), never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +handleAndLocalPath.getHandle(), TernaryBoolean.FALSE); } verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()) @@ -2969,7 +2972,8 @@ public class CheckpointCoordinatorTest extends TestLogger { // by CP1 for (List cpList : sharedHandlesByCheckpoint) { for (HandleAndLocalPath handleAndLocalPath : cpList) { -verify(handleAndLocalPath.getHandle(), never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +handleAndLocalPath.getHandle(), TernaryBoolean.FALSE); } } @@ -3027,14 +3031,19 @@ public class CheckpointCoordinatorTest extends TestLogger { // references the state from CP1, so we expect they are not discarded. verifyDiscard( sharedHandlesByCheckpoint, -cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? times(1) : never()); +cpId -> +restoreMode == RestoreMode.CLAIM && cpId == 0 +? TernaryBoolean.TRUE +
[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit aad658910fd6967199bce69f33872d245315bca1 Author: wangfeifan AuthorDate: Fri Jun 16 11:48:58 2023 +0800 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled Co-authored-by: Roman --- .../changelog/ChangelogKeyedStateBackend.java | 28 +++ .../changelog/ChangelogKeyedStateBackendTest.java | 56 ++ .../common/PeriodicMaterializationManager.java | 33 ++--- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f286018eb31..f224098bb8e 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend private long lastConfirmedMaterializationId = -1L; +/** last failed or cancelled materialization. 
*/ +private long lastFailedMaterializationId = -1L; + private final ChangelogTruncateHelper changelogTruncateHelper; public ChangelogKeyedStateBackend( @@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastConfirmedMaterializationId = materializationId; this.materializedId = materializationId + 1; if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { @@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend */ @Override public Optional initMaterialization() throws Exception { +if (lastConfirmedMaterializationId < materializedId - 1 +&& lastFailedMaterializationId < materializedId - 1) { +// SharedStateRegistry potentially requires that the checkpoint's dependency on the +// shared file be continuous, it will be broken if we trigger a new materialization +// before the previous one has either confirmed or failed. See discussion in +// https://github.com/apache/flink/pull/22669#issuecomment-1593370772 . 
+LOG.info( +"materialization:{} not confirmed or failed or cancelled, skip trigger new one.", +materializedId - 1); +return Optional.empty(); +} + SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber(); SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); @@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend changelogTruncateHelper.materialized(upTo); } +@Override +public void handleMaterializationFailureOrCancellation( +long materializationID, SequenceNumber upTo, Throwable cause) { + +LOG.info( +"Task {} failed or cancelled materialization:{} which is upTo:{}", +subtaskName, +materializationID, +upTo); +lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID); +} + // TODO: this method may change after the ownership PR private List getMaterializedResult( @Nonnull SnapshotResult materializedSnapshot) { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index 03366b62be9..7581b2720f6 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -38,16 +38,21 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder; import org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess; +import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable; import org.junit.Test; import org.junit.runner.RunWith; 
import org.ju
[flink] 03/04: [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference()
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit bf8456df6954def6f2134511840522b8e959b92b Author: wangfeifan AuthorDate: Mon Jul 24 15:12:38 2023 +0800 [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference() Since FLINK-29913, we eliminate the possibility of register key collision by using unique keys, such a collision would mean a bug; so we remove that resolution logic and raise an error instead. --- .../runtime/state/SharedStateRegistryImpl.java | 114 ++--- .../runtime/state/SharedStateRegistryTest.java | 44 ++-- 2 files changed, 64 insertions(+), 94 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java index 2e16c528997..f625ed4a410 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java @@ -79,14 +79,13 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { @Override public StreamStateHandle registerReference( -SharedStateRegistryKey registrationKey, -StreamStateHandle state, -long checkpointID, -boolean preventDiscardingCreatedCheckpoint) { +final SharedStateRegistryKey registrationKey, +final StreamStateHandle newHandle, +final long checkpointID, +final boolean preventDiscardingCreatedCheckpoint) { -checkNotNull(state); +checkNotNull(newHandle, "State handle should not be null."); -StreamStateHandle scheduledStateDeletion = null; SharedStateEntry entry; synchronized (registeredStates) { @@ -95,62 +94,64 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { entry = registeredStates.get(registrationKey); if (entry == null) { -// Additional check that should never fail, 
because only state handles that are not -// placeholders should -// ever be inserted to the registry. checkState( -!isPlaceholder(state), +!isPlaceholder(newHandle), "Attempt to reference unknown state: " + registrationKey); -entry = new SharedStateEntry(state, checkpointID); +LOG.trace( +"Registered new shared state {} under key {}.", newHandle, registrationKey); +entry = new SharedStateEntry(newHandle, checkpointID); registeredStates.put(registrationKey, entry); -LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey); -} else { -// Delete if this is a real duplicate. -// Note that task (backend) is not required to re-upload state -// if the confirmation notification was missing. -// However, it's also not required to use exactly the same handle or placeholder -if (!Objects.equals(state, entry.stateHandle)) { -if (entry.confirmed || isPlaceholder(state)) { -scheduledStateDeletion = state; -} else { -// Old entry is not in a confirmed checkpoint yet, and the new one differs. -// This might result from (omitted KG range here for simplicity): -// 1. Flink recovers from a failure using a checkpoint 1 -// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } -// 3. JM triggers checkpoint 2 -// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" -// 5. TM crashes; everything is repeated from (2) -// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } -// 7. JM triggers checkpoint 3 -// 8. TM sends NEW state "xyz-002.sst" -// 9. JM discards it as duplicate -// 10. checkpoint completes, but a wrong SST file is used -// So we use a new entry and discard the old one: -scheduledStateDeletion = entry.stateHandle; -entry.stateHandle = state; -} -LOG.trace( -"Identified duplicate state registration under key {}. New state
[flink] 01/04: [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit c6c54412bf70af4432071dfdf131b62ff8236fe7 Author: wangfeifan AuthorDate: Mon Jul 24 15:39:51 2023 +0800 [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle Before this pr, IncrementalRemoteKeyedStateHandle use local file name of shared states as SharedStateRegistryKey, which cause the handles under the same key are not equal (file may be re-upload when maxConcurrentCheckpoint > 1 or using no-claim mode when recovering from a retained checkpoint). The collision of registry key causes SharedStateRegistry to delete the state handle wrongly. Co-authored-by: Roman --- .../metadata/MetadataV2V3SerializerBase.java | 31 ++--- .../runtime/state/IncrementalKeyedStateHandle.java | 69 - .../state/IncrementalLocalKeyedStateHandle.java| 16 +-- .../state/IncrementalRemoteKeyedStateHandle.java | 86 ++-- .../state/PlaceholderStreamStateHandle.java| 7 +- .../changelog/ChangelogStateBackendHandle.java | 31 +++-- .../checkpoint/CheckpointCoordinatorTest.java | 101 -- .../checkpoint/metadata/CheckpointTestUtils.java | 14 +- .../runtime/scheduler/SchedulerUtilsTest.java | 34 +++-- .../IncrementalRemoteKeyedStateHandleTest.java | 155 + .../runtime/state/SharedStateRegistryTest.java | 4 +- .../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../streaming/state/RocksDBStateDownloader.java| 24 ++-- .../streaming/state/RocksDBStateUploader.java | 64 - .../RocksDBIncrementalRestoreOperation.java| 4 +- .../state/restore/RocksDBRestoreResult.java| 11 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 38 +++-- .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 .../snapshot/RocksNativeFullSnapshotStrategy.java | 44 +++--- .../state/snapshot/RocksSnapshotUtil.java | 13 -- .../state/EmbeddedRocksDBStateBackendTest.java | 24 ++-- 
.../state/RocksDBStateDownloaderTest.java | 19 ++- .../streaming/state/RocksDBStateUploaderTest.java | 36 ++--- .../RocksIncrementalSnapshotStrategyTest.java | 22 +-- .../test/checkpointing/StateHandleReuseITCase.java | 6 +- 25 files changed, 548 insertions(+), 433 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 37f22ee95ad..5f898359f96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -341,8 +342,8 @@ public abstract class MetadataV2V3SerializerBase { serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), dos); writeStateHandleId(incrementalKeyedStateHandle, dos); } else if (stateHandle instanceof ChangelogStateBackendHandle) { @@ -548,10 +549,8 @@ public abstract class MetadataV2V3SerializerBase { 
KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, context); -Map sharedStates = -deserializeStreamStateHandleMap(dis, context); -Map privateStates = -deserializeStreamStateHandleMap(dis, context); +List sharedStates = deserializeHandleAndLocalPathList(dis, context); +List privateStates = deserializeHandleAndLocalPathList(dis, context); UUID uuid; @@ -804,26 +803,26 @@ public abstract class MetadataV2V3SerializerBase { return
[flink] branch release-1.16 updated (d4c52b372a8 -> aad658910fd)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from d4c52b372a8 [hotfix][doc] Fix typo in JobManagerOptions (#22992) new c6c54412bf7 [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle new d00e2eb73aa [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles new bf8456df695 [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference() new aad658910fd [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../metadata/MetadataV2V3SerializerBase.java | 31 ++-- .../runtime/state/IncrementalKeyedStateHandle.java | 69 - .../state/IncrementalLocalKeyedStateHandle.java| 16 +- .../state/IncrementalRemoteKeyedStateHandle.java | 86 +-- .../state/PlaceholderStreamStateHandle.java| 7 +- .../runtime/state/SharedStateRegistryImpl.java | 114 +++ .../changelog/ChangelogStateBackendHandle.java | 31 ++-- .../checkpoint/CheckpointCoordinatorTest.java | 116 --- .../checkpoint/metadata/CheckpointTestUtils.java | 27 ++-- .../runtime/scheduler/SchedulerUtilsTest.java | 34 +++-- .../runtime/state/DiscardRecordedStateObject.java | 30 ++-- .../IncrementalRemoteKeyedStateHandleTest.java | 161 + .../runtime/state/SharedStateRegistryTest.java | 48 ++ ...le.java => TestingRelativeFileStateHandle.java} | 22 ++- .../runtime/state/TestingStreamStateHandle.java| 8 +- .../changelog/ChangelogKeyedStateBackend.java | 28 .../changelog/ChangelogKeyedStateBackendTest.java | 56 +++ .../common/PeriodicMaterializationManager.java | 33 - 
.../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../streaming/state/RocksDBStateDownloader.java| 24 +-- .../streaming/state/RocksDBStateUploader.java | 64 .../RocksDBIncrementalRestoreOperation.java| 4 +- .../state/restore/RocksDBRestoreResult.java| 11 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 38 +++-- .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 --- .../snapshot/RocksNativeFullSnapshotStrategy.java | 44 +++--- .../state/snapshot/RocksSnapshotUtil.java | 13 -- .../state/EmbeddedRocksDBStateBackendTest.java | 24 +-- .../state/RocksDBStateDownloaderTest.java | 19 ++- .../streaming/state/RocksDBStateUploaderTest.java | 36 +++-- .../RocksIncrementalSnapshotStrategyTest.java | 22 +-- .../test/checkpointing/StateHandleReuseITCase.java | 6 +- 32 files changed, 775 insertions(+), 575 deletions(-) copy flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java => flink-runtime/src/test/java/org/apache/flink/runtime/state/DiscardRecordedStateObject.java (53%) copy flink-runtime/src/test/java/org/apache/flink/runtime/state/{TestingStreamStateHandle.java => TestingRelativeFileStateHandle.java} (66%)
[flink] 02/04: [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 4467ad05bb9823d90be770d5e1c36989f3905d1b Author: wangfeifan AuthorDate: Mon Jul 24 14:44:02 2023 +0800 [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles --- .../checkpoint/CheckpointCoordinatorTest.java | 47 -- .../checkpoint/metadata/CheckpointTestUtils.java | 13 +++--- ...Handle.java => DiscardRecordedStateObject.java} | 36 +++-- .../IncrementalRemoteKeyedStateHandleTest.java | 36 ++--- ...le.java => TestingRelativeFileStateHandle.java} | 22 +- .../runtime/state/TestingStreamStateHandle.java| 8 +++- 6 files changed, 83 insertions(+), 79 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 2f274dab3d8..79752468045 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.DiscardRecordedStateObject; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -75,6 +76,7 @@ import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.testutils.junit.utils.TempDirUtils; import 
org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TernaryBoolean; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; @@ -137,7 +139,6 @@ import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -2942,13 +2943,15 @@ class CheckpointCoordinatorTest extends TestLogger { streamStateHandle instanceof PlaceholderStreamStateHandle) .isFalse(); -verify(streamStateHandle, never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +streamStateHandle, TernaryBoolean.FALSE); ++sharedHandleCount; } for (HandleAndLocalPath handleAndLocalPath : incrementalKeyedStateHandle.getPrivateState()) { -verify(handleAndLocalPath.getHandle(), never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +handleAndLocalPath.getHandle(), TernaryBoolean.FALSE); } verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()) @@ -2971,7 +2974,8 @@ class CheckpointCoordinatorTest extends TestLogger { // by CP1 for (List cpList : sharedHandlesByCheckpoint) { for (HandleAndLocalPath handleAndLocalPath : cpList) { -verify(handleAndLocalPath.getHandle(), never()).discardState(); +DiscardRecordedStateObject.verifyDiscard( +handleAndLocalPath.getHandle(), TernaryBoolean.FALSE); } } @@ -3030,14 +3034,19 @@ class CheckpointCoordinatorTest extends TestLogger { // references the state from CP1, so we expect they are not discarded. verifyDiscard( sharedHandlesByCheckpoint, -cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? times(1) : never()); +cpId -> +restoreMode == RestoreMode.CLAIM && cpId == 0 +? TernaryBoolean.TRUE +: TernaryBoolean.FALSE);
[flink] 01/04: [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6dcb10abf319b9e5494a82bee71f1ae1a4e4b211 Author: wangfeifan AuthorDate: Mon Jul 24 15:39:51 2023 +0800 [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle Before this pr, IncrementalRemoteKeyedStateHandle use local file name of shared states as SharedStateRegistryKey, which cause the handles under the same key are not equal (file may be re-upload when maxConcurrentCheckpoint > 1 or using no-claim mode when recovering from a retained checkpoint). The collision of registry key causes SharedStateRegistry to delete the state handle wrongly. Co-authored-by: Roman --- .../metadata/MetadataV2V3SerializerBase.java | 32 ++--- .../runtime/state/IncrementalKeyedStateHandle.java | 69 - .../state/IncrementalLocalKeyedStateHandle.java| 16 +-- .../state/IncrementalRemoteKeyedStateHandle.java | 86 ++-- .../state/PlaceholderStreamStateHandle.java| 7 +- .../changelog/ChangelogStateBackendHandle.java | 31 +++-- .../checkpoint/CheckpointCoordinatorTest.java | 92 ++-- .../checkpoint/metadata/CheckpointTestUtils.java | 14 +- .../runtime/scheduler/SchedulerUtilsTest.java | 34 +++-- .../IncrementalRemoteKeyedStateHandleTest.java | 155 + .../runtime/state/SharedStateRegistryTest.java | 4 +- .../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../streaming/state/RocksDBStateDownloader.java| 13 +- .../streaming/state/RocksDBStateUploader.java | 65 - .../RocksDBIncrementalRestoreOperation.java| 3 +- .../state/restore/RocksDBRestoreResult.java| 11 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 38 +++-- .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 .../snapshot/RocksNativeFullSnapshotStrategy.java | 44 +++--- .../state/snapshot/RocksSnapshotUtil.java | 13 -- .../state/EmbeddedRocksDBStateBackendTest.java | 24 ++-- 
.../state/RocksDBStateDownloaderTest.java | 32 ++--- .../streaming/state/RocksDBStateUploaderTest.java | 36 ++--- .../RocksIncrementalSnapshotStrategyTest.java | 21 +-- .../test/checkpointing/StateHandleReuseITCase.java | 6 +- 25 files changed, 542 insertions(+), 432 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 25de2f0b626..db3b5dd22d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -342,8 +343,8 @@ public abstract class MetadataV2V3SerializerBase { serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), dos); writeStateHandleId(incrementalKeyedStateHandle, dos); } else if (stateHandle instanceof ChangelogStateBackendHandle) { @@ -553,10 +554,8 @@ public abstract class MetadataV2V3SerializerBase { 
KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, context); -Map sharedStates = -deserializeStreamStateHandleMap(dis, context); -Map privateStates = -deserializeStreamStateHandleMap(dis, context); +List sharedStates = deserializeHandleAndLocalPathList(dis, context); +List privateStates = deserializeHandleAndLocalPathList(dis, context); UUID uuid; @@ -810,27 +809,26 @@ public abstract class MetadataV2V3SerializerBase { return
[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 85f32d6bcb31708c9c2c845ea03ef117726b7c1a Author: wangfeifan AuthorDate: Fri Jun 16 11:48:58 2023 +0800 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled Co-authored-by: Roman --- .../changelog/ChangelogKeyedStateBackend.java | 28 +++ .../changelog/ChangelogKeyedStateBackendTest.java | 56 ++ .../common/PeriodicMaterializationManager.java | 33 ++--- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f998bc45cf7..599441b8970 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -191,6 +191,9 @@ public class ChangelogKeyedStateBackend private long lastConfirmedMaterializationId = -1L; +/** last failed or cancelled materialization. 
*/ +private long lastFailedMaterializationId = -1L; + private final ChangelogTruncateHelper changelogTruncateHelper; public ChangelogKeyedStateBackend( @@ -729,6 +732,7 @@ public class ChangelogKeyedStateBackend materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastConfirmedMaterializationId = materializationId; this.materializedId = materializationId + 1; if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { @@ -759,6 +763,18 @@ public class ChangelogKeyedStateBackend */ @Override public Optional initMaterialization() throws Exception { +if (lastConfirmedMaterializationId < materializedId - 1 +&& lastFailedMaterializationId < materializedId - 1) { +// SharedStateRegistry potentially requires that the checkpoint's dependency on the +// shared file be continuous, it will be broken if we trigger a new materialization +// before the previous one has either confirmed or failed. See discussion in +// https://github.com/apache/flink/pull/22669#issuecomment-1593370772 . 
+LOG.info( +"materialization:{} not confirmed or failed or cancelled, skip trigger new one.", +materializedId - 1); +return Optional.empty(); +} + SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber(); SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); @@ -833,6 +849,18 @@ public class ChangelogKeyedStateBackend changelogTruncateHelper.materialized(upTo); } +@Override +public void handleMaterializationFailureOrCancellation( +long materializationID, SequenceNumber upTo, Throwable cause) { + +LOG.info( +"Task {} failed or cancelled materialization:{} which is upTo:{}", +subtaskName, +materializationID, +upTo); +lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID); +} + // TODO: this method may change after the ownership PR private List getMaterializedResult( @Nonnull SnapshotResult materializedSnapshot) { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index 980d81e0a5d..ff4c93c5460 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -38,16 +38,21 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder; import org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess; +import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable; import org.junit.Test; import org.junit.runner.RunWith; 
import org.ju
[flink] 03/04: [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference()
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit bde09516be2c3693c8f8fef3678999b6e47333c6 Author: wangfeifan AuthorDate: Mon Jul 24 15:12:38 2023 +0800 [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference() Since FLINK-29913, we eliminate the possibility of register key collision by using unique keys, such a collision would mean a bug; so we remove that resolution logic and raise an error instead. --- .../runtime/state/SharedStateRegistryImpl.java | 51 ++ .../runtime/state/SharedStateRegistryTest.java | 44 +-- 2 files changed, 24 insertions(+), 71 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java index e2ebcc19108..9e5e8b7b303 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java @@ -86,7 +86,6 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { checkNotNull(newHandle, "State handle should not be null."); -StreamStateHandle scheduledStateDeletion = null; SharedStateEntry entry; synchronized (registeredStates) { @@ -122,37 +121,23 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { LOG.trace( "Duplicated registration under key {} with a placeholder (normal case)", registrationKey); -scheduledStateDeletion = newHandle; -} else if (entry.confirmed) { -LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. 
" -+ "Discarding the new state and keeping the old one which is included into a completed checkpoint", -registrationKey, -newHandle); -scheduledStateDeletion = newHandle; } else { -// Old entry is not in a confirmed checkpoint yet, and the new one differs. -// This might result from (omitted KG range here for simplicity): -// 1. Flink recovers from a failure using a checkpoint 1 -// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } -// 3. JM triggers checkpoint 2 -// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" -// 5. TM crashes; everything is repeated from (2) -// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } -// 7. JM triggers checkpoint 3 -// 8. TM sends NEW state "xyz-002.sst" -// 9. JM discards it as duplicate -// 10. checkpoint completes, but a wrong SST file is used -// So we use a new entry and discard the old one: +// might be a bug expect the StreamStateHandleWrapper used by +// ChangelogStateBackendHandleImpl LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. " -+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint", +"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle); -scheduledStateDeletion = entry.stateHandle; -entry.stateHandle = newHandle; +if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) { +// This situation means that newHandle is a StreamStateHandleWrapper registered +// by ChangelogStateBackendHandleImpl, keep the new one for discard the +// underlying handle while it was useless. Refactor this once FLINK-25862 is +// resolved. +entry.stateHandle = newHandle; +} else { +throw new IllegalStateException( +"StateObjects underlying same key should be equa
[flink] branch master updated (255d83087c6 -> 85f32d6bcb3)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 255d83087c6 [FLINK-32560][Scala] Deprecate all Scala API new 6dcb10abf31 [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle new 4467ad05bb9 [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles new bde09516be2 [FLINK-29913][checkpointing] Remove de-duplication from SharedStateRegistry#registerReference() new 85f32d6bcb3 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../metadata/MetadataV2V3SerializerBase.java | 32 ++-- .../runtime/state/IncrementalKeyedStateHandle.java | 69 - .../state/IncrementalLocalKeyedStateHandle.java| 16 +- .../state/IncrementalRemoteKeyedStateHandle.java | 86 +-- .../state/PlaceholderStreamStateHandle.java| 7 +- .../runtime/state/SharedStateRegistryImpl.java | 51 ++- .../changelog/ChangelogStateBackendHandle.java | 31 ++-- .../checkpoint/CheckpointCoordinatorTest.java | 107 -- .../checkpoint/metadata/CheckpointTestUtils.java | 27 ++-- .../runtime/scheduler/SchedulerUtilsTest.java | 34 +++-- .../runtime/state/DiscardRecordedStateObject.java | 30 ++-- .../IncrementalRemoteKeyedStateHandleTest.java | 161 + .../runtime/state/SharedStateRegistryTest.java | 48 ++ ...le.java => TestingRelativeFileStateHandle.java} | 22 ++- .../runtime/state/TestingStreamStateHandle.java| 8 +- .../changelog/ChangelogKeyedStateBackend.java | 28 .../changelog/ChangelogKeyedStateBackendTest.java | 56 +++ .../common/PeriodicMaterializationManager.java | 33 - 
.../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../streaming/state/RocksDBStateDownloader.java| 13 +- .../streaming/state/RocksDBStateUploader.java | 65 - .../RocksDBIncrementalRestoreOperation.java| 3 +- .../state/restore/RocksDBRestoreResult.java| 11 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 38 +++-- .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 --- .../snapshot/RocksNativeFullSnapshotStrategy.java | 44 +++--- .../state/snapshot/RocksSnapshotUtil.java | 13 -- .../state/EmbeddedRocksDBStateBackendTest.java | 24 +-- .../state/RocksDBStateDownloaderTest.java | 32 ++-- .../streaming/state/RocksDBStateUploaderTest.java | 36 +++-- .../RocksIncrementalSnapshotStrategyTest.java | 21 +-- .../test/checkpointing/StateHandleReuseITCase.java | 6 +- 32 files changed, 729 insertions(+), 551 deletions(-) copy flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java => flink-runtime/src/test/java/org/apache/flink/runtime/state/DiscardRecordedStateObject.java (53%) copy flink-runtime/src/test/java/org/apache/flink/runtime/state/{TestingStreamStateHandle.java => TestingRelativeFileStateHandle.java} (66%)
[flink] branch master updated (fd96076b1b4 -> 86207724d5b)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from fd96076b1b4 [FLINK-32404][table] Add catalog modification listener interface and create listener for catalog manager (#22924) add 86207724d5b [FLINK-19010][metric] Introduce subtask level restore metric No new revisions were added by this update. Summary of changes: docs/content.zh/docs/ops/metrics.md| 5 ++ docs/content/docs/ops/metrics.md | 7 +- .../apache/flink/runtime/metrics/MetricNames.java | 1 + .../runtime/metrics/groups/TaskIOMetricGroup.java | 91 ++ .../metrics/groups/TaskIOMetricGroupTest.java | 63 +-- .../flink/streaming/runtime/tasks/StreamTask.java | 1 + 6 files changed, 131 insertions(+), 37 deletions(-)
[flink] branch master updated: [FLINK-31035] add warn info to user when NoNodeException happened
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f89be70c2b0 [FLINK-31035] add warn info to user when NoNodeException happend f89be70c2b0 is described below commit f89be70c2b00522ccde0c47f45c2236acdd4ad6d Author: xuyu <11161...@vivo.com> AuthorDate: Tue Jul 11 18:44:56 2023 +0800 [FLINK-31035] add warn info to user when NoNodeException happend --- .../org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index a5bbb8fa1a9..95c81ac7717 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -364,6 +364,9 @@ public class ZooKeeperStateHandleStore return client.getChildren().forPath(path); } catch (KeeperException.NoNodeException ignored) { // Concurrent deletion, retry +LOG.debug( +"Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})", +ignored.getMessage()); } } }
[flink] branch release-1.16 updated (e891bfc156d -> 80ee512f00a)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from e891bfc156d [python][examples] Add Python streaming word count examples add 80ee512f00a [FLINK-31527][tests] Stabilize ChangelogRescalingITCase No new revisions were added by this update. Summary of changes: .../org/apache/flink/test/state/ChangelogRescalingITCase.java | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-)
[flink] branch master updated: [FLINK-31527][tests] Stabilize ChangelogRescalingITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9ba3b1a5863 [FLINK-31527][tests] Stabilize ChangelogRescalingITCase 9ba3b1a5863 is described below commit 9ba3b1a5863c1aeeca0be25b4bb375abfe02b940 Author: Roman Khachatryan AuthorDate: Tue Mar 21 21:08:27 2023 + [FLINK-31527][tests] Stabilize ChangelogRescalingITCase --- .../org/apache/flink/test/state/ChangelogRescalingITCase.java | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java index 6891eafadae..10ba869f6a8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java @@ -329,7 +329,7 @@ public class ChangelogRescalingITCase extends TestLogger { private String checkpointAndCancel(JobID jobID) throws Exception { waitForCheckpoint(jobID, cluster.getMiniCluster(), 1); cluster.getClusterClient().cancel(jobID).get(); -checkStatus(jobID); +waitForSuccessfulTermination(jobID); return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster()) .orElseThrow( () -> { @@ -337,7 +337,13 @@ public class ChangelogRescalingITCase extends TestLogger { }); } -private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException { +private void waitForSuccessfulTermination(JobID jobID) throws Exception { +CommonTestUtils.waitUntilCondition( +() -> +cluster.getClusterClient() +.getJobStatus(jobID) +.get() +.isGloballyTerminalState()); if (cluster.getClusterClient().getJobStatus(jobID).get().isGloballyTerminalState()) { cluster.getClusterClient() .requestJobResult(jobID)
[flink] branch master updated: [FLINK-30023][changelog] increase timeout in ChangelogStorageMetricsTest#testTotalAttemptsPerUpload for improving stability
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ef5b3173c15 [FLINK-30023][changelog] increase timeout in ChangelogStorageMetricsTest#testTotalAttemptsPerUpload for improving stability ef5b3173c15 is described below commit ef5b3173c150fbc3e1394cc6add5de21c13b355d Author: wangfeifan AuthorDate: Wed Nov 16 15:21:34 2022 +0800 [FLINK-30023][changelog] increase timeout in ChangelogStorageMetricsTest#testTotalAttemptsPerUpload for improving stability --- .../org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java index c86276d0ad7..ebf99c338fd 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java @@ -260,8 +260,8 @@ public class ChangelogStorageMetricsTest { @Test void testTotalAttemptsPerUpload() throws Exception { -int numUploads = 20, maxAttempts = 3; -long timeout = 20; +int numUploads = 10, maxAttempts = 3; +long timeout = 50; int numUploadThreads = 4; // must bigger or equal than maxAttempts ChangelogStorageMetricGroup metrics =
[flink] branch master updated: [FLINK-31139][state/changelog] not upload empty state changelog file
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new fb37e6a87d9 [FLINK-31139][state/changelog] not upload empty state changelog file fb37e6a87d9 is described below commit fb37e6a87d92912897c6a8c4b182048e13686dee Author: wangfeifan AuthorDate: Mon Feb 20 20:26:58 2023 +0800 [FLINK-31139][state/changelog] not upload empty state changelog file --- .../flink/changelog/fs/BatchingStateChangeUploadScheduler.java | 5 + 1 file changed, 5 insertions(+) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java index 3495c6bc525..39023bf96d1 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java @@ -223,6 +223,11 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler { scheduledBytesCounter = 0; scheduledFuture = null; } + +if (tasks.size() == 0) { +return; +} + try { Throwable error = getErrorSafe(); if (error != null) {
[flink] branch release-1.15 updated: [FLINK-27169][tests] Increase changelog upload timeout in PartiallyFinishedSourcesITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new ddec8d8e144 [FLINK-27169][tests] Increase changelog upload timeout in PartiallyFinishedSourcesITCase ddec8d8e144 is described below commit ddec8d8e144c9cc9adb0a04f41c9667cdd68aabb Author: Roman Khachatryan AuthorDate: Sat Feb 25 22:43:44 2023 + [FLINK-27169][tests] Increase changelog upload timeout in PartiallyFinishedSourcesITCase --- .../runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java | 5 + 1 file changed, 5 insertions(+) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java index eec2eebcc96..54ef5d6411d 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java @@ -40,6 +40,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -48,6 +49,8 @@ import static java.util.Arrays.asList; import static java.util.stream.StreamSupport.stream; import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart; import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH; +import static org.apache.flink.changelog.fs.FsStateChangelogOptions.RETRY_MAX_ATTEMPTS; +import static org.apache.flink.changelog.fs.FsStateChangelogOptions.UPLOAD_TIMEOUT; import static org.apache.flink.changelog.fs.FsStateChangelogStorageFactory.IDENTIFIER; import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE; @@ -92,6 +95,8 @@ public class PartiallyFinishedSourcesITCase extends TestLogger { // can only be set on the cluster level; so we do it unconditionally here. configuration.setString(STATE_CHANGE_LOG_STORAGE, IDENTIFIER); configuration.setString(BASE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath()); +configuration.set(RETRY_MAX_ATTEMPTS, 10); +configuration.set(UPLOAD_TIMEOUT, Duration.ofMinutes(1)); miniClusterResource = new MiniClusterWithClientResource(
[flink] branch release-1.15 updated: [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new 51660f840cf [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase 51660f840cf is described below commit 51660f840cfc505b9b9cb72530fde7f9f8a4dee2 Author: Roman Khachatryan AuthorDate: Sat Feb 25 21:24:46 2023 + [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase - Only obtain execution exception if the job is in globally terminal state - [hotfix] Unsubscribe finished TestEventSources from test commands. Otherwise, any command with SINGLE_SUBTASK scope might be dispatched to a finished source. This will result in TestJobExecutor.waitForFailover timing out while waiting for the command to be executed and ACKed. - [hotfix] Mark TestEventSource.scheduledCommands volatile - [hotfix] Make sure to process all commands in TestEventSource --- .../operators/lifecycle/TestJobExecutor.java | 25 +++--- .../command/SharedTestCommandDispatcher.java | 5 + .../lifecycle/command/TestCommandDispatcher.java | 2 ++ .../command/TestCommandDispatcherImpl.java | 5 + .../operators/lifecycle/graph/TestEventSource.java | 23 5 files changed, 44 insertions(+), 16 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java index 418a746d40b..1ffa0a06d20 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java @@ -164,22 +164,23 @@ public class TestJobExecutor { } private void handleFailoverTimeout(TimeoutException e) throws Exception { +JobStatus jobStatus = 
miniClusterResource.getClusterClient().getJobStatus(jobID).get(); String message = String.format( "Unable to failover the job: %s; job status: %s", -e.getMessage(), - miniClusterResource.getClusterClient().getJobStatus(jobID).get()); -Optional throwable = -miniClusterResource -.getClusterClient() -.requestJobResult(jobID) -.get() -.getSerializedThrowable(); -if (throwable.isPresent()) { -throw new RuntimeException(message, throwable.get()); -} else { -throw new RuntimeException(message); +e.getMessage(), jobStatus); +if (jobStatus.isGloballyTerminalState()) { +Optional throwable = +miniClusterResource +.getClusterClient() +.requestJobResult(jobID) +.get() +.getSerializedThrowable(); +if (throwable.isPresent()) { +throw new RuntimeException(message, throwable.get()); +} } +throw new RuntimeException(message); } public TestJobExecutor sendBroadcastCommand(TestCommand command, TestCommandScope scope) { diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java index 3ad52515808..d91c96f5cbc 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java @@ -45,4 +45,9 @@ class SharedTestCommandDispatcher implements TestCommandDispatcher { public void broadcast(TestCommand testCommand, TestCommandScope scope) { ref.get().broadcast(testCommand, scope); } + +@Override +public void unsubscribe(String operatorID, CommandExecutor commandExecutor) { +ref.get().unsubscribe(operatorID, commandExecutor); +} } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java index cdf46495102..3af13663b48 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java @@ -33,6 +33,8 @@ public
[flink] branch release-1.17 updated: [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 6e7703738cd [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase 6e7703738cd is described below commit 6e7703738cdefed17277ea86d2c9dc25393eceac Author: Roman Khachatryan AuthorDate: Sat Feb 25 21:24:46 2023 + [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase - Only obtain execution exception if the job is in globally terminal state - [hotfix] Unsubscribe finished TestEventSources from test commands. Otherwise, any command with SINGLE_SUBTASK scope might be dispatched to a finished source. This will result in TestJobExecutor.waitForFailover timing out while waiting for the command to be executed and ACKed. - [hotfix] Mark TestEventSource.scheduledCommands volatile - [hotfix] Make sure to process all commands in TestEventSource --- .../operators/lifecycle/TestJobExecutor.java | 25 +++--- .../command/SharedTestCommandDispatcher.java | 5 + .../lifecycle/command/TestCommandDispatcher.java | 2 ++ .../command/TestCommandDispatcherImpl.java | 5 + .../operators/lifecycle/graph/TestEventSource.java | 23 5 files changed, 44 insertions(+), 16 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java index 9082702c3d1..5df5fc4fdf4 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java @@ -164,22 +164,23 @@ public class TestJobExecutor { } private void handleFailoverTimeout(TimeoutException e) throws Exception { +JobStatus jobStatus = 
miniClusterResource.getClusterClient().getJobStatus(jobID).get(); String message = String.format( "Unable to failover the job: %s; job status: %s", -e.getMessage(), - miniClusterResource.getClusterClient().getJobStatus(jobID).get()); -Optional throwable = -miniClusterResource -.getClusterClient() -.requestJobResult(jobID) -.get() -.getSerializedThrowable(); -if (throwable.isPresent()) { -throw new RuntimeException(message, throwable.get()); -} else { -throw new RuntimeException(message); +e.getMessage(), jobStatus); +if (jobStatus.isGloballyTerminalState()) { +Optional throwable = +miniClusterResource +.getClusterClient() +.requestJobResult(jobID) +.get() +.getSerializedThrowable(); +if (throwable.isPresent()) { +throw new RuntimeException(message, throwable.get()); +} } +throw new RuntimeException(message); } public TestJobExecutor sendBroadcastCommand(TestCommand command, TestCommandScope scope) { diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java index 3ad52515808..d91c96f5cbc 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java @@ -45,4 +45,9 @@ class SharedTestCommandDispatcher implements TestCommandDispatcher { public void broadcast(TestCommand testCommand, TestCommandScope scope) { ref.get().broadcast(testCommand, scope); } + +@Override +public void unsubscribe(String operatorID, CommandExecutor commandExecutor) { +ref.get().unsubscribe(operatorID, commandExecutor); +} } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java index cdf46495102..3af13663b48 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java @@ -33,6 +33,8 @@ public
[flink] branch release-1.16 updated (01c8eb59c1b -> cf04b2c08fa)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from 01c8eb59c1b [FLINK-31288][doc] Update doc ralated to overdraft buffer. add cf04b2c08fa [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase No new revisions were added by this update. Summary of changes: .../operators/lifecycle/TestJobExecutor.java | 25 +++--- .../command/SharedTestCommandDispatcher.java | 5 + .../lifecycle/command/TestCommandDispatcher.java | 2 ++ .../command/TestCommandDispatcherImpl.java | 5 + .../operators/lifecycle/graph/TestEventSource.java | 23 5 files changed, 44 insertions(+), 16 deletions(-)
[flink] branch master updated: [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4aacff572a9 [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase 4aacff572a9 is described below commit 4aacff572a9e3996c5dee9273638831e4040c767 Author: Roman Khachatryan AuthorDate: Sat Feb 25 21:24:46 2023 + [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase - Only obtain execution exception if the job is in globally terminal state - [hotfix] Unsubscribe finished TestEventSources from test commands. Otherwise, any command with SINGLE_SUBTASK scope might be dispatched to a finished source. This will result in TestJobExecutor.waitForFailover timing out while waiting for the command to be executed and ACKed. - [hotfix] Mark TestEventSource.scheduledCommands volatile - [hotfix] Make sure to process all commands in TestEventSource --- .../operators/lifecycle/TestJobExecutor.java | 25 +++--- .../command/SharedTestCommandDispatcher.java | 5 + .../lifecycle/command/TestCommandDispatcher.java | 2 ++ .../command/TestCommandDispatcherImpl.java | 5 + .../operators/lifecycle/graph/TestEventSource.java | 23 5 files changed, 44 insertions(+), 16 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java index 9082702c3d1..5df5fc4fdf4 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java @@ -164,22 +164,23 @@ public class TestJobExecutor { } private void handleFailoverTimeout(TimeoutException e) throws Exception { +JobStatus jobStatus = miniClusterResource.getClusterClient().getJobStatus(jobID).get(); String 
message = String.format( "Unable to failover the job: %s; job status: %s", -e.getMessage(), - miniClusterResource.getClusterClient().getJobStatus(jobID).get()); -Optional throwable = -miniClusterResource -.getClusterClient() -.requestJobResult(jobID) -.get() -.getSerializedThrowable(); -if (throwable.isPresent()) { -throw new RuntimeException(message, throwable.get()); -} else { -throw new RuntimeException(message); +e.getMessage(), jobStatus); +if (jobStatus.isGloballyTerminalState()) { +Optional throwable = +miniClusterResource +.getClusterClient() +.requestJobResult(jobID) +.get() +.getSerializedThrowable(); +if (throwable.isPresent()) { +throw new RuntimeException(message, throwable.get()); +} } +throw new RuntimeException(message); } public TestJobExecutor sendBroadcastCommand(TestCommand command, TestCommandScope scope) { diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java index 3ad52515808..d91c96f5cbc 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java @@ -45,4 +45,9 @@ class SharedTestCommandDispatcher implements TestCommandDispatcher { public void broadcast(TestCommand testCommand, TestCommandScope scope) { ref.get().broadcast(testCommand, scope); } + +@Override +public void unsubscribe(String operatorID, CommandExecutor commandExecutor) { +ref.get().unsubscribe(operatorID, commandExecutor); +} } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java index cdf46495102..3af13663b48 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java @@ -33,6 +33,8 @@ public interface TestCommandDispatch
[flink] 04/04: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e38a6709b57000e38bf044d0e55da3dd3ec3bde8 Author: David Moravek AuthorDate: Fri Jan 21 10:09:53 2022 +0100 [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler --- .../scheduler/adaptive/AdaptiveScheduler.java | 16 +- .../scheduler/adaptive/JobGraphJobInformation.java | 5 + .../adaptive/allocator/DefaultSlotAssigner.java| 77 .../allocator/JobAllocationsInformation.java | 112 .../adaptive/allocator/JobInformation.java | 2 + .../adaptive/allocator/SlotAllocator.java | 4 +- .../{JobInformation.java => SlotAssigner.java} | 36 ++-- .../allocator/SlotSharingSlotAllocator.java| 70 +++ .../allocator/StateLocalitySlotAssigner.java | 201 + .../allocator/SlotSharingSlotAllocatorTest.java| 161 + .../allocator/StateLocalitySlotAssignerTest.java | 166 + .../adaptive/allocator/TestJobInformation.java | 59 ++ .../scheduler/adaptive/allocator/TestSlotInfo.java | 10 +- .../adaptive/allocator/TestVertexInformation.java} | 49 ++--- .../adaptive/allocator/TestingSlotAllocator.java | 31 +--- 15 files changed, 802 insertions(+), 197 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index a876bb65eb2..2446d41c5c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -96,6 +96,7 @@ import org.apache.flink.runtime.scheduler.SchedulerUtils; import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation; import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; @@ -760,12 +761,15 @@ public class AdaptiveScheduler .isPresent(); } -private JobSchedulingPlan determineParallelism(SlotAllocator slotAllocator) +private JobSchedulingPlan determineParallelism( +SlotAllocator slotAllocator, @Nullable ExecutionGraph previousExecutionGraph) throws NoResourceAvailableException { return slotAllocator .determineParallelismAndCalculateAssignment( -jobInformation, declarativeSlotPool.getFreeSlotsInformation()) +jobInformation, +declarativeSlotPool.getFreeSlotsInformation(), + JobAllocationsInformation.fromGraph(previousExecutionGraph)) .orElseThrow( () -> new NoResourceAvailableException( @@ -921,8 +925,7 @@ public class AdaptiveScheduler public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { final CompletableFuture executionGraphWithAvailableResourcesFuture = -createExecutionGraphWithAvailableResourcesAsync(); - + createExecutionGraphWithAvailableResourcesAsync(previousExecutionGraph); transitionToState( new CreatingExecutionGraph.Factory( this, @@ -932,12 +935,13 @@ public class AdaptiveScheduler } private CompletableFuture -createExecutionGraphWithAvailableResourcesAsync() { +createExecutionGraphWithAvailableResourcesAsync( +@Nullable ExecutionGraph previousExecutionGraph) { final JobSchedulingPlan schedulingPlan; final VertexParallelismStore adjustedParallelismStore; try { -schedulingPlan = determineParallelism(slotAllocator); +schedulingPlan = determineParallelism(slotAllocator, previousExecutionGraph); JobGraph adjustedJobGraph = jobInformation.copyJobGraph(); for (JobVertex vertex : adjustedJobGraph.getVertices()) { diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java index 1b0bb60e7e6..2111b6e9c07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
[flink] 03/04: [FLINK-21450][runtime] Restructure types passed between AdaptiveScheduler and SlotAssigner
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 416cb7aaa02c176e01485ff11ab4269f76b5e9e2 Author: Roman Khachatryan AuthorDate: Tue Feb 28 14:22:28 2023 + [FLINK-21450][runtime] Restructure types passed between AdaptiveScheduler and SlotAssigner Slot assignments are computed and consumed by SlotAllocator. This is expressed implicitly by extending VertexParallelism. This change tries to make that clear, while still allowing to assign slots to something other than Slot Sharing Groups. It does so by: 1. Introduce JobSchedulingPlan, computed and consumed by SlotAllocator. It couples VertexParallelism with slot assignments 2. Introduce determineParallelismAndCalculateAssignment method in addition to determineParallelism, specifically for assignments 3. Push the polymorphism of state assignments from VertexParallelism into the JobSchedulingPlan (slot assignment target) --- .../jobgraph/jsonplan/JsonPlanGenerator.java | 21 +--- .../scheduler/adaptive/AdaptiveScheduler.java | 28 +++--- .../scheduler/adaptive/CreatingExecutionGraph.java | 14 ++- .../scheduler/adaptive/JobSchedulingPlan.java | 105 +++ .../adaptive/allocator/SlotAllocator.java | 16 ++- .../allocator/SlotSharingSlotAllocator.java| 111 + .../adaptive/allocator/VertexParallelism.java | 43 ++-- .../VertexParallelismWithSlotSharing.java | 52 -- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 3 +- .../adaptive/CreatingExecutionGraphTest.java | 29 ++ .../allocator/SlotSharingSlotAllocatorTest.java| 62 ++-- .../adaptive/allocator/TestingSlotAllocator.java | 28 +++--- 12 files changed, 276 insertions(+), 236 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java index 8817c72282b..6cdcc77cc53 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java @@ -33,27 +33,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator import org.apache.commons.text.StringEscapeUtils; import java.io.StringWriter; -import java.util.Collections; import java.util.List; -import java.util.Map; @Internal public class JsonPlanGenerator { private static final String NOT_SET = ""; private static final String EMPTY = "{}"; -private static final VertexParallelism EMPTY_VERTEX_PARALLELISM = -new VertexParallelism() { -@Override -public Map getMaxParallelismForVertices() { -return Collections.emptyMap(); -} - -@Override -public int getParallelism(JobVertexID jobVertexId) { -return -1; -} -}; public static String generatePlan(JobGraph jg) { return generatePlan( @@ -61,7 +47,7 @@ public class JsonPlanGenerator { jg.getName(), jg.getJobType(), jg.getVertices(), -EMPTY_VERTEX_PARALLELISM); +VertexParallelism.empty()); } public static String generatePlan( @@ -116,11 +102,12 @@ public class JsonPlanGenerator { // write the core properties JobVertexID vertexID = vertex.getID(); -int storeParallelism = vertexParallelism.getParallelism(vertexID); gen.writeStringField("id", vertexID.toString()); gen.writeNumberField( "parallelism", -storeParallelism != -1 ? 
storeParallelism : vertex.getParallelism()); +vertexParallelism +.getParallelismOptional(vertexID) +.orElse(vertex.getParallelism())); gen.writeStringField("operator", operator); gen.writeStringField("operator_strategy", operatorDescr); gen.writeStringField("description", description); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 0bdbb44da7c..a876bb65eb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -760,11
[flink] branch master updated (18c73f1488b -> e38a6709b57)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 18c73f1488b [docs][hotfix] correct label in config configuration new d718342ad03 [hotfix][tests] Make LocalRecoveryITCase fail when allocations don't match new 676e8e5528f [FLINK-21450][runtime] Add previous ExecutionGraph to WaitingForResources AdaptiveScheduler state new 416cb7aaa02 [FLINK-21450][runtime] Restructure types passed between AdaptiveScheduler and SlotAssigner new e38a6709b57 [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../jobgraph/jsonplan/JsonPlanGenerator.java | 21 +- .../scheduler/adaptive/AdaptiveScheduler.java | 50 ++--- .../flink/runtime/scheduler/adaptive/Created.java | 2 +- .../scheduler/adaptive/CreatingExecutionGraph.java | 31 ++- .../scheduler/adaptive/JobGraphJobInformation.java | 5 + .../scheduler/adaptive/JobSchedulingPlan.java | 105 ++ .../runtime/scheduler/adaptive/Restarting.java | 5 +- .../scheduler/adaptive/StateTransitions.java | 6 +- .../scheduler/adaptive/WaitingForResources.java| 28 ++- .../adaptive/allocator/DefaultSlotAssigner.java| 77 .../allocator/JobAllocationsInformation.java | 112 +++ .../adaptive/allocator/JobInformation.java | 2 + .../adaptive/allocator/SlotAllocator.java | 18 +- .../{VertexParallelism.java => SlotAssigner.java} | 23 ++- .../allocator/SlotSharingSlotAllocator.java| 123 +--- .../allocator/StateLocalitySlotAssigner.java | 201 +++ .../adaptive/allocator/VertexParallelism.java | 43 +++- .../VertexParallelismWithSlotSharing.java | 52 - .../scheduler/adaptive/AdaptiveSchedulerTest.java | 3 +- .../runtime/scheduler/adaptive/CreatedTest.java| 3 +- 
.../adaptive/CreatingExecutionGraphTest.java | 52 +++-- .../runtime/scheduler/adaptive/RestartingTest.java | 2 +- .../adaptive/WaitingForResourcesTest.java | 9 +- .../allocator/SlotSharingSlotAllocatorTest.java| 219 +++-- .../allocator/StateLocalitySlotAssignerTest.java | 166 .../adaptive/allocator/TestJobInformation.java | 59 ++ .../scheduler/adaptive/allocator/TestSlotInfo.java | 10 +- .../adaptive/allocator/TestVertexInformation.java} | 49 +++-- .../adaptive/allocator/TestingSlotAllocator.java | 49 ++--- .../flink/test/recovery/LocalRecoveryITCase.java | 28 ++- 30 files changed, 1135 insertions(+), 418 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java copy flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/{VertexParallelism.java => SlotAssigner.java} (62%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java copy flink-runtime/src/{main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java => test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java} (53%)
[flink] 02/04: [FLINK-21450][runtime] Add previous ExecutionGraph to WaitingForResources AdaptiveScheduler state
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 676e8e5528f99eb8ba5747f7489b0f02ee025dd6 Author: Roman Khachatryan AuthorDate: Mon Feb 20 23:08:47 2023 + [FLINK-21450][runtime] Add previous ExecutionGraph to WaitingForResources AdaptiveScheduler state Previous ExecutionGraph will be used in a subsequent commit to allocate workloads more optimally by taking previous allocations into account. --- .../scheduler/adaptive/AdaptiveScheduler.java | 12 ++ .../flink/runtime/scheduler/adaptive/Created.java | 2 +- .../scheduler/adaptive/CreatingExecutionGraph.java | 17 + .../runtime/scheduler/adaptive/Restarting.java | 5 +++- .../scheduler/adaptive/StateTransitions.java | 6 +++-- .../scheduler/adaptive/WaitingForResources.java| 28 +- .../runtime/scheduler/adaptive/CreatedTest.java| 3 ++- .../adaptive/CreatingExecutionGraphTest.java | 23 +++--- .../runtime/scheduler/adaptive/RestartingTest.java | 2 +- .../adaptive/WaitingForResourcesTest.java | 9 --- 10 files changed, 76 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 4f7b559e86a..0bdbb44da7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -784,7 +784,7 @@ public class AdaptiveScheduler } @Override -public void goToWaitingForResources() { +public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { final ResourceCounter desiredResources = calculateDesiredResources(); declarativeSlotPool.setResourceRequirements(desiredResources); @@ -794,7 +794,8 @@ public class AdaptiveScheduler LOG, desiredResources, 
this.initialResourceAllocationTimeout, -this.resourceStabilizationTimeout)); +this.resourceStabilizationTimeout, +previousExecutionGraph)); } private ResourceCounter calculateDesiredResources() { @@ -916,14 +917,17 @@ public class AdaptiveScheduler } @Override -public void goToCreatingExecutionGraph() { +public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { final CompletableFuture executionGraphWithAvailableResourcesFuture = createExecutionGraphWithAvailableResourcesAsync(); transitionToState( new CreatingExecutionGraph.Factory( -this, executionGraphWithAvailableResourcesFuture, LOG)); +this, +executionGraphWithAvailableResourcesFuture, +LOG, +previousExecutionGraph)); } private CompletableFuture diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java index 1dd9f1e8f16..798eeda352e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java @@ -69,7 +69,7 @@ class Created implements State { /** Starts the scheduling by going into the {@link WaitingForResources} state. */ void startScheduling() { -context.goToWaitingForResources(); +context.goToWaitingForResources(null); } /** Context of the {@link Created} state. 
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java index 0ec7437bb85..c87e58b2971 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -61,12 +61,15 @@ public class CreatingExecutionGraph implements State { private final Logger logger; private final OperatorCoordinatorHandlerFactory operatorCoordinatorHandlerFactory; +private final @Nullable ExecutionGraph previousExecutionGraph; + public CreatingExecutionGraph( Context context, CompletableFuture executionGraphWithParallelismFuture, Logger logger, -OperatorCoordinatorHandlerFactory
[flink] 01/04: [hotfix][tests] Make LocalRecoveryITCase fail when allocations don't match
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit d718342ad0311ea7481d1a1bd87c395aeff25928 Author: Roman Khachatryan AuthorDate: Mon Feb 20 20:30:19 2023 + [hotfix][tests] Make LocalRecoveryITCase fail when allocations don't match Currently, wrong allocation fails the task causing a restart, which eventually allows to fix the allocation by picking the right TM. This prevents the test from failure and hides the wrong allocation. --- .../flink/test/recovery/LocalRecoveryITCase.java | 28 -- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java index 1639cda95b9..c8e505c9c9f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.test.recovery; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.ListAccumulator; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.configuration.CheckpointingOptions; @@ -67,12 +68,14 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests local recovery by restarting Flink processes. 
*/ @ExtendWith(TestLoggerExtension.class) class LocalRecoveryITCase { +private static final String ALLOCATION_FAILURES_ACCUMULATOR_NAME = "acc"; + @TempDir private File tmpDirectory; @Test @@ -108,7 +111,12 @@ class LocalRecoveryITCase { restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1); -jobClient.getJobExecutionResult().get(waitingTimeInSeconds, TimeUnit.SECONDS); +List allocFailures = +jobClient +.getJobExecutionResult() +.get(waitingTimeInSeconds, TimeUnit.SECONDS) + .getAccumulatorResult(ALLOCATION_FAILURES_ACCUMULATOR_NAME); +assertTrue(allocFailures.isEmpty(), allocFailures.toString()); success = true; } finally { @@ -307,11 +315,17 @@ class LocalRecoveryITCase { new IllegalStateException( "Could not find corresponding TaskNameAllocationID information.")); -assertThat(myTaskNameAllocationId.getAllocationId()) -.withFailMessage( -"The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.", -allocationId, myTaskNameAllocationId.getAllocationId()) -.isEqualTo(allocationId); +runtimeContext.addAccumulator( +ALLOCATION_FAILURES_ACCUMULATOR_NAME, new ListAccumulator()); +if (!allocationId.equals(myTaskNameAllocationId.getAllocationId())) { +runtimeContext + .getAccumulator(ALLOCATION_FAILURES_ACCUMULATOR_NAME) +.add( +String.format( +"The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.", +allocationId, + myTaskNameAllocationId.getAllocationId())); +} // terminate running = false; }
[flink] branch release-1.16 updated (f0e0069a741 -> 700f8839126)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from f0e0069a741 [FLINK-31031][python] Disable the output buffer of Python process add 700f8839126 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry No new revisions were added by this update. Summary of changes: .../fs/AbstractStateChangeFsUploader.java | 19 +-- .../flink/changelog/fs/FsStateChangelogWriter.java | 8 +- .../flink/changelog/fs/TaskChangelogRegistry.java | 28 ++--- .../changelog/fs/TaskChangelogRegistryImpl.java| 46 --- .../fs/DiscardRecordableStateChangeUploader.java | 73 +++ .../changelog/fs/FsStateChangelogWriterTest.java | 140 + .../fs/TaskChangelogRegistryImplTest.java | 22 ++-- .../fs/TestingBatchingUploadScheduler.java | 41 +++--- .../state/changelog/ChangelogStateDiscardTest.java | 39 +++--- 9 files changed, 322 insertions(+), 94 deletions(-) create mode 100644 flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java copy flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java => flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingBatchingUploadScheduler.java (53%)
[flink] branch release-1.17 updated: [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new c612575ed33 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry c612575ed33 is described below commit c612575ed339b319d9822dd7cbc59e3d972fe5ed Author: wangfeifan AuthorDate: Wed Feb 1 00:04:50 2023 +0800 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com> --- .../fs/AbstractStateChangeFsUploader.java | 19 +-- .../flink/changelog/fs/FsStateChangelogWriter.java | 8 +- .../flink/changelog/fs/TaskChangelogRegistry.java | 28 ++--- .../changelog/fs/TaskChangelogRegistryImpl.java| 46 --- .../fs/DiscardRecordableStateChangeUploader.java | 73 +++ .../changelog/fs/FsStateChangelogWriterTest.java | 140 + .../fs/TaskChangelogRegistryImplTest.java | 22 ++-- .../fs/TestingBatchingUploadScheduler.java | 52 .../state/changelog/ChangelogStateDiscardTest.java | 39 +++--- 9 files changed, 352 insertions(+), 75 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java index 3dd3dfcc70f..55bdfeac179 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.function.BiFunction; -import java.util.stream.Collectors; /** Base implementation of StateChangeUploader. 
*/ public abstract class AbstractStateChangeFsUploader implements StateChangeUploader { @@ -80,22 +79,16 @@ public abstract class AbstractStateChangeFsUploader implements StateChangeUpload for (UploadTask task : tasks) { tasksOffsets.put(task, format.write(stream, task.changeSets)); } + +long numOfChangeSets = tasks.stream().flatMap(t -> t.getChangeSets().stream()).count(); + StreamStateHandle handle = stream.getHandle(handleFactory); -changelogRegistry.startTracking( -handle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(handle, numOfChangeSets); + if (stream instanceof DuplicatingOutputStreamWithPos) { StreamStateHandle localHandle = ((DuplicatingOutputStreamWithPos) stream).getSecondaryHandle(handleFactory); -changelogRegistry.startTracking( -localHandle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(localHandle, numOfChangeSets); return new UploadTasksResult(tasksOffsets, handle, localHandle); } // WARN: streams have to be closed before returning the results diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index f72c50a9d88..83a4b2d5861 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -288,9 +288,9 @@ class FsStateChangelogWriter implements StateChangelogWriter notUsedState) { LOG.trace("Uploaded state to discard: {}", notUsedState); for (UploadResult result : notUsedState.values()) { -changelogRegistry.notUsed(result.streamStateHandle, logId); +changelogRegistry.release(result.streamStateHandle); if 
(result.localStreamHandle != null) { -changelogRegistry.notUsed(result.localStreamHandle, logId); +changelogRegistry.release(result.localStreamHandle); } } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/TaskChangelogRegistry.java b/flink-dstl/flink-dstl-dfs/src/m
[flink] branch master updated: [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c0aa73df4df [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry c0aa73df4df is described below commit c0aa73df4df4e39c138f2cddaeb8efad6c831d03 Author: wangfeifan AuthorDate: Wed Feb 1 00:04:50 2023 +0800 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com> --- .../fs/AbstractStateChangeFsUploader.java | 19 +-- .../flink/changelog/fs/FsStateChangelogWriter.java | 8 +- .../flink/changelog/fs/TaskChangelogRegistry.java | 28 ++--- .../changelog/fs/TaskChangelogRegistryImpl.java| 46 --- .../fs/DiscardRecordableStateChangeUploader.java | 73 +++ .../changelog/fs/FsStateChangelogWriterTest.java | 140 + .../fs/TaskChangelogRegistryImplTest.java | 22 ++-- .../fs/TestingBatchingUploadScheduler.java | 52 .../state/changelog/ChangelogStateDiscardTest.java | 39 +++--- 9 files changed, 352 insertions(+), 75 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java index 3dd3dfcc70f..55bdfeac179 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.function.BiFunction; -import java.util.stream.Collectors; /** Base implementation of StateChangeUploader. 
*/ public abstract class AbstractStateChangeFsUploader implements StateChangeUploader { @@ -80,22 +79,16 @@ public abstract class AbstractStateChangeFsUploader implements StateChangeUpload for (UploadTask task : tasks) { tasksOffsets.put(task, format.write(stream, task.changeSets)); } + +long numOfChangeSets = tasks.stream().flatMap(t -> t.getChangeSets().stream()).count(); + StreamStateHandle handle = stream.getHandle(handleFactory); -changelogRegistry.startTracking( -handle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(handle, numOfChangeSets); + if (stream instanceof DuplicatingOutputStreamWithPos) { StreamStateHandle localHandle = ((DuplicatingOutputStreamWithPos) stream).getSecondaryHandle(handleFactory); -changelogRegistry.startTracking( -localHandle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(localHandle, numOfChangeSets); return new UploadTasksResult(tasksOffsets, handle, localHandle); } // WARN: streams have to be closed before returning the results diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index f72c50a9d88..83a4b2d5861 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -288,9 +288,9 @@ class FsStateChangelogWriter implements StateChangelogWriter notUsedState) { LOG.trace("Uploaded state to discard: {}", notUsedState); for (UploadResult result : notUsedState.values()) { -changelogRegistry.notUsed(result.streamStateHandle, logId); +changelogRegistry.release(result.streamStateHandle); if 
(result.localStreamHandle != null) { -changelogRegistry.notUsed(result.localStreamHandle, logId); +changelogRegistry.release(result.localStreamHandle); } } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/TaskChangelogRegistry.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/
[flink] branch release-1.17 updated: [FLINK-30561][state/changelog] fix changelog local cache file not found
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 0d14c618825 [FLINK-30561][state/changelog] fix changelog local cache file not found 0d14c618825 is described below commit 0d14c6188252fadc1408034d73f232312c2f683f Author: wangfeifan AuthorDate: Mon Jan 16 11:15:39 2023 +0800 [FLINK-30561][state/changelog] fix changelog local cache file not found --- flink-dstl/flink-dstl-dfs/pom.xml | 8 ++ .../fs/ChangelogStreamHandleReaderWithCache.java | 35 --- .../ChangelogStreamHandleReaderWithCacheTest.java | 115 + 3 files changed, 143 insertions(+), 15 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/pom.xml b/flink-dstl/flink-dstl-dfs/pom.xml index b6b2a739c7d..ffad17f99ec 100644 --- a/flink-dstl/flink-dstl-dfs/pom.xml +++ b/flink-dstl/flink-dstl-dfs/pom.xml @@ -58,6 +58,14 @@ under the License. 
+ + org.apache.flink + flink-core + ${project.version} + test + test-jar + + org.apache.flink flink-runtime diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java index 1501cd7482f..70f2f66ef35 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java @@ -161,23 +161,28 @@ class ChangelogStreamHandleReaderWithCache implements ChangelogStreamHandleReade private DataInputStream wrapStream(Path dfsPath, FileInputStream fin) { return new DataInputStream(new BufferedInputStream(fin)) { +private boolean closed = false; + @Override public void close() throws IOException { -try { -super.close(); -} finally { -cache.computeIfPresent( -dfsPath, -(key, value) -> { -value.release(); -if (value.getReferenceCounter() == NO_USING_REF_COUNT) { -cacheCleanScheduler.schedule( -() -> cleanCacheFile(dfsPath), -cacheIdleMillis, -TimeUnit.MILLISECONDS); -} -return value; -}); +if (!closed) { +closed = true; +try { +super.close(); +} finally { +cache.computeIfPresent( +dfsPath, +(key, value) -> { +value.release(); +if (value.getReferenceCounter() == NO_USING_REF_COUNT) { +cacheCleanScheduler.schedule( +() -> cleanCacheFile(dfsPath), +cacheIdleMillis, +TimeUnit.MILLISECONDS); +} +return value; +}); +} } } }; diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java new file mode 100644 index 000..76a9c891f1b --- /dev/null +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java 
@@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law o
[flink] branch master updated: [FLINK-30561][state/changelog] fix changelog local cache file not found
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ba2b55df207 [FLINK-30561][state/changelog] fix changelog local cache file not found ba2b55df207 is described below commit ba2b55df207fb79ad776eaf64ec8a6c1ab27bac9 Author: wangfeifan AuthorDate: Mon Jan 16 11:15:39 2023 +0800 [FLINK-30561][state/changelog] fix changelog local cache file not found --- flink-dstl/flink-dstl-dfs/pom.xml | 8 ++ .../fs/ChangelogStreamHandleReaderWithCache.java | 35 --- .../ChangelogStreamHandleReaderWithCacheTest.java | 115 + 3 files changed, 143 insertions(+), 15 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/pom.xml b/flink-dstl/flink-dstl-dfs/pom.xml index 2854687103e..2e07089ceb2 100644 --- a/flink-dstl/flink-dstl-dfs/pom.xml +++ b/flink-dstl/flink-dstl-dfs/pom.xml @@ -58,6 +58,14 @@ under the License. + + org.apache.flink + flink-core + ${project.version} + test + test-jar + + org.apache.flink flink-runtime diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java index 1501cd7482f..70f2f66ef35 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java @@ -161,23 +161,28 @@ class ChangelogStreamHandleReaderWithCache implements ChangelogStreamHandleReade private DataInputStream wrapStream(Path dfsPath, FileInputStream fin) { return new DataInputStream(new BufferedInputStream(fin)) { +private boolean closed = false; + @Override public void close() throws IOException { -try { -super.close(); -} finally { -cache.computeIfPresent( 
-dfsPath, -(key, value) -> { -value.release(); -if (value.getReferenceCounter() == NO_USING_REF_COUNT) { -cacheCleanScheduler.schedule( -() -> cleanCacheFile(dfsPath), -cacheIdleMillis, -TimeUnit.MILLISECONDS); -} -return value; -}); +if (!closed) { +closed = true; +try { +super.close(); +} finally { +cache.computeIfPresent( +dfsPath, +(key, value) -> { +value.release(); +if (value.getReferenceCounter() == NO_USING_REF_COUNT) { +cacheCleanScheduler.schedule( +() -> cleanCacheFile(dfsPath), +cacheIdleMillis, +TimeUnit.MILLISECONDS); +} +return value; +}); +} } } }; diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java new file mode 100644 index 000..76a9c891f1b --- /dev/null +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed t
[flink] branch release-1.17 updated: [FLINK-30785][tests] Ignore completeExceptionally in e2e test logs
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 05e687cdea8 [FLINK-30785][tests] Ignore completeExceptionally in e2e test logs 05e687cdea8 is described below commit 05e687cdea804b74534b991057b40b100aefefb3 Author: Roman Khachatryan AuthorDate: Mon Jan 30 15:53:47 2023 + [FLINK-30785][tests] Ignore completeExceptionally in e2e test logs --- flink-end-to-end-tests/test-scripts/common.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 047a05a7bb4..44f0ed4c5df 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -428,6 +428,7 @@ function internal_check_logs_for_exceptions { "org.apache.flink.runtime.JobException: Recovery is suppressed" \ "WARN akka.remote.ReliableDeliverySupervisor" \ "RecipientUnreachableException" \ + "completeExceptionally" \ "SerializedCheckpointException.unwrap") local all_allowed_exceptions=("${default_allowed_exceptions[@]}" "${additional_allowed_exceptions[@]}")
[flink] branch master updated (d9102ddd755 -> d7bbb763ded)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from d9102ddd755 [FLINK-29237][table] Remove RexSimplify from Flink code, SearchOperator code generation for RexUnknown.nullAs add d7bbb763ded [FLINK-30785][tests] Ignore completeExceptionally in e2e test logs No new revisions were added by this update. Summary of changes: flink-end-to-end-tests/test-scripts/common.sh | 1 + 1 file changed, 1 insertion(+)
[flink] branch master updated (b1e70aebd3e -> 7b656441617)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from b1e70aebd3e [FLINK-30846][runtime] Fix getSchedulerType for testing with flink.tests.enable-adaptive-scheduler new c5d13f8c7c2 [refactor][state / rocksdb] Introduce RocksDBMemory factory interface to facilitate testing new 7b656441617 [FLINK-30328][tests] Use RocksDBMemory factory interface in TaskManagerWideRocksDbMemorySharingITCase The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../state/EmbeddedRocksDBStateBackend.java | 14 +- .../state/RocksDBMemoryControllerUtils.java| 37 +++- .../streaming/state/RocksDBOperationUtils.java | 6 +- .../state/RocksDBSharedResourcesFactory.java | 12 +- .../state/RocksDBMemoryControllerUtilsTest.java| 77 +++-- .../state/RocksDBSharedResourcesFactoryTest.java | 3 +- .../TaskManagerWideRocksDbMemorySharingITCase.java | 191 +++-- 7 files changed, 146 insertions(+), 194 deletions(-)
[flink] 01/02: [refactor][state / rocksdb] Introduce RocksDBMemory factory interface to facilitate testing
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c5d13f8c7c20ec4b69a824c2b1a19ce578b80add Author: Roman Khachatryan AuthorDate: Sun Jan 29 14:19:57 2023 + [refactor][state / rocksdb] Introduce RocksDBMemory factory interface to facilitate testing --- .../state/EmbeddedRocksDBStateBackend.java | 14 +++- .../state/RocksDBMemoryControllerUtils.java| 37 +-- .../streaming/state/RocksDBOperationUtils.java | 6 +- .../state/RocksDBSharedResourcesFactory.java | 12 ++-- .../state/RocksDBMemoryControllerUtilsTest.java| 77 +++--- .../state/RocksDBSharedResourcesFactoryTest.java | 3 +- 6 files changed, 83 insertions(+), 66 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index eb2f3f48cc8..e65d925f453 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.DescribedEnum; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.description.InlineElement; +import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.RocksDBMemoryFactory; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; @@ -175,6 +176,9 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke * key-group range. 
*/ private double overlapFractionThreshold; + +/** Factory for Write Buffer Manager and Block Cache. */ +private RocksDBMemoryFactory rocksDBMemoryFactory; // /** Creates a new {@code EmbeddedRocksDBStateBackend} for storing local state. */ @@ -203,6 +207,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke this.memoryConfiguration = new RocksDBMemoryConfiguration(); this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE; this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; +this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT; } /** @@ -298,6 +303,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke checkArgument( overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1, "Overlap fraction threshold of restoring should be between 0 and 1"); + +this.rocksDBMemoryFactory = original.rocksDBMemoryFactory; } // @@ -460,7 +467,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke final OpaqueMemoryResource sharedResources = RocksDBOperationUtils.allocateSharedCachesIfConfigured( -memoryConfiguration, env, managedMemoryFraction, LOG); +memoryConfiguration, env, managedMemoryFraction, LOG, rocksDBMemoryFactory); if (sharedResources != null) { LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize()); } @@ -834,6 +841,11 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke this.writeBatchSize = writeBatchSize; } +/** Set RocksDBMemoryFactory. */ +public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory) { +this.rocksDBMemoryFactory = checkNotNull(rocksDBMemoryFactory); +} + double getOverlapFractionThreshold() { return overlapFractionThreshold == UNDEFINED_OVERLAP_FRACTION_THRESHOLD ? 
RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue() diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java index 2ea022137b8..0cd4250ff3f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/
[flink] 02/02: [FLINK-30328][tests] Use RocksDBMemory factory interface in TaskManagerWideRocksDbMemorySharingITCase
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7b65644161767e90f0834db76ba116d4d92449d5 Author: Roman Khachatryan AuthorDate: Sun Jan 29 14:20:51 2023 + [FLINK-30328][tests] Use RocksDBMemory factory interface in TaskManagerWideRocksDbMemorySharingITCase --- .../TaskManagerWideRocksDbMemorySharingITCase.java | 191 +++-- 1 file changed, 63 insertions(+), 128 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java index e633bc005ad..bbc4da21655 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java @@ -21,76 +21,57 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.StateBackendOptions; -import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.RocksDBMemoryFactory; import org.apache.flink.contrib.streaming.state.RocksDBOptions; -import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics; -import org.apache.flink.runtime.testutils.InMemoryReporter; import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.TestLogger; -import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.rocksdb.Cache; +import org.rocksdb.WriteBufferManager; -import java.math.BigInteger; -import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Random; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.concurrent.CopyOnWriteArrayList; import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart; -import static org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.calculateActualCacheCapacity; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; -import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * Tests that {@link RocksDBOptions#FIX_PER_TM_MEMORY_SIZE} works as expected, i.e. make RocksDB use * the same BlockCache and WriteBufferManager objects. It does so using RocksDB metrics. 
*/ -public class TaskManagerWideRocksDbMemorySharingITCase { +public class TaskManagerWideRocksDbMemorySharingITCase extends TestLogger { private static final int PARALLELISM = 4; private static final int NUMBER_OF_JOBS = 5; private static final int NUMBER_OF_TASKS = NUMBER_OF_JOBS * PARALLELISM; - private static final MemorySize SHARED_MEMORY = MemorySize.ofMebiBytes(NUMBER_OF_TASKS * 25); -private static final double WRITE_BUFFER_RATIO = 0.5; -private static final double EXPECTED_BLOCK_CACHE_SIZE = -calculateActualCacheCapacity(SHARED_MEMORY.getBytes(), WRITE_BUFFER_RATIO); -// try to check that the memory usage is limited -// however, there is no hard limit actually -// because of https://issues.apache.org/jira/browse/FLINK-15532 -private static final double EFFECTIVE_LIMIT = EXPECTED_BLOCK_CACHE_SIZE * 1.5; - -private static final int NUM_MEASUREMENTS = 100; - -private InMemoryReporter metricsReporter; private MiniClusterWithClientResource cluster; +@Rule public final SharedObjects sharedObjects = SharedObjects.create(); + @Before public void init() throws
[flink] branch master updated (c9e87fe410c -> abb060aa775)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from c9e87fe410c [FLINK-30692][sql-client] Introduce SingleSessionManager to support REMOVE JAR syntax (#21771) add abb060aa775 [FLINK-30328][tests] Temporarily ignore unstable TaskManagerWideRocksDbMemorySharingITCase No new revisions were added by this update. Summary of changes: .../flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java | 2 ++ 1 file changed, 2 insertions(+)
[flink] branch master updated (f8105bb584f -> b4fe4a42887)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from f8105bb584f [FLINK-30328][tests] Fix unstable TaskManagerWideRocksDbMemorySharingITCase add b4fe4a42887 [FLINK-30301][tests] Use NoOp HeartbeatServices in TaskExecutorTest No new revisions were added by this update. Summary of changes: .../configuration/HeartbeatManagerOptions.java | 6 +- ...nitorImpl.java => DefaultHeartbeatMonitor.java} | 15 +++-- .../runtime/heartbeat/HeartbeatManagerImpl.java| 4 +- .../heartbeat/HeartbeatManagerSenderImpl.java | 5 +- .../flink/runtime/heartbeat/HeartbeatServices.java | 66 ...eatServices.java => HeartbeatServicesImpl.java} | 71 +- .../runtime/heartbeat/NoOpHeartbeatServices.java | 52 .../runtime/dispatcher/AbstractDispatcherTest.java | 3 +- .../runtime/dispatcher/MiniDispatcherTest.java | 3 +- .../runtime/dispatcher/TestingDispatcher.java | 3 +- .../RecordingHeartbeatServices.java| 39 +--- .../heartbeat/TestingHeartbeatServices.java| 22 --- ...asterExecutionDeploymentReconciliationTest.java | 3 +- .../jobmaster/JobMasterPartitionReleaseTest.java | 3 +- .../flink/runtime/jobmaster/JobMasterTest.java | 13 ++-- .../runtime/jobmaster/utils/JobMasterBuilder.java | 3 +- .../resourcemanager/ResourceManagerTest.java | 9 ++- .../runtime/taskexecutor/TaskExecutorBuilder.java | 3 +- ...cutorExecutionDeploymentReconciliationTest.java | 4 +- .../TaskExecutorPartitionLifecycleTest.java| 4 +- .../runtime/taskexecutor/TaskExecutorTest.java | 30 ++--- .../TaskSubmissionTestEnvironment.java | 3 +- .../recovery/ProcessFailureCancelingITCase.java| 4 +- 23 files changed, 197 insertions(+), 171 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/{HeartbeatMonitorImpl.java => DefaultHeartbeatMonitor.java} (92%) copy flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/{HeartbeatServices.java => HeartbeatServicesImpl.java} (50%) create mode 
100644 flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/NoOpHeartbeatServices.java rename flink-runtime/src/test/java/org/apache/flink/runtime/{taskexecutor => heartbeat}/RecordingHeartbeatServices.java (77%)
[flink] branch master updated (f38c2370e85 -> f8105bb584f)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from f38c2370e85 [FLINK-30278][streaming] Unset parallelism in the SinkTransformationTranslator if it wasn't set before add f8105bb584f [FLINK-30328][tests] Fix unstable TaskManagerWideRocksDbMemorySharingITCase No new revisions were added by this update. Summary of changes: .../DescriptiveStatisticsHistogramStatistics.java | 6 ++- .../TaskManagerWideRocksDbMemorySharingITCase.java | 49 ++ 2 files changed, 36 insertions(+), 19 deletions(-)
[flink] branch master updated (6b68bc6ccc1 -> c7a8205a221)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 6b68bc6ccc1 [FLINK-30275][tests] Fix off-by-one error in TaskExecutorTest.testSharedResourcesLifecycle add c7a8205a221 [FLINK-30275][tests] Fix off-by-one error in TaskExecutorTest.testSharedResourcesLifecycle - fixup No new revisions were added by this update. Summary of changes: .../java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch master updated (fb27e689350 -> 6b68bc6ccc1)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from fb27e689350 [FLINK-30240][table]Fix doc error reference to TableKind#MANAGED add 6b68bc6ccc1 [FLINK-30275][tests] Fix off-by-one error in TaskExecutorTest.testSharedResourcesLifecycle No new revisions were added by this update. Summary of changes: .../java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated: [FLINK-29928][runtime, state] Share RocksDB memory across TM slots
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3b6d08e57f6 [FLINK-29928][runtime, state] Share RocksDB memory across TM slots 3b6d08e57f6 is described below commit 3b6d08e57f644cddcdac1fb5a110d44172652c3a Author: Roman Khachatryan AuthorDate: Fri Nov 18 11:06:19 2022 +0100 [FLINK-29928][runtime, state] Share RocksDB memory across TM slots --- docs/content.zh/docs/ops/state/state_backends.md | 2 +- docs/content/docs/ops/state/state_backends.md | 2 +- .../generated/rocksdb_configuration.html | 8 +- .../generated/state_backend_rocksdb_section.html | 8 +- .../state/api/runtime/SavepointEnvironment.java| 9 + .../flink/runtime/execution/Environment.java | 4 + .../flink/runtime/memory/SharedResources.java | 8 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 7 + .../runtime/taskexecutor/TaskManagerServices.java | 13 +- .../runtime/taskmanager/RuntimeEnvironment.java| 9 + .../org/apache/flink/runtime/taskmanager/Task.java | 7 + .../operators/testutils/DummyEnvironment.java | 6 + .../operators/testutils/MockEnvironment.java | 9 + .../runtime/taskexecutor/TaskExecutorTest.java | 147 +++- .../taskexecutor/TaskManagerServicesBuilder.java | 6 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 2 + .../flink/runtime/taskmanager/TestTaskBuilder.java | 2 + .../flink/runtime/testutils/InMemoryReporter.java | 23 ++ .../runtime/util/JvmExitOnFatalErrorTest.java | 2 + .../state/EmbeddedRocksDBStateBackend.java | 2 +- .../state/RocksDBMemoryConfiguration.java | 5 + .../state/RocksDBMemoryControllerUtils.java| 11 +- .../streaming/state/RocksDBOperationUtils.java | 46 +--- .../contrib/streaming/state/RocksDBOptions.java| 21 +- .../streaming/state/RocksDBSharedResources.java| 4 + .../state/RocksDBSharedResourcesFactory.java | 183 +++ .../state/RocksDBSharedResourcesFactoryTest.java | 162 ++ 
.../tasks/InterruptSensitiveRestoreTest.java | 2 + .../runtime/tasks/StreamMockEnvironment.java | 9 + .../runtime/tasks/StreamTaskSystemExitTest.java| 2 + .../runtime/tasks/StreamTaskTerminationTest.java | 2 + .../runtime/tasks/SynchronousCheckpointITCase.java | 2 + .../tasks/TaskCheckpointingBehaviourTest.java | 2 + .../TaskManagerWideRocksDbMemorySharingITCase.java | 249 + 34 files changed, 913 insertions(+), 63 deletions(-) diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md index a987992e472..a78339228a6 100644 --- a/docs/content.zh/docs/ops/state/state_backends.md +++ b/docs/content.zh/docs/ops/state/state_backends.md @@ -235,7 +235,7 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*( 注意 上述机制开启时将覆盖用户在 [`PredefinedOptions`](#predefined-per-columnfamily-options) 和 [`RocksDBOptionsFactory`](#passing-options-factory-to-rocksdb) 中对 block cache 和 write buffer 进行的配置。 注意 *仅面向专业用户*:若要手动控制内存,可以将 `state.backend.rocksdb.memory.managed` 设置为 `false`,并通过 [`ColumnFamilyOptions`](#passing-options-factory-to-rocksdb) 配置 RocksDB。 -或者可以复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过 `state.backend.rocksdb.memory.fixed-per-slot` 选项)。 +或者可以复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过 `state.backend.rocksdb.memory.fixed-per-slot`/`state.backend.rocksdb.memory.fixed-per-tm` 选项)。 注意在这两种情况下,用户都需要确保在 JVM 之外有足够的内存可供 RocksDB 使用。 diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md index b4b2ac2eec3..28c7b02fbe6 100644 --- a/docs/content/docs/ops/state/state_backends.md +++ b/docs/content/docs/ops/state/state_backends.md @@ -226,7 +226,7 @@ When the above described mechanism (`cache` and `write buffer manager`) is enabl {{< /hint >}} {{< details "Expert Mode" >}} -To control memory manually, you can set `state.backend.rocksdb.memory.managed` to `false` and configure RocksDB via [`ColumnFamilyOptions`](#passing-options-factory-to-rocksdb). 
Alternatively, you can use the above mentioned cache/buffer-manager mechanism, but set the memory size to a fixed amount independent of Flink's managed memory size (`state.backend.rocksdb.memory.fixed-per-slot` option). Note that in both cases, users need to ensure on their own that enough memory is available out [...] +To control memory manually, you can set `state.backend.rocksdb.memory.managed` to `false` and configure RocksDB via [`ColumnFamilyOptions`](#passing-options-factory-to-rocksdb). Alternatively, you can use the above mentioned cache/buffer-manager mechanism, but set the memory size to a fixed amount independent of Flink
[flink] 01/03: [hotfix][tests] Use negative changelog materialization interval in tests
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git commit 258c3e35265bb3a966bd317340f2a5fe7cfd7364 Author: Roman Khachatryan AuthorDate: Wed Aug 24 18:20:02 2022 +0200 [hotfix][tests] Use negative changelog materialization interval in tests ...instead of a big value so that the subsequent change won't break them. --- .../java/org/apache/flink/streaming/util/TestStreamEnvironment.java| 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index e9ec8314bf8..b56fdfe449c 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -138,8 +138,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { Duration.ofMillis(500), Duration.ofSeconds(1), Duration.ofSeconds(5), -Duration.ofSeconds( -Long.MAX_VALUE / 1000 /* max allowed by Duration.toMillis */)); +Duration.ofSeconds(-1)); miniCluster.overrideRestoreModeForChangelogStateBackend(); } }
[flink] 03/03: [FLINK-28976][state] Don't add extra delay to the 1st materialization
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git commit 493a1aa8556038283e256efc5368bd319bd06d17 Author: Roman Khachatryan AuthorDate: Mon Aug 15 19:30:36 2022 +0200 [FLINK-28976][state] Don't add extra delay to the 1st materialization --- .../apache/flink/state/changelog/PeriodicMaterializationManager.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java index 66c75c670e9..2dfe11d3886 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java @@ -114,7 +114,7 @@ class PeriodicMaterializationManager implements Closeable { LOG.info("Task {} starts periodic materialization", subtaskName); -scheduleNextMaterialization(periodicMaterializeDelay + initialDelay); +scheduleNextMaterialization(initialDelay); } }
[flink] 02/03: [hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git commit 605a7118a018a716b7715834f5b3f63f3e7064af Author: Roman Khachatryan AuthorDate: Mon Aug 15 19:29:58 2022 +0200 [hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit (cherry picked from commit 0e083c119b24a9b5fa7af1ad274c9ee7111c07cc) --- .../state/changelog/PeriodicMaterializationManager.java | 13 + 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java index d51d7958cd3..66c75c670e9 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java @@ -114,7 +114,7 @@ class PeriodicMaterializationManager implements Closeable { LOG.info("Task {} starts periodic materialization", subtaskName); -scheduleNextMaterialization(initialDelay); +scheduleNextMaterialization(periodicMaterializeDelay + initialDelay); } } @@ -255,22 +255,19 @@ class PeriodicMaterializationManager implements Closeable { } private void scheduleNextMaterialization() { -scheduleNextMaterialization(0); +scheduleNextMaterialization(periodicMaterializeDelay); } // task thread and asyncOperationsThreadPool can access this method -private synchronized void scheduleNextMaterialization(long offset) { +private synchronized void scheduleNextMaterialization(long delay) { if (started && !periodicExecutor.isShutdown()) { LOG.info( "Task {} schedules the next materialization in {} seconds", subtaskName, -periodicMaterializeDelay 
/ 1000); +delay / 1000); -periodicExecutor.schedule( -this::triggerMaterialization, -periodicMaterializeDelay + offset, -TimeUnit.MILLISECONDS); +periodicExecutor.schedule(this::triggerMaterialization, delay, TimeUnit.MILLISECONDS); } }
[flink] branch release-1.15 updated (62d7cc0ee4b -> 493a1aa8556)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git from 62d7cc0ee4b Update japicmp configuration for 1.15.2 new 258c3e35265 [hotfix][tests] Use negative changelog materialization interval in tests new 605a7118a01 [hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit new 493a1aa8556 [FLINK-28976][state] Don't add extra delay to the 1st materialization The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/state/changelog/PeriodicMaterializationManager.java | 11 --- .../apache/flink/streaming/util/TestStreamEnvironment.java| 3 +-- 2 files changed, 5 insertions(+), 9 deletions(-)
[flink] branch master updated (3149c621671 -> 91e1291e942)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 3149c621671 [FLINK-29096][table] Update documentation for JSON_VALUE special characters add a38b852bbbd [hotfix][state] Extract PeriodicMaterializationManager into state-backend-common add aaa35607866 [hotfix][tests] Use negative changelog materialization interval in tests add 0e083c119b2 [hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit add 91e1291e942 [FLINK-28976][state] Don't add extra delay to the 1st materialization No new revisions were added by this update. Summary of changes: .../flink-statebackend-changelog/pom.xml | 6 ++ .../changelog/ChangelogKeyedStateBackend.java | 10 +- .../state/changelog/ChangelogStateBackend.java | 2 + .../changelog/ChangelogKeyedStateBackendTest.java | 2 +- .../state/changelog/ChangelogMetricGroupTest.java | 8 +- .../changelog/ChangelogStateBackendTestUtils.java | 2 + .../state/changelog/ChangelogStateDiscardTest.java | 2 +- .../pom.xml| 51 +++--- .../ChangelogMaterializationMetricGroup.java | 14 +-- .../common}/PeriodicMaterializationManager.java| 104 + .../common/PeriodicMaterializationManagerTest.java | 67 + flink-state-backends/pom.xml | 1 + .../streaming/util/TestStreamEnvironment.java | 3 +- .../test/state/ChangelogRecoveryCachingITCase.java | 2 +- 14 files changed, 198 insertions(+), 76 deletions(-) copy flink-state-backends/{flink-statebackend-changelog => flink-statebackend-common}/pom.xml (63%) rename flink-state-backends/{flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog => flink-statebackend-common/src/main/java/org/apache/flink/state/common}/ChangelogMaterializationMetricGroup.java (79%) rename flink-state-backends/{flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog => 
flink-statebackend-common/src/main/java/org/apache/flink/state/common}/PeriodicMaterializationManager.java (79%) create mode 100644 flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java
[flink-benchmarks] branch master updated: [FLINK-28931] Fix BlockingPartitionBenchmark compile error
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git The following commit(s) were added to refs/heads/master by this push: new 4726194 [FLINK-28931] Fix BlockingPartitionBenchmark compile error 4726194 is described below commit 4726194e7e73a39a78b102a4ce2b4ece4d338ece Author: Roman Khachatryan AuthorDate: Thu Aug 11 15:22:18 2022 +0200 [FLINK-28931] Fix BlockingPartitionBenchmark compile error --- .../java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java index bcef395..c447049 100644 --- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java @@ -114,7 +114,7 @@ public class BlockingPartitionBenchmark extends BenchmarkBase { Integer.MAX_VALUE); } configuration.setBoolean( - NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, + NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED, compressionEnabled); configuration.setString( NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, subpartitionType);
[flink] branch release-1.15 updated: [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new b24c5161162 [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager b24c5161162 is described below commit b24c51611628807ecb78337dc7adae70d359c5b5 Author: 鲍健昕 <1411643...@qq.com> AuthorDate: Wed Jul 20 10:35:40 2022 +0800 [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager --- .../runtime/checkpoint/CheckpointCoordinator.java | 12 ++-- .../checkpoint/CheckpointFailureManager.java | 3 +- .../runtime/checkpoint/DefaultCheckpointPlan.java | 5 +- .../checkpoint/FinishedTaskStateProvider.java | 23 ++- .../filesystem/FsCheckpointStorageAccess.java | 10 ++- .../checkpoint/CheckpointFailureManagerTest.java | 5 +- .../CheckpointFailureManagerITCase.java| 79 -- 7 files changed, 117 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 72a6b7032de..57c8cfc9f20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ 
-1342,18 +1343,21 @@ public class CheckpointCoordinator { } catch (Exception e1) { // abort the current pending checkpoint if we fails to finalize the pending // checkpoint. +final CheckpointFailureReason failureReason = +e1 instanceof PartialFinishingNotSupportedByStateException +? CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING +: CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE; + if (!pendingCheckpoint.isDisposed()) { abortPendingCheckpoint( -pendingCheckpoint, -new CheckpointException( - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1)); +pendingCheckpoint, new CheckpointException(failureReason, e1)); } throw new CheckpointException( "Could not finalize the pending checkpoint " + pendingCheckpoint.getCheckpointID() + '.', -CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, +failureReason, e1); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 08cf49e41ee..8db1fe307a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -238,8 +238,8 @@ public class CheckpointFailureManager { case TASK_FAILURE: case TASK_CHECKPOINT_FAILURE: case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE: +// there are some edge cases shouldn't be counted as a failure, e.g. 
shutdown case TRIGGER_CHECKPOINT_FAILURE: -case FINALIZE_CHECKPOINT_FAILURE: // ignore break; @@ -247,6 +247,7 @@ public class CheckpointFailureManager { case CHECKPOINT_ASYNC_EXCEPTION: case CHECKPOINT_DECLINED: case CHECKPOINT_EXPIRED: +case FINALIZE_CHECKPOINT_FAILURE: // we should make sure one checkpoint only be counted once if (checkpointId == UNKNOWN_CHECKPOINT_ID || countedCheckpointIds.add(checkpointId)) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java index eaa7a595e7d..9253799a17a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/ru
[flink] branch master updated: [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 88b309b7dca [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager 88b309b7dca is described below commit 88b309b7dcad269ad084eab5e2944724daf6dee4 Author: 鲍健昕 <1411643...@qq.com> AuthorDate: Wed Jul 20 10:35:40 2022 +0800 [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager --- .../runtime/checkpoint/CheckpointCoordinator.java | 12 ++-- .../checkpoint/CheckpointFailureManager.java | 3 +- .../runtime/checkpoint/DefaultCheckpointPlan.java | 5 +- .../checkpoint/FinishedTaskStateProvider.java | 23 ++- .../filesystem/FsCheckpointStorageAccess.java | 10 ++- .../checkpoint/CheckpointFailureManagerTest.java | 5 +- .../CheckpointFailureManagerITCase.java| 79 -- 7 files changed, 117 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 1051dbc6bc0..0f5033ac8ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -1365,18 +1366,21 
@@ public class CheckpointCoordinator { } catch (Exception e1) { // abort the current pending checkpoint if we fails to finalize the pending // checkpoint. +final CheckpointFailureReason failureReason = +e1 instanceof PartialFinishingNotSupportedByStateException +? CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING +: CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE; + if (!pendingCheckpoint.isDisposed()) { abortPendingCheckpoint( -pendingCheckpoint, -new CheckpointException( - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1)); +pendingCheckpoint, new CheckpointException(failureReason, e1)); } throw new CheckpointException( "Could not finalize the pending checkpoint " + pendingCheckpoint.getCheckpointID() + '.', -CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, +failureReason, e1); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 08cf49e41ee..8db1fe307a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -238,8 +238,8 @@ public class CheckpointFailureManager { case TASK_FAILURE: case TASK_CHECKPOINT_FAILURE: case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE: +// there are some edge cases shouldn't be counted as a failure, e.g. 
shutdown case TRIGGER_CHECKPOINT_FAILURE: -case FINALIZE_CHECKPOINT_FAILURE: // ignore break; @@ -247,6 +247,7 @@ public class CheckpointFailureManager { case CHECKPOINT_ASYNC_EXCEPTION: case CHECKPOINT_DECLINED: case CHECKPOINT_EXPIRED: +case FINALIZE_CHECKPOINT_FAILURE: // we should make sure one checkpoint only be counted once if (checkpointId == UNKNOWN_CHECKPOINT_ID || countedCheckpointIds.add(checkpointId)) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java index eaa7a595e7d..9253799a17a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/ru
[flink] branch master updated (465db25502e -> 3268ec6a7ce)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 465db25502e [FLINK-28904][python][docs] Add missing connector/format documentation add 3268ec6a7ce [FLINK-28898][state/changelog] Fix unstable ChangelogRecoverySwitchStateBackendITCase#testSwitchFromEnablingToDisablingWithRescalingOut No new revisions were added by this update. Summary of changes: .../test/checkpointing/ChangelogRecoveryITCaseBase.java | 12 +--- 1 file changed, 9 insertions(+), 3 deletions(-)
[flink] branch master updated (10a6f41fa12 -> c5a8b0f3c29)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 10a6f41fa12 [FLINK-28532][table] Support full caching in lookup join runner using InputFormats as scan runtime provider (#20447) add 6b725d13153 [FLINK-28529][state/changelog] Fix unstable ChangelogPeriodicMaterializationSwitchStateBackendITCase#testSwitchFromDisablingToEnablingInClaimMode add c5a8b0f3c29 [refactor][state/changelog] Rename ChangelogPeriodicMaterialization* to ChangelogRecovery* No new revisions were added by this update. Summary of changes: .../ChangelogLocalRecoveryITCase.java | 6 +-- ...ionITCase.java => ChangelogRecoveryITCase.java} | 5 +- ...tBase.java => ChangelogRecoveryITCaseBase.java} | 4 +- ...se.java => ChangelogRecoveryRescaleITCase.java} | 6 +-- ...ava => ChangelogRecoverySwitchEnvTestBase.java} | 38 +- ...ChangelogRecoverySwitchStateBackendITCase.java} | 59 -- 6 files changed, 66 insertions(+), 52 deletions(-) rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{ChangelogPeriodicMaterializationITCase.java => ChangelogRecoveryITCase.java} (98%) rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{ChangelogPeriodicMaterializationTestBase.java => ChangelogRecoveryITCaseBase.java} (99%) rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{ChangelogPeriodicMaterializationRescaleITCase.java => ChangelogRecoveryRescaleITCase.java} (88%) rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{ChangelogPeriodicMaterializationSwitchEnvTestBase.java => ChangelogRecoverySwitchEnvTestBase.java} (83%) rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{ChangelogPeriodicMaterializationSwitchStateBackendITCase.java => ChangelogRecoverySwitchStateBackendITCase.java} (79%)
[flink] branch master updated: [FLINK-28178][runtime-web] Show the delegated StateBackend and whether changelog is enabled in the UI
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9ed70a1e8b5 [FLINK-28178][runtime-web] Show the delegated StateBackend and whether changelog is enabled in the UI 9ed70a1e8b5 is described below commit 9ed70a1e8b5d59abdf9d7673bc5b44d421140ef0 Author: wangfeifan AuthorDate: Wed Jun 29 15:58:46 2022 +0800 [FLINK-28178][runtime-web] Show the delegated StateBackend and whether changelog is enabled in the UI --- docs/layouts/shortcodes/generated/rest_v1_dispatcher.html | 3 +++ docs/static/generated/rest_v1_dispatcher.yml | 2 ++ flink-runtime-web/src/test/resources/rest_api_v1.snapshot | 5 - .../web-dashboard/src/app/interfaces/job-checkpoint.ts| 1 + .../pages/job/checkpoints/job-checkpoints.component.html | 10 +- .../runtime/executiongraph/AccessExecutionGraph.java | 8 .../runtime/executiongraph/ArchivedExecutionGraph.java| 14 ++ .../runtime/executiongraph/DefaultExecutionGraph.java | 15 ++- .../handler/job/checkpoints/CheckpointConfigHandler.java | 6 ++ .../rest/messages/checkpoints/CheckpointConfigInfo.java | 9 + .../java/org/apache/flink/runtime/state/StateBackend.java | 9 + .../apache/flink/runtime/state/StateBackendLoader.java| 2 +- .../runtime/state/delegate/DelegatingStateBackend.java| 5 + .../legacy/utils/ArchivedExecutionGraphBuilder.java | 2 ++ .../messages/checkpoints/CheckpointConfigInfoTest.java| 1 + .../adaptive/StateTrackingMockExecutionGraph.java | 6 ++ 16 files changed, 90 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 8d5e8191415..ea4b36e4be3 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -2335,6 +2335,9 @@ Using 'curl' you can upload a jar via 'curl -X POST 
-H "Expect:" -F "jarfile=@pa "state_backend" : { "type" : "string" }, +"state_changelog_enabled" : { + "type" : "boolean" +}, "timeout" : { "type" : "integer" }, diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 2d8a79402ef..1cc1579dfb0 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -2864,6 +2864,8 @@ components: format: int64 checkpoints_after_tasks_finish: type: boolean +state_changelog_enabled: + type: boolean changelog_periodic_materialization_interval: type: integer format: int64 diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 85337fa8795..ef9be2485d2 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1408,6 +1408,9 @@ "checkpoints_after_tasks_finish" : { "type" : "boolean" }, +"state_changelog_enabled" : { + "type" : "boolean" +}, "changelog_periodic_materialization_interval" : { "type" : "integer" }, @@ -3842,4 +3845,4 @@ } } } ] -} \ No newline at end of file +} diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts index 8dc270f1a73..f4f8ddae841 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts @@ -128,6 +128,7 @@ export interface CheckpointConfig { delete_on_cancellation: boolean; }; state_backend: string; + state_changelog_enabled: boolean; checkpoint_storage: string; unaligned_checkpoints: boolean; tolerable_failed_checkpoints: number; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html index 
b82659dab74..3b7fb0f41bb 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html @@ -551,15 +551,15 @@ -Changelog state-backend -Enabled -Disa
[flink] branch master updated (0dc8890f1b8 -> 52eb7e76b5d)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 0dc8890f1b8 [FLINK-28711][hive] Hive source supports dynamic filtering new 1f9632a0719 [FLINK-27693][changelog] Support local recovery for non-materialized part new 52eb7e76b5d [FLINK-27693][docs] Remove local recovery from the Limitations of changelog The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/content.zh/docs/ops/state/state_backends.md | 2 - docs/content/docs/ops/state/state_backends.md | 2 - .../fs/AbstractStateChangeFsUploader.java | 120 ++ .../fs/DuplicatingOutputStreamWithPos.java | 174 + .../fs/DuplicatingStateChangeFsUploader.java | 111 + .../changelog/fs/FsStateChangelogStorage.java | 50 -- .../fs/FsStateChangelogStorageFactory.java | 8 +- .../flink/changelog/fs/FsStateChangelogWriter.java | 111 ++--- .../flink/changelog/fs/OutputStreamWithPos.java| 54 ++- .../flink/changelog/fs/StateChangeFormat.java | 10 +- .../flink/changelog/fs/StateChangeFsUploader.java | 104 ++-- .../changelog/fs/StateChangeUploadScheduler.java | 34 ++-- .../flink/changelog/fs/StateChangeUploader.java| 33 +++- .../apache/flink/changelog/fs/UploadResult.java| 41 - .../fs/BatchingStateChangeUploadSchedulerTest.java | 6 +- .../changelog/fs/ChangelogStorageMetricsTest.java | 45 -- .../changelog/fs/FsStateChangelogStorageTest.java | 7 +- .../fs/FsStateChangelogWriterSqnTest.java | 6 +- .../changelog/fs/FsStateChangelogWriterTest.java | 25 ++- .../state/ChangelogTaskLocalStateStore.java| 33 +++- .../TaskExecutorStateChangelogStoragesManager.java | 6 +- .../state/changelog/LocalChangelogRegistry.java| 64 .../changelog/LocalChangelogRegistryImpl.java | 118 ++ .../changelog/StateChangelogStorageFactory.java| 6 +- 
.../changelog/StateChangelogStorageLoader.java | 8 +- .../state/changelog/StateChangelogWriter.java | 21 ++- .../InMemoryStateChangelogStorageFactory.java | 6 +- .../inmemory/InMemoryStateChangelogWriter.java | 15 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 5 +- .../state/ChangelogTaskLocalStateStoreTest.java| 2 + ...kExecutorStateChangelogStoragesManagerTest.java | 60 +-- .../changelog/LocalChangelogRegistryTest.java | 56 +++ .../inmemory/StateChangelogStorageLoaderTest.java | 16 +- .../inmemory/StateChangelogStorageTest.java| 7 +- ...cutorExecutionDeploymentReconciliationTest.java | 14 +- .../taskexecutor/TaskExecutorSlotLifetimeTest.java | 14 ++ .../changelog/ChangelogKeyedStateBackend.java | 28 +++- .../state/changelog/ChangelogTruncateHelper.java | 1 + .../changelog/ChangelogStateBackendTestUtils.java | 4 +- .../state/changelog/ChangelogStateDiscardTest.java | 20 ++- .../state/changelog/StateChangeLoggerTestBase.java | 7 +- .../ChangelogLocalRecoveryITCase.java | 6 +- 42 files changed, 1219 insertions(+), 241 deletions(-) create mode 100644 flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java create mode 100644 flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java create mode 100644 flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
[flink] 02/02: [FLINK-27693][docs] Remove local recovery from the Limitations of changelog
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 52eb7e76b5d66ff5c4d9d4af8a213b5b8f9f8322 Author: fredia AuthorDate: Mon Aug 8 23:45:45 2022 +0800 [FLINK-27693][docs] Remove local recovery from the Limitations of changelog --- docs/content.zh/docs/ops/state/state_backends.md | 2 -- docs/content/docs/ops/state/state_backends.md| 2 -- 2 files changed, 4 deletions(-) diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md index 01080f7efad..29606369f34 100644 --- a/docs/content.zh/docs/ops/state/state_backends.md +++ b/docs/content.zh/docs/ops/state/state_backends.md @@ -406,7 +406,6 @@ dstl.dfs.base-path: s3:// # 类似于 state.checkpoints.dir 请将如下配置保持默认值 (参见[限制](#limitations)): ```yaml execution.checkpointing.max-concurrent-checkpoints: 1 -state.backend.local-recovery: false ``` 有关其他配置选项,请参阅[配置]({{< ref "docs/deployment/config#state-changelog-options" >}})部分。 @@ -465,7 +464,6 @@ env.enable_changelog_statebackend(true) ### 限制 - 最多同时创建一个 checkpoint -- 本地恢复暂不支持 - 到 Flink 1.15 为止, 只有 `filesystem` changelog 实现可用 - 尚不支持 [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) 模式 diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md index 16d3b8f4515..9835c9fe52a 100644 --- a/docs/content/docs/ops/state/state_backends.md +++ b/docs/content/docs/ops/state/state_backends.md @@ -407,7 +407,6 @@ dstl.dfs.base-path: s3:// # similar to state.checkpoints.dir Please keep the following defaults (see [limitations](#limitations)): ```yaml execution.checkpointing.max-concurrent-checkpoints: 1 -state.backend.local-recovery: false ``` Please refer to the [configuration section]({{< ref "docs/deployment/config#state-changelog-options" >}}) for other options. 
@@ -460,7 +459,6 @@ Resuming from both savepoints and checkpoints is supported: ### Limitations - At most one concurrent checkpoint - - Local recovery not supported - As of Flink 1.15, only `filesystem` changelog implementation is available - [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) mode not supported
[flink] 01/02: [FLINK-27693][changelog] Support local recovery for non-materialized part
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1f9632a07199854c0225bd7f416c038fbf59abe0 Author: fredia AuthorDate: Tue May 31 15:22:59 2022 +0800 [FLINK-27693][changelog] Support local recovery for non-materialized part --- .../fs/AbstractStateChangeFsUploader.java | 120 ++ .../fs/DuplicatingOutputStreamWithPos.java | 174 + .../fs/DuplicatingStateChangeFsUploader.java | 111 + .../changelog/fs/FsStateChangelogStorage.java | 50 -- .../fs/FsStateChangelogStorageFactory.java | 8 +- .../flink/changelog/fs/FsStateChangelogWriter.java | 111 ++--- .../flink/changelog/fs/OutputStreamWithPos.java| 54 ++- .../flink/changelog/fs/StateChangeFormat.java | 10 +- .../flink/changelog/fs/StateChangeFsUploader.java | 104 ++-- .../changelog/fs/StateChangeUploadScheduler.java | 34 ++-- .../flink/changelog/fs/StateChangeUploader.java| 33 +++- .../apache/flink/changelog/fs/UploadResult.java| 41 - .../fs/BatchingStateChangeUploadSchedulerTest.java | 6 +- .../changelog/fs/ChangelogStorageMetricsTest.java | 45 -- .../changelog/fs/FsStateChangelogStorageTest.java | 7 +- .../fs/FsStateChangelogWriterSqnTest.java | 6 +- .../changelog/fs/FsStateChangelogWriterTest.java | 25 ++- .../state/ChangelogTaskLocalStateStore.java| 33 +++- .../TaskExecutorStateChangelogStoragesManager.java | 6 +- .../state/changelog/LocalChangelogRegistry.java| 64 .../changelog/LocalChangelogRegistryImpl.java | 118 ++ .../changelog/StateChangelogStorageFactory.java| 6 +- .../changelog/StateChangelogStorageLoader.java | 8 +- .../state/changelog/StateChangelogWriter.java | 21 ++- .../InMemoryStateChangelogStorageFactory.java | 6 +- .../inmemory/InMemoryStateChangelogWriter.java | 15 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 5 +- .../state/ChangelogTaskLocalStateStoreTest.java| 2 + ...kExecutorStateChangelogStoragesManagerTest.java | 60 +-- .../changelog/LocalChangelogRegistryTest.java | 
56 +++ .../inmemory/StateChangelogStorageLoaderTest.java | 16 +- .../inmemory/StateChangelogStorageTest.java| 7 +- ...cutorExecutionDeploymentReconciliationTest.java | 14 +- .../taskexecutor/TaskExecutorSlotLifetimeTest.java | 14 ++ .../changelog/ChangelogKeyedStateBackend.java | 28 +++- .../state/changelog/ChangelogTruncateHelper.java | 1 + .../changelog/ChangelogStateBackendTestUtils.java | 4 +- .../state/changelog/ChangelogStateDiscardTest.java | 20 ++- .../state/changelog/StateChangeLoggerTestBase.java | 7 +- .../ChangelogLocalRecoveryITCase.java | 6 +- 40 files changed, 1219 insertions(+), 237 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java new file mode 100644 index 000..3dd3dfcc70f --- /dev/null +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; + +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** Base implementation of StateChangeUploader. */ +public abstract class AbstractStateChangeFsUploader implements
[flink] branch master updated: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog file in the same taskmanager during restore
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1ec4acd2c99 [FLINK-27155][changelog] Reduce multiple reads to the same Changelog file in the same taskmanager during restore 1ec4acd2c99 is described below commit 1ec4acd2c993409092ddcb7121e2c9647bb4a086 Author: wangfeifan AuthorDate: Sun Apr 17 15:34:07 2022 +0800 [FLINK-27155][changelog] Reduce multiple reads to the same Changelog file in the same taskmanager during restore --- .../fs_state_changelog_configuration.html | 6 + .../changelog/fs/ChangelogStreamHandleReader.java | 30 ++- .../fs/ChangelogStreamHandleReaderWithCache.java | 221 ++ .../flink/changelog/fs/ChangelogStreamWrapper.java | 62 + .../changelog/fs/FsStateChangelogOptions.java | 8 + .../changelog/fs/FsStateChangelogStorage.java | 1 + .../fs/FsStateChangelogStorageFactory.java | 5 +- .../fs/FsStateChangelogStorageForRecovery.java | 17 +- .../flink/changelog/fs/StateChangeFormat.java | 35 +-- ...rRecovery.java => StateChangeIteratorImpl.java} | 30 +-- .../api/runtime/SavepointTaskStateManager.java | 10 + .../TaskExecutorStateChangelogStoragesManager.java | 95 +++- .../flink/runtime/state/TaskStateManager.java | 10 + .../flink/runtime/state/TaskStateManagerImpl.java | 26 +++ .../changelog/StateChangelogStorageFactory.java| 2 +- .../changelog/StateChangelogStorageLoader.java | 5 +- .../InMemoryStateChangelogStorageFactory.java | 2 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 3 +- ...kExecutorStateChangelogStoragesManagerTest.java | 5 +- .../runtime/state/TaskStateManagerImplTest.java| 4 + .../flink/runtime/state/TestTaskStateManager.java | 21 ++ .../inmemory/StateChangelogStorageLoaderTest.java | 3 +- .../runtime/util/JvmExitOnFatalErrorTest.java | 2 + .../state/changelog/ChangelogStateBackend.java | 2 + .../DeactivatedChangelogStateBackend.java | 3 + 
.../restore/ChangelogBackendRestoreOperation.java | 17 +- .../StateInitializationContextImplTest.java| 2 + .../runtime/tasks/LocalStateForwardingTest.java| 2 + .../streaming/runtime/tasks/StreamTaskTest.java| 3 + .../test/state/ChangelogRecoveryCachingITCase.java | 253 + 30 files changed, 807 insertions(+), 78 deletions(-) diff --git a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html index cc3b64dfcec..d5ca1bfadf1 100644 --- a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html +++ b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html @@ -38,6 +38,12 @@ Integer Number of threads to use to discard changelog (e.g. pre-emptively uploaded unused state). + +dstl.dfs.download.local-cache.idle-timeout-ms +10 min +Duration +Maximum idle time for cache files of distributed changelog file, after which the cache files will be deleted. + dstl.dfs.preemptive-persist-threshold 5 mb diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java similarity index 50% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java copy to flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java index f60ad065094..c633acf3bfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java @@ -16,29 +16,25 @@ * limitations under the License. 
*/ -package org.apache.flink.runtime.state.changelog; +package org.apache.flink.changelog.fs; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.state.StreamStateHandle; +import java.io.DataInputStream; import java.io.IOException; -/** - * A factory for {@link StateChangelogStorage}. Please use {@link StateChangelogStorageLoader} to - * create {@link StateChangelogStorage}. - */ +import static org.apache.flink.changelog.fs.ChangelogStreamWrapper.wrapAndSeek; + +/** Changelog handle reader to use by {@link StateChangeIteratorImpl}. */ @Internal -public interface StateChangelogStorageFact