>From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19467 )
Change subject: [NO ISSUE][HYR][MISC] Generalize timed interruptible actions with Span ...................................................................... [NO ISSUE][HYR][MISC] Generalize timed interruptible actions with Span - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-65432 Change-Id: Id2d5e1db6f3c7077f220f409ad60014121266d58 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19467 Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Michael Blow <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptableBiFunction.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java 3 files changed, 100 insertions(+), 14 deletions(-) Approvals: Michael Blow: Looks good to me, but someone else must approve; Verified Ali Alsuliman: Looks good to me, approved Anon. E. Moose #1000171: Jenkins: Verified diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index a4dd9db..83d0eb8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -52,7 +52,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.util.ExceptionUtils; -import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.util.Span; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -289,22 +288,11 @@ for (Future<Void> task : tasks) { task.cancel(true); } - if (acquireUninterruptibly(completeSemaphore, retryWait)) { + retryWait.reset(); + if (retryWait.tryAcquireUninterruptibly(completeSemaphore)) { return true; } LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling..."); } } - - private static boolean acquireUninterruptibly(Semaphore completeSemaphore, Span s) { - s.reset(); - return InvokeUtil.getUninterruptibly(() -> { - while (!s.elapsed()) { - if (completeSemaphore.tryAcquire(s.remaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)) { - return true; - } - } - return false; - }); - } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptableBiFunction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptableBiFunction.java new file mode 100644 index 0000000..c36d194 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptableBiFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.util.function.BiFunction; + +import com.google.common.util.concurrent.UncheckedExecutionException; + +@FunctionalInterface +public interface InterruptableBiFunction<I, J, R> { + R process(I p1, J p2) throws InterruptedException; + + @SuppressWarnings("Duplicates") + static <I, J, R> BiFunction<I, J, R> asUnchecked(InterruptableBiFunction<I, J, R> function) { + return (p1, p2) -> { + try { + return function.process(p1, p2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedExecutionException(e); + } + }; + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java index 26d7f1c..8c8f1ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java @@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; public class Span { @@ -159,6 +160,43 @@ return latch.await(remaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); } + /** + * tryAcquireUninterruptibly will attempt to acquire the semaphore until the span has elapsed or the semaphore is + * acquired, disregarding any interruptions. If an interruption occurs, the thread will be re-interrupted upon + * method completion. + * @param semaphore the semaphore to acquire + * @return true if the semaphore was acquired, false if the span elapsed + */ + public boolean tryAcquireUninterruptibly(Semaphore semaphore) { + return retryUninterruptibly(semaphore::tryAcquire, v -> v); + } + + /** + * retryUninterruptibly will attempt to execute the action until the span has elapsed or the action has succeeded + * @param action the action to execute + * @param test the predicate to determine if the action has succeeded + * @return true if the action succeeded, false if the span elapsed + */ + public <V> boolean retryUninterruptibly(InterruptableBiFunction<Long, TimeUnit, V> action, Predicate<V> test) { + boolean interrupted = Thread.interrupted(); + try { + while (!elapsed()) { + try { + if (test.test(action.process(remaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS))) { + return true; + } + } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind + interrupted = true; + } + } + return false; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + public void loopUntilExhausted(ThrowingAction action) throws Exception { loopUntilExhausted(action, 0, TimeUnit.NANOSECONDS); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19467 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: ionic Gerrit-Change-Id: Id2d5e1db6f3c7077f220f409ad60014121266d58 Gerrit-Change-Number: 19467 Gerrit-PatchSet: 3 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-MessageType: merged
