Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4764#discussion_r142592523 --- Diff: flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.io.IOException; +import java.io.Serializable; + +/** + * This class is a guard for shared resources with the following invariants. The resource can be acquired by multiple + * clients in parallel through the {@link #acquireResource()} call. Clients must release the resource after usage + * with a matching call to {@link #releaseResource()}. The resource can only be disposed once the guard is closed, but + * the guard can only be closed once all clients that acquired the resource have released it. Before this is happened, + * the call to {@link #close()} will block until the condition is triggered. After the guard is closed, calls to + * {@link #acquireResource()} will fail with exception. Notice that, obviously clients are responsible to release the + * resource after usage. All clients are considered equal, i.e. there is only a global count maintained how many + * times the resource was acquired but not by whom. + */ +public class ResourceGuard implements AutoCloseable, Serializable { + + private static final long serialVersionUID = 1L; + + /** + * Introduced additional checks of invariants. + */ + private static final boolean STRICT_CHECKS = true; + + /** + * The object that serves as lock for count and the closed-flag. + */ + private final SerializableObject lock; + + /** + * Number of clients that have ongoing access to the resource. + */ + private int clientCount; + + /** + * This flag indicated if it is still possible to acquire access to the resource. + */ + private volatile boolean closed; + + public ResourceGuard() { + this.lock = new SerializableObject(); + this.clientCount = 0; + this.closed = false; + } + + /** + * Acquired access from one new client for the guarded resource. + * + * @throws IOException when the resource guard is already closed. + */ + public void acquireResource() throws IOException { + + synchronized (lock) { + + if (closed) { + throw new IOException("Resource guard was already closed."); + } + + ++clientCount; + } + } + + /** + * Releases access for one client of the guarded resource. This method must only be called after a matching call to + * {@link #acquireResource()}. + */ + public void releaseResource() { + + synchronized (lock) { + + if (STRICT_CHECKS && clientCount <= 0) { --- End diff -- It is just a flag, whether or not to check that there cannot be more calls to `release()` than previous calls to `acquire`. Through the `static final boolean` in the condition, the jitter can eliminate the branch if it is deactivated.
---