dawidwys commented on a change in pull request #17701:
URL: https://github.com/apache/flink/pull/17701#discussion_r754242318
##########
File path:
flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
##########
@@ -62,10 +67,13 @@ protected boolean doUnRegister(
return closeableMap.remove(closeable) != null;
}
+ /**
+ * This implementation doesn't imply any exception during closing due to
backward compatibility.
+ */
@Override
- protected Collection<Closeable> getReferencesToClose() {
- ArrayList<Closeable> closeablesToClose = new
ArrayList<>(closeableToRef.keySet());
- Collections.reverse(closeablesToClose);
- return closeablesToClose;
+ public void doClose(Collection<Closeable> toClose) throws IOException {
+ List<AutoCloseable> reversed = new ArrayList<>(toClose);
Review comment:
WDYT about reworking it to use Guava's `Lists.reverse`? It returns a reversed
view, so we would get rid of one list copy. We would have to change `doClose`
to `doClose(List<>)`.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -989,19 +1006,6 @@ private void releaseOutputResources() throws Exception {
}
}
- private Exception runAndSuppressThrowable(
- ThrowingRunnable<?> runnable, @Nullable Exception
originalException) {
- try {
- runnable.run();
- } catch (Throwable t) {
- // TODO: investigate why Throwable instead of Exception is used
here.
- Exception e = t instanceof Exception ? (Exception) t : new
Exception(t);
Review comment:
Honestly, I don't feel confident making that decision. This is one of the
outermost layers of the entire Flink stack, so it might actually make sense to
catch all kinds of errors here and still try to clean up the resources.
@pnowojski WDYT?
##########
File path:
flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java
##########
@@ -158,14 +157,14 @@ protected final Object getSynchronizationLock() {
}
/** Adds a mapping to the registry map, respecting locking. */
- protected final void addCloseableInternal(Closeable closeable, T metaData)
{
+ protected final void addCloseableInternal(R closeable, T metaData) {
Review comment:
My IDE says this method is unused. Is that correct? If so, can we drop it?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -366,70 +368,100 @@ protected StreamTask(
StreamTaskActionExecutor actionExecutor,
TaskMailbox mailbox)
throws Exception {
- this.environment = environment;
- this.configuration = new
StreamConfig(environment.getTaskConfiguration());
- this.recordWriter = createRecordWriterDelegate(configuration,
environment);
- this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
- this.mailboxProcessor = new MailboxProcessor(this::processInput,
mailbox, actionExecutor);
- this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
- this.asyncExceptionHandler = new
StreamTaskAsyncExceptionHandler(environment);
- this.asyncOperationsThreadPool =
- Executors.newCachedThreadPool(
- new ExecutorThreadFactory("AsyncOperations",
uncaughtExceptionHandler));
-
- environment.setMainMailboxExecutor(mainMailboxExecutor);
- environment.setAsyncOperationsThreadPool(asyncOperationsThreadPool);
-
- this.stateBackend = createStateBackend();
- this.checkpointStorage = createCheckpointStorage(stateBackend);
- this.changelogWriterAvailabilityProvider =
- environment.getTaskStateManager().getStateChangelogStorage()
== null
- ? null
- : environment
- .getTaskStateManager()
- .getStateChangelogStorage()
- .getAvailabilityProvider();
-
- CheckpointStorageAccess checkpointStorageAccess =
-
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
-
- environment.setCheckpointStorageAccess(checkpointStorageAccess);
-
- this.subtaskCheckpointCoordinator =
- new SubtaskCheckpointCoordinatorImpl(
- checkpointStorageAccess,
- getName(),
- actionExecutor,
- getCancelables(),
- getAsyncOperationsThreadPool(),
- environment,
- this,
- configuration.isUnalignedCheckpointsEnabled(),
- configuration
- .getConfiguration()
- .get(
- ExecutionCheckpointingOptions
-
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
- this::prepareInputSnapshot);
-
- // if the clock is not already set, then assign a default
TimeServiceProvider
- if (timerService == null) {
- this.timerService = createTimerService("Time Trigger for " +
getName());
- } else {
- this.timerService = timerService;
- }
+ // The registration of all closeable resources. The order of
registration is important.
+ resourceCloser = new AutoCloseableRegistry();
+ try {
+ this.environment = environment;
+ this.configuration = new
StreamConfig(environment.getTaskConfiguration());
+ this.mailboxProcessor =
+ new MailboxProcessor(this::processInput, mailbox,
actionExecutor);
+ // Should be closed last.
+ resourceCloser.registerCloseable(mailboxProcessor);
+
+ this.channelIOExecutor =
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("channel-state-unspilling"));
+ resourceCloser.registerCloseable(channelIOExecutor::shutdown);
+
+ this.recordWriter = createRecordWriterDelegate(configuration,
environment);
+ // Release the output resources. this method should never fail.
+ resourceCloser.registerCloseable(this::releaseOutputResources);
+ // If the operators won't be closed explicitly, register it to a
hard close.
+ resourceCloser.registerCloseable(this::closeAllOperators);
+ resourceCloser.registerCloseable(this::cleanUpInternal);
+
+ this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
+ this.mainMailboxExecutor =
mailboxProcessor.getMainMailboxExecutor();
+ this.asyncExceptionHandler = new
StreamTaskAsyncExceptionHandler(environment);
+
+ this.asyncOperationsThreadPool =
+ Executors.newCachedThreadPool(
+ new ExecutorThreadFactory("AsyncOperations",
uncaughtExceptionHandler));
+
+ // Register all asynchronous checkpoint threads.
+ resourceCloser.registerCloseable(this::shutdownAsyncThreads);
+ resourceCloser.registerCloseable(cancelables);
+
+ environment.setMainMailboxExecutor(mainMailboxExecutor);
+
environment.setAsyncOperationsThreadPool(asyncOperationsThreadPool);
+
+ this.stateBackend = createStateBackend();
+ this.checkpointStorage = createCheckpointStorage(stateBackend);
+ this.changelogWriterAvailabilityProvider =
+
environment.getTaskStateManager().getStateChangelogStorage() == null
+ ? null
+ : environment
+ .getTaskStateManager()
+ .getStateChangelogStorage()
+ .getAvailabilityProvider();
+
+ CheckpointStorageAccess checkpointStorageAccess =
+
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
+
+ environment.setCheckpointStorageAccess(checkpointStorageAccess);
+
+ this.subtaskCheckpointCoordinator =
+ new SubtaskCheckpointCoordinatorImpl(
+ checkpointStorageAccess,
+ getName(),
+ actionExecutor,
+ getCancelables(),
+ getAsyncOperationsThreadPool(),
+ environment,
+ this,
+ configuration.isUnalignedCheckpointsEnabled(),
+ configuration
+ .getConfiguration()
+ .get(
+ ExecutionCheckpointingOptions
+
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
+ this::prepareInputSnapshot);
+
+ // if the clock is not already set, then assign a default
TimeServiceProvider
+ if (timerService == null) {
+ this.timerService = createTimerService("Time Trigger for " +
getName());
+ } else {
+ this.timerService = timerService;
+ }
- this.systemTimerService = createTimerService("System Time Trigger for
" + getName());
- this.channelIOExecutor =
- Executors.newSingleThreadExecutor(
- new ExecutorThreadFactory("channel-state-unspilling"));
+ this.systemTimerService = createTimerService("System Time Trigger
for " + getName());
+ // Register to stop all timers and threads. Should be closed first.
+ resourceCloser.registerCloseable(this::tryShutdownTimerService);
- injectChannelStateWriterIntoChannels();
+ injectChannelStateWriterIntoChannels();
-
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
- Configuration taskManagerConf =
environment.getTaskManagerInfo().getConfiguration();
+
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
+ Configuration taskManagerConf =
environment.getTaskManagerInfo().getConfiguration();
- this.bufferDebloatPeriod =
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+ this.bufferDebloatPeriod =
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+ } catch (Exception ex) {
+ try {
+ resourceCloser.close();
Review comment:
Why is this different from lines `948-952`?
##########
File path:
flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java
##########
@@ -43,33 +43,36 @@
* @param <T> Type for potential meta data associated with the registering
closeables
*/
@Internal
-public abstract class AbstractCloseableRegistry<C extends Closeable, T>
implements Closeable {
+public abstract class AbstractAutoCloseableRegistry<
+ R extends AutoCloseable, C extends R, T, E extends Exception>
+ implements AutoCloseable {
/** Lock that guards state of this registry. * */
private final Object lock;
/** Map from tracked Closeables to some associated meta data. */
@GuardedBy("lock")
- protected final Map<Closeable, T> closeableToRef;
+ protected final Map<R, T> closeableToRef;
/** Indicates if this registry is closed. */
@GuardedBy("lock")
private boolean closed;
- public AbstractCloseableRegistry(@Nonnull Map<Closeable, T>
closeableToRef) {
+ public AbstractAutoCloseableRegistry(@Nonnull Map<R, T> closeableToRef) {
this.lock = new Object();
this.closeableToRef = Preconditions.checkNotNull(closeableToRef);
this.closed = false;
}
/**
- * Registers a {@link Closeable} with the registry. In case the registry
is already closed, this
- * method throws an {@link IllegalStateException} and closes the passed
{@link Closeable}.
+ * Registers a {@link AutoCloseable} with the registry. In case the
registry is already closed,
+ * this method throws an {@link IllegalStateException} and closes the
passed {@link
+ * AutoCloseable}.
*
- * @param closeable Closeable tor register
- * @throws IOException exception when the registry was closed before
+ * @param closeable Closeable to register.
+ * @throws IOException exception when the registry was closed before.
*/
- public final void registerCloseable(C closeable) throws IOException {
+ public void registerCloseable(C closeable) throws IOException {
Review comment:
Why did you drop the `final` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]