1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082043383
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -107,49 +119,78 @@ void run() {
try {
closeAll(
this::cleanupRequests,
- () ->
- dispatcher.fail(
- thrown == null ? new
CancellationException() : thrown));
+ () -> {
+ Throwable cause;
+ synchronized (lock) {
+ cause = thrown == null ? new
CancellationException() : thrown;
+ }
+ dispatcher.fail(cause);
+ });
} catch (Exception e) {
- //noinspection NonAtomicOperationOnVolatileField
- thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+ synchronized (lock) {
+ //noinspection NonAtomicOperationOnVolatileField
+ thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+ }
}
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
LOG.debug("loop terminated");
}
private void loop() throws Exception {
- while (!wasClosed) {
+ while (true) {
try {
- ChannelStateWriteRequest request = deque.take();
- // The executor will end the registration, when the start
request comes.
- // Because the checkpoint can be started after all tasks are
initiated.
- if (request instanceof CheckpointStartRequest &&
isRegistering()) {
- checkState(
- isRegistering.compareAndSet(true, false),
- "Transition isRegistering failed.");
+ ChannelStateWriteRequest request;
+ boolean completeRegister = false;
+ synchronized (lock) {
+ if (wasClosed) {
+ return;
+ }
+ request = waitAndTakeUnsafe();
+ // The executor will end the registration, when the start
request comes.
+ // Because the checkpoint can be started after all tasks
are initiated.
+ if (request instanceof CheckpointStartRequest) {
+ completeRegister = completeRegister();
+ }
+ }
+ if (completeRegister) {
onRegistered.accept(this);
}
dispatcher.dispatch(request);
} catch (InterruptedException e) {
- if (!wasClosed) {
- LOG.debug(
- "Channel state executor is interrupted while
waiting for a request (continue waiting)",
- e);
- } else {
- Thread.currentThread().interrupt();
+ synchronized (lock) {
+ if (!wasClosed) {
+ LOG.debug(
+ "Channel state executor is interrupted while
waiting for a request (continue waiting)",
+ e);
+ } else {
+ Thread.currentThread().interrupt();
+ }
}
}
}
}
+ private ChannelStateWriteRequest waitAndTakeUnsafe() throws
InterruptedException {
+ ChannelStateWriteRequest request;
+ while (true) {
Review Comment:
Thanks for your review; I've updated the PR.
I didn't squash the commits; instead, I added a new fixup commit so that the
changes are easy to review. I can squash them once you think it's OK.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]