[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082600527

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
## @@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
It's only for unaligned checkpoints. There is a possibility we will have more options, but not very likely. So maybe `execution.checkpointing.unaligned.max-subtasks-per-channel-state-file`?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
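The option description says that a value of N lets up to N subtasks share one channel state file, and that 1 means one file per subtask. The arithmetic is sketched below. It assumes that consecutive subtask indexes are grouped into the same file. That grouping rule and the class name `ChannelStateFileGrouping` are hypothetical and are not taken from the PR, which may assign subtasks to files differently.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration of "max subtasks per channel state file".
 * Assumption: consecutive subtask indexes share a file; Flink's real grouping may differ.
 */
class ChannelStateFileGrouping {

    /** Returns one list of subtask indexes per channel state file. */
    static List<List<Integer>> group(int numberOfSubtasks, int maxSubtasksPerFile) {
        if (maxSubtasksPerFile < 1) {
            throw new IllegalArgumentException("maxSubtasksPerFile must be >= 1");
        }
        List<List<Integer>> files = new ArrayList<>();
        for (int subtask = 0; subtask < numberOfSubtasks; subtask++) {
            if (subtask % maxSubtasksPerFile == 0) {
                files.add(new ArrayList<>()); // current file is full (or none yet): open a new one
            }
            files.get(files.size() - 1).add(subtask);
        }
        return files;
    }

    /** Number of files per checkpoint: ceil(numberOfSubtasks / maxSubtasksPerFile). */
    static int numberOfFiles(int numberOfSubtasks, int maxSubtasksPerFile) {
        return (numberOfSubtasks + maxSubtasksPerFile - 1) / maxSubtasksPerFile;
    }
}
```

With 12 subtasks and the default of 5, this sketch produces 3 files ([0..4], [5..9], [10, 11]). With a value of 1 it produces 12 files, one per subtask.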
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082365656

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
## @@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
> number-of-subtasks-share-file

This sounds a bit strange in English. Maybe let's rename it to:

> execution.checkpointing.channel-state.subtasks-per-file

? And rename the config option and getters as well? @dawidwys maybe you have some better idea?
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1081119337

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
## @@ -107,49 +119,78 @@ void run() {
             try {
                 closeAll(
                         this::cleanupRequests,
-                        () ->
-                                dispatcher.fail(
-                                        thrown == null ? new CancellationException() : thrown));
+                        () -> {
+                            Throwable cause;
+                            synchronized (lock) {
+                                cause = thrown == null ? new CancellationException() : thrown;
+                            }
+                            dispatcher.fail(cause);
+                        });
             } catch (Exception e) {
-                //noinspection NonAtomicOperationOnVolatileField
-                thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                synchronized (lock) {
+                    //noinspection NonAtomicOperationOnVolatileField
+                    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                }
             }
             FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
         }
         LOG.debug("loop terminated");
     }
 
     private void loop() throws Exception {
-        while (!wasClosed) {
+        while (true) {
             try {
-                ChannelStateWriteRequest request = deque.take();
-                // The executor will end the registration, when the start request comes.
-                // Because the checkpoint can be started after all tasks are initiated.
-                if (request instanceof CheckpointStartRequest && isRegistering()) {
-                    checkState(
-                            isRegistering.compareAndSet(true, false),
-                            "Transition isRegistering failed.");
+                ChannelStateWriteRequest request;
+                boolean completeRegister = false;
+                synchronized (lock) {
+                    if (wasClosed) {
+                        return;
+                    }
+                    request = waitAndTakeUnsafe();
+                    // The executor will end the registration, when the start request comes.
+                    // Because the checkpoint can be started after all tasks are initiated.
+                    if (request instanceof CheckpointStartRequest) {
+                        completeRegister = completeRegister();
+                    }
+                }
+                if (completeRegister) {
                     onRegistered.accept(this);
                 }
                 dispatcher.dispatch(request);
             } catch (InterruptedException e) {
-                if (!wasClosed) {
-                    LOG.debug(
-                            "Channel state executor is interrupted while waiting for a request (continue waiting)",
-                            e);
-                } else {
-                    Thread.currentThread().interrupt();
+                synchronized (lock) {
+                    if (!wasClosed) {
+                        LOG.debug(
+                                "Channel state executor is interrupted while waiting for a request (continue waiting)",
+                                e);
+                    } else {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
         }
     }
 
+    private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
+        ChannelStateWriteRequest request;
+        while (true) {

Review Comment:
Shouldn't this be `while(!wasClosed)`?
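The suggestion is that the wait loop should re-check the close flag after every wake-up. That way, a consumer blocked in `lock.wait()` returns as soon as `close()` sets the flag and calls `notifyAll()`, without needing an interrupt. A minimal sketch follows. The class `ClosableRequestQueue` is a hypothetical stand-in for the executor, `String` stands in for `ChannelStateWriteRequest`, and returning `null` to mean "closed" is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the executor's request queue, guarded by a single lock. */
class ClosableRequestQueue {
    private final Object lock = new Object();
    private final Deque<String> deque = new ArrayDeque<>(); // guarded by lock
    private boolean wasClosed = false; // guarded by lock

    void submit(String request) {
        synchronized (lock) {
            deque.addLast(request);
            lock.notifyAll(); // wake up a consumer waiting for a request
        }
    }

    void close() {
        synchronized (lock) {
            wasClosed = true;
            lock.notifyAll(); // wake up a consumer blocked in waitAndTakeUnsafe()
        }
    }

    /** Takes the next request, or returns null once the queue has been closed. */
    String take() throws InterruptedException {
        synchronized (lock) {
            return waitAndTakeUnsafe();
        }
    }

    // Caller must hold lock. wasClosed is re-checked after every wake-up, so notifyAll()
    // from close() is enough to release the consumer. Pending requests are not drained.
    private String waitAndTakeUnsafe() throws InterruptedException {
        while (!wasClosed) {
            String request = deque.pollFirst();
            if (request != null) {
                return request;
            }
            lock.wait();
        }
        return null; // closed: the caller should leave its loop
    }
}
```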
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1081118070

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
## @@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
Hmmm, I'm not sure. Maybe it's sufficient to have just the interrupt. On the other hand, does it hurt if we add the `lock.notifyAll()`?
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052418421

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
## @@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
> BlockingDeque.take() will wait until an element becomes available. Deque is hard to achieve. BlockingDeque is easy to implement the producer & consumer model.

That's a slight complexity, but it should be easily solved via `lock.wait()` and `lock.notifyAll()` called in one or two places (`close()` and whenever we add anything to the current `deque`)?
https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/

The loop probably should look like this:
```
while (true) {
    synchronized (lock) {
        if (wasClosed) return;
        (...)
        ChannelStateWriteRequest request = waitAndTakeUnsafe();
        (...)
    }
}

// Must be called while holding `lock`.
private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
    ChannelStateWriteRequest request;
    while (true) {
        request = deque.pollFirst();
        if (request == null) {
            lock.wait(); // woken up by notifyAll() in close() or after adding a request
        } else {
            return request;
        }
    }
}
```
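Below is a self-contained sketch of the producer/consumer shape proposed above, with a plain `Deque` guarded by one lock instead of a `BlockingDeque`. It keeps the proposed `while (true)` wait loop and dispatches each request outside the lock. It assumes that `close()` interrupts the consumer thread, as the `InterruptedException` handling in the diff suggests. The class `LockBasedRequestExecutor` and the use of `String` requests are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/**
 * Hypothetical single-consumer executor: a Deque guarded by one lock replaces BlockingDeque.
 * Producers call submit(); the consumer waits in lock.wait() and is woken by notifyAll().
 */
class LockBasedRequestExecutor {
    private final Object lock = new Object();
    private final Deque<String> deque = new ArrayDeque<>(); // guarded by lock
    private boolean wasClosed = false; // guarded by lock
    private final Thread thread;

    LockBasedRequestExecutor(Consumer<String> dispatcher) {
        this.thread = new Thread(() -> loop(dispatcher), "request-executor");
    }

    void start() {
        thread.start();
    }

    void submit(String request) {
        synchronized (lock) {
            if (wasClosed) {
                throw new IllegalStateException("executor is closed");
            }
            deque.addLast(request);
            lock.notifyAll(); // a request was added: wake the consumer
        }
    }

    void close() throws InterruptedException {
        synchronized (lock) {
            wasClosed = true;
        }
        thread.interrupt(); // assumption: close relies on interrupting the consumer thread
        thread.join();
    }

    private void loop(Consumer<String> dispatcher) {
        while (true) {
            String request;
            try {
                synchronized (lock) {
                    if (wasClosed) {
                        return;
                    }
                    request = waitAndTakeUnsafe();
                }
            } catch (InterruptedException e) {
                continue; // re-check wasClosed at the top of the loop
            }
            dispatcher.accept(request); // dispatch outside the lock
        }
    }

    // Caller must hold lock. Waits until a request is available; only an interrupt ends the wait early.
    private String waitAndTakeUnsafe() throws InterruptedException {
        while (true) {
            String request = deque.pollFirst();
            if (request != null) {
                return request;
            }
            lock.wait();
        }
    }
}
```

Holding the lock only while taking a request keeps producers from blocking on a slow dispatch. Because this loop never re-checks `wasClosed` while waiting, the interrupt is the only thing that ends the wait on close.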
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052319366

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
## @@ -286,27 +277,32 @@ private void runWithChecks(RunnableWithException r) {
         }
     }
 
-    public void fail(JobID jobID, JobVertexID jobVertexID, int subtaskIndex, Throwable e) {
+    /**
+     * The throwable is just used for special subtask, other subtasks should fail by {@link
+     * CHANNEL_STATE_SHARED_STREAM_EXCEPTION}.

Review Comment:
nit:

> The throwable is just used for the specific subtask that triggered the failure. Other subtasks should fail by {@link CHANNEL_STATE_SHARED_STREAM_EXCEPTION}

?
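The Javadoc describes a split failure mode for a shared stream. The subtask that triggered the failure receives the original throwable, and every other subtask writing to the same file fails with a generic shared-stream error. A minimal sketch of that rule follows. It assumes per-subtask results are modelled as `CompletableFuture`s keyed by a subtask id string. `SharedStreamFailure` is hypothetical, and a plain `IllegalStateException` stands in for Flink's `CheckpointException` with reason `CHANNEL_STATE_SHARED_STREAM_EXCEPTION`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of failing all subtasks that share one channel state stream. */
class SharedStreamFailure {
    private final Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();

    CompletableFuture<Void> register(String subtaskId) {
        return results.computeIfAbsent(subtaskId, id -> new CompletableFuture<>());
    }

    /**
     * The triggering subtask gets the original throwable; all other subtasks get a generic
     * shared-stream exception (attaching the original as its cause is an assumption here).
     */
    void fail(String triggeringSubtask, Throwable cause) {
        for (Map.Entry<String, CompletableFuture<Void>> entry : results.entrySet()) {
            if (entry.getKey().equals(triggeringSubtask)) {
                entry.getValue().completeExceptionally(cause);
            } else {
                entry.getValue().completeExceptionally(
                        new IllegalStateException("channel state shared stream failed", cause));
            }
        }
    }
}
```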
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052295272

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
## @@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
     private final DataOutputStream dataStream;
     private final CheckpointStateOutputStream checkpointStream;
-    private final ChannelStateWriteResult result;
-    private final Map inputChannelOffsets = new HashMap<>();
-    private final Map resultSubpartitionOffsets =
-            new HashMap<>();
+
+    /**
+     * Indicates whether the current checkpoints of all subtasks have exception. If it's not null,
+     * the checkpoint will fail.
+     */
+    private Throwable throwable;
+
     private final ChannelStateSerializer serializer;
     private final long checkpointId;
-    private boolean allInputsReceived = false;
-    private boolean allOutputsReceived = false;
     private final RunnableWithException onComplete;
-    private final int subtaskIndex;
-    private final String taskName;
+
+    // Subtasks that have not yet register writer result.
+    private final Set waitedSubtasks;
+
+    private final Map pendingResults = new HashMap<>();
 
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStartRequest startCheckpointItem,
+            Set subtasks,
+            long checkpointId,
             CheckpointStreamFactory streamFactory,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete)
             throws Exception {
         this(
-                taskName,
-                subtaskIndex,
-                startCheckpointItem.getCheckpointId(),
-                startCheckpointItem.getTargetResult(),
+                subtasks,
+                checkpointId,
                 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
                 serializer,
                 onComplete);
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
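The new `waitedSubtasks` field and its comment ("Subtasks that have not yet register writer result") suggest that a writer shared by several subtasks should complete only after the last of them has registered its result. A minimal sketch of that bookkeeping follows. It assumes subtasks are identified by strings and that a plain `Runnable` plays the role of `onComplete`. `SharedFileCompletion` and `subtaskFinished` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical bookkeeping: run onComplete once every subtask sharing the file has finished. */
class SharedFileCompletion {
    private final Set<String> waitedSubtasks; // subtasks that have not finished yet
    private final Runnable onComplete;

    SharedFileCompletion(Set<String> subtasks, Runnable onComplete) {
        if (subtasks.isEmpty()) {
            throw new IllegalArgumentException("subtasks must not be empty");
        }
        this.waitedSubtasks = new HashSet<>(subtasks); // defensive copy
        this.onComplete = onComplete;
    }

    /** Marks one subtask as finished; runs onComplete exactly once, after the last subtask. */
    void subtaskFinished(String subtask) {
        if (!waitedSubtasks.remove(subtask)) {
            throw new IllegalStateException("unknown or already finished subtask: " + subtask);
        }
        if (waitedSubtasks.isEmpty()) {
            onComplete.run();
        }
    }

    boolean isDone() {
        return waitedSubtasks.isEmpty();
    }
}
```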
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048286114

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
## @@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
     private final DataOutputStream dataStream;
     private final CheckpointStateOutputStream checkpointStream;
-    private final ChannelStateWriteResult result;
-    private final Map inputChannelOffsets = new HashMap<>();
-    private final Map resultSubpartitionOffsets =
-            new HashMap<>();
+
+    /**
+     * Indicates whether the current checkpoints of all subtasks have exception. If it's not null,
+     * the checkpoint will fail.
+     */
+    private Throwable throwable;
+
     private final ChannelStateSerializer serializer;
     private final long checkpointId;
-    private boolean allInputsReceived = false;
-    private boolean allOutputsReceived = false;
     private final RunnableWithException onComplete;
-    private final int subtaskIndex;
-    private final String taskName;
+
+    // Subtasks that have not yet register writer result.
+    private final Set waitedSubtasks;
+
+    private final Map pendingResults = new HashMap<>();
 
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStartRequest startCheckpointItem,
+            Set subtasks,
+            long checkpointId,
             CheckpointStreamFactory streamFactory,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete)
             throws Exception {
         this(
-                taskName,
-                subtaskIndex,
-                startCheckpointItem.getCheckpointId(),
-                startCheckpointItem.getTargetResult(),
+                subtasks,
+                checkpointId,
                 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
                 serializer,
                 onComplete);
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
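The diff removes the per-subtask `inputChannelOffsets`/`resultSubpartitionOffsets` maps from a writer that now serves several subtasks through one `CheckpointStateOutputStream`. The underlying idea is that each subtask's data must be located by offsets inside the shared file rather than by a file of its own. A minimal sketch of that idea follows, using a `ByteArrayOutputStream` in place of the checkpoint stream. `SharedChannelStateStream` is hypothetical, and its length-prefixed record layout is an assumption, not the format of Flink's `ChannelStateSerializer`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical: several subtasks append to one stream, and each records its own offsets. */
class SharedChannelStateStream {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream dataStream = new DataOutputStream(bytes);
    private final Map<String, List<Long>> offsetsBySubtask = new HashMap<>();

    /** Appends one length-prefixed buffer for the subtask and records where it starts. */
    void write(String subtask, byte[] buffer) throws IOException {
        long offset = dataStream.size(); // bytes written so far = start of this record
        dataStream.writeInt(buffer.length);
        dataStream.write(buffer);
        offsetsBySubtask.computeIfAbsent(subtask, s -> new ArrayList<>()).add(offset);
    }

    /** Start offsets of the given subtask's records inside the shared file. */
    List<Long> offsetsOf(String subtask) {
        return offsetsBySubtask.getOrDefault(subtask, new ArrayList<>());
    }

    /** Total size of the single shared file. */
    int size() {
        return dataStream.size();
    }
}
```

Writing 3 bytes for `s0`, 2 for `s1`, and 1 more for `s0` yields offsets [0, 13] for `s0` and [7] for `s1` in an 18-byte file (each record is a 4-byte length followed by its payload).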
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048154979

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
## @@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
                                             "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                             "the important considerations"))
                             .build());
+
+    public static final ConfigOption CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of tasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each task will create a new channel state file when this is configured to 1.");

Review Comment:
I don't see that this has actually been fixed? Note that the config option should most likely be renamed as well.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1000549678

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
## @@ -205,9 +207,17 @@ public CheckpointStorageLocationReference getLocationReference() {
         return locationReference;
     }
 
+    public void registerCancelCallback(Consumer cancelCallback) {
+        this.cancelCallback = cancelCallback;
+    }
+
     @Override
     public void cancel(Throwable cause) {
-        targetResult.fail(cause);
+        if (cancelCallback == null) {
+            targetResult.fail(cause);
+            return;
+        }
+        cancelCallback.accept(cause);

Review Comment:
Why are we not failing the `targetResult` if the `cancelCallback` is set? This seems strange.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
## @@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
                                             "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                             "the important considerations"))
                             .build());
+
+    public static final ConfigOption CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of tasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each task will create a new channel state file when this is configured to 1.");

Review Comment:
`task` -> `subtask`
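The question about `cancel()` points to one possible alternative: always fail the target result, and then additionally notify the registered callback. A minimal sketch of that ordering follows. It assumes the target result can be modelled as a `CompletableFuture<Void>` and the callback as a `Consumer<Throwable>`. `CancellableRequest` is a hypothetical stand-in for the request class, not the PR's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical request whose cancel() always fails its own result, then notifies a callback. */
class CancellableRequest {
    private final CompletableFuture<Void> targetResult = new CompletableFuture<>();
    private Consumer<Throwable> cancelCallback; // optional, may stay null

    void registerCancelCallback(Consumer<Throwable> cancelCallback) {
        this.cancelCallback = cancelCallback;
    }

    void cancel(Throwable cause) {
        targetResult.completeExceptionally(cause); // fail the result regardless of the callback
        if (cancelCallback != null) {
            cancelCallback.accept(cause); // e.g. let a shared writer react to the cancellation
        }
    }

    CompletableFuture<Void> getTargetResult() {
        return targetResult;
    }
}
```

Failing the result first means a caller waiting on `targetResult` is never left hanging, even if the callback itself throws.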