[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082600527


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
 .booleanType()
 .defaultValue(false)
 .withDescription("Flag to enable approximate local 
recovery.");
+
+public static final ConfigOption 
CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+
ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+.intType()
+.defaultValue(5)
+.withDescription(
+"Defines the maximum number of subtasks that share 
the same channel state file. "
++ "It can reduce the number of small files 
when enable unaligned checkpoint. "
++ "Each subtask will create a new channel 
state file when this is configured to 1.");

Review Comment:
   It's only for unaligned checkpoints. There is a possibility we will have 
more options, but not very likely. So maybe 
`execution.checkpointing.unaligned.max-subtasks-per-channel-state-file`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082365656


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
 .booleanType()
 .defaultValue(false)
 .withDescription("Flag to enable approximate local 
recovery.");
+
+public static final ConfigOption 
CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+
ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+.intType()
+.defaultValue(5)
+.withDescription(
+"Defines the maximum number of subtasks that share 
the same channel state file. "
++ "It can reduce the number of small files 
when enable unaligned checkpoint. "
++ "Each subtask will create a new channel 
state file when this is configured to 1.");

Review Comment:
   > number-of-subtasks-share-file
   
   This sounds a bit strange in english. Maybe let's rename it to:
   
   > execution.checkpointing.channel-state.subtasks-per-file
   
   ? and renaming the config option and getters as well? @dawidwys maybe you 
have some better idea?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-19 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1081119337


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##
@@ -107,49 +119,78 @@ void run() {
 try {
 closeAll(
 this::cleanupRequests,
-() ->
-dispatcher.fail(
-thrown == null ? new 
CancellationException() : thrown));
+() -> {
+Throwable cause;
+synchronized (lock) {
+cause = thrown == null ? new 
CancellationException() : thrown;
+}
+dispatcher.fail(cause);
+});
 } catch (Exception e) {
-//noinspection NonAtomicOperationOnVolatileField
-thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+synchronized (lock) {
+//noinspection NonAtomicOperationOnVolatileField
+thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+}
 }
 FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 }
 LOG.debug("loop terminated");
 }
 
 private void loop() throws Exception {
-while (!wasClosed) {
+while (true) {
 try {
-ChannelStateWriteRequest request = deque.take();
-// The executor will end the registration, when the start 
request comes.
-// Because the checkpoint can be started after all tasks are 
initiated.
-if (request instanceof CheckpointStartRequest && 
isRegistering()) {
-checkState(
-isRegistering.compareAndSet(true, false),
-"Transition isRegistering failed.");
+ChannelStateWriteRequest request;
+boolean completeRegister = false;
+synchronized (lock) {
+if (wasClosed) {
+return;
+}
+request = waitAndTakeUnsafe();
+// The executor will end the registration, when the start 
request comes.
+// Because the checkpoint can be started after all tasks 
are initiated.
+if (request instanceof CheckpointStartRequest) {
+completeRegister = completeRegister();
+}
+}
+if (completeRegister) {
 onRegistered.accept(this);
 }
 dispatcher.dispatch(request);
 } catch (InterruptedException e) {
-if (!wasClosed) {
-LOG.debug(
-"Channel state executor is interrupted while 
waiting for a request (continue waiting)",
-e);
-} else {
-Thread.currentThread().interrupt();
+synchronized (lock) {
+if (!wasClosed) {
+LOG.debug(
+"Channel state executor is interrupted while 
waiting for a request (continue waiting)",
+e);
+} else {
+Thread.currentThread().interrupt();
+}
 }
 }
 }
 }
 
+private ChannelStateWriteRequest waitAndTakeUnsafe() throws 
InterruptedException {
+ChannelStateWriteRequest request;
+while (true) {

Review Comment:
   shouldn't this be `while(!wasClosed)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-19 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1081118070


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements 
ChannelStateWriteRequestEx
 private final Thread thread;
 private volatile Exception thrown = null;
 private volatile boolean wasClosed = false;
-private final String taskName;
+
+private final Map> 
unreadyQueues =
+new ConcurrentHashMap<>();
+
+private final JobID jobID;
+private final Set subtasks;
+private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+private final int numberOfSubtasksShareFile;

Review Comment:
   Hmmm, I'm not sure. Maybe it's sufficient to have just the interrupt. On the 
other hand, does it hurt if we add the `lock.notifyAll()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2022-12-19 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052418421


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements 
ChannelStateWriteRequestEx
 private final Thread thread;
 private volatile Exception thrown = null;
 private volatile boolean wasClosed = false;
-private final String taskName;
+
+private final Map> 
unreadyQueues =
+new ConcurrentHashMap<>();
+
+private final JobID jobID;
+private final Set subtasks;
+private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+private final int numberOfSubtasksShareFile;

Review Comment:
   > BlockingDeque.take() will wait until an element becomes available. Deque 
is hard to achieve. BlockingDeque is easy to implement the producer & consumer 
model.
   
   That's a slight complexity, but should be easily solved via `lock.wait()` 
and `lock.notifyAll()` called in one or two places (`close()` and whenever we 
add anything to the current `dequeue`)? 
https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/
   
   The loop probably should look like this:
   
   ```
   while (true) {
 synchronized (lock) {
   if (wasClosed) return; 
   (...)
   ChannelStateWriteRequest request = waitAndTakeUnsafe();
   (...)
 }
   }
   
   private ChannelStateWriteRequest waitAndTakeUnsafe() {
 ChannelStateWriteRequest request;
 while (true) {
   request = dequeue.pollFirst();
   if (request == null) {
 lock.wait();
   }
   else {
 return request;
   }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2022-12-19 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052319366


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##
@@ -286,27 +277,32 @@ private void runWithChecks(RunnableWithException r) {
 }
 }
 
-public void fail(JobID jobID, JobVertexID jobVertexID, int subtaskIndex, 
Throwable e) {
+/**
+ * The throwable is just used for special subtask, other subtasks should 
fail by {@link
+ * CHANNEL_STATE_SHARED_STREAM_EXCEPTION}.

Review Comment:
   nit:
   
   > The throwable is just used for specific subtask that triggered the 
failure. Other subtasks should fail by {@link 
CHANNEL_STATE_SHARED_STREAM_EXCEPTION}
   
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2022-12-19 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052295272


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##
@@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
 private static final Logger LOG = 
LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
 private final DataOutputStream dataStream;
 private final CheckpointStateOutputStream checkpointStream;
-private final ChannelStateWriteResult result;
-private final Map 
inputChannelOffsets = new HashMap<>();
-private final Map 
resultSubpartitionOffsets =
-new HashMap<>();
+
+/**
+ * Indicates whether the current checkpoints of all subtasks have 
exception. If it's not null,
+ * the checkpoint will fail.
+ */
+private Throwable throwable;
+
 private final ChannelStateSerializer serializer;
 private final long checkpointId;
-private boolean allInputsReceived = false;
-private boolean allOutputsReceived = false;
 private final RunnableWithException onComplete;
-private final int subtaskIndex;
-private final String taskName;
+
+// Subtasks that have not yet register writer result.
+private final Set waitedSubtasks;
+
+private final Map pendingResults = 
new HashMap<>();
 
 ChannelStateCheckpointWriter(
-String taskName,
-int subtaskIndex,
-CheckpointStartRequest startCheckpointItem,
+Set subtasks,
+long checkpointId,
 CheckpointStreamFactory streamFactory,
 ChannelStateSerializer serializer,
 RunnableWithException onComplete)
 throws Exception {
 this(
-taskName,
-subtaskIndex,
-startCheckpointItem.getCheckpointId(),
-startCheckpointItem.getTargetResult(),
+subtasks,
+checkpointId,
 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
 serializer,
 onComplete);
 }
 
 @VisibleForTesting
 ChannelStateCheckpointWriter(
-String taskName,
-int subtaskIndex,
+Set subtasks,
 long checkpointId,
-ChannelStateWriteResult result,
 CheckpointStateOutputStream stream,
 ChannelStateSerializer serializer,
 

[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2022-12-14 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048286114


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##
@@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
 private static final Logger LOG = 
LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
 private final DataOutputStream dataStream;
 private final CheckpointStateOutputStream checkpointStream;
-private final ChannelStateWriteResult result;
-private final Map 
inputChannelOffsets = new HashMap<>();
-private final Map 
resultSubpartitionOffsets =
-new HashMap<>();
+
+/**
+ * Indicates whether the current checkpoints of all subtasks have 
exception. If it's not null,
+ * the checkpoint will fail.
+ */
+private Throwable throwable;
+
 private final ChannelStateSerializer serializer;
 private final long checkpointId;
-private boolean allInputsReceived = false;
-private boolean allOutputsReceived = false;
 private final RunnableWithException onComplete;
-private final int subtaskIndex;
-private final String taskName;
+
+// Subtasks that have not yet register writer result.
+private final Set waitedSubtasks;
+
+private final Map pendingResults = 
new HashMap<>();
 
 ChannelStateCheckpointWriter(
-String taskName,
-int subtaskIndex,
-CheckpointStartRequest startCheckpointItem,
+Set subtasks,
+long checkpointId,
 CheckpointStreamFactory streamFactory,
 ChannelStateSerializer serializer,
 RunnableWithException onComplete)
 throws Exception {
 this(
-taskName,
-subtaskIndex,
-startCheckpointItem.getCheckpointId(),
-startCheckpointItem.getTargetResult(),
+subtasks,
+checkpointId,
 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
 serializer,
 onComplete);
 }
 
 @VisibleForTesting
 ChannelStateCheckpointWriter(
-String taskName,
-int subtaskIndex,
+Set subtasks,
 long checkpointId,
-ChannelStateWriteResult result,
 CheckpointStateOutputStream stream,
 ChannelStateSerializer serializer,
 

[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2022-12-14 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048154979


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
 
"{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
 "the important 
considerations"))
 .build());
+
+public static final ConfigOption 
CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+
ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+.intType()
+.defaultValue(5)
+.withDescription(
+"Defines the maximum number of tasks that share 
the same channel state file. "
++ "It can reduce the number of small files 
when enable unaligned checkpoint. "
++ "Each task will create a new channel 
state file when this is configured to 1.");

Review Comment:
   I don't see that this has been actually fixed? Note, most likely also the 
config option should be renamed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2022-10-20 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1000549678


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##
@@ -205,9 +207,17 @@ public CheckpointStorageLocationReference 
getLocationReference() {
 return locationReference;
 }
 
+public void registerCancelCallback(Consumer cancelCallback) {
+this.cancelCallback = cancelCallback;
+}
+
 @Override
 public void cancel(Throwable cause) {
-targetResult.fail(cause);
+if (cancelCallback == null) {
+targetResult.fail(cause);
+return;
+}
+cancelCallback.accept(cause);

Review Comment:
   why we are not failing the `targetResult` if the `cancelCallback` is set? 
This seems strange.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
 
"{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
 "the important 
considerations"))
 .build());
+
+public static final ConfigOption 
CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+
ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+.intType()
+.defaultValue(5)
+.withDescription(
+"Defines the maximum number of tasks that share 
the same channel state file. "
++ "It can reduce the number of small files 
when enable unaligned checkpoint. "
++ "Each task will create a new channel 
state file when this is configured to 1.");

Review Comment:
   `task` -> `subtask`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org