This is an automated email from the ASF dual-hosted git repository. dblevins pushed a commit to branch TOMEE-4050 in repository https://gitbox.apache.org/repos/asf/tomee.git
commit 6e201ba5085a696aa3c0b0d8cd7af1790de73caa Author: David Blevins <dblev...@tomitribe.com> AuthorDate: Wed Sep 21 18:51:16 2022 -0700 Prototype CachingSupplier required for TOMEE-4050: Retry and Refresh for MP JWT keys supplied via HTTP --- .../org/apache/openejb/util/CachedSupplier.java | 237 +++++++++++++++++++++ .../java/org/apache/openejb/util/Duration.java | 14 ++ .../apache/openejb/util/CachedSupplierTest.java | 112 ++++++++++ .../java/org/apache/openejb/util/DurationTest.java | 17 ++ 4 files changed, 380 insertions(+) diff --git a/container/openejb-core/src/main/java/org/apache/openejb/util/CachedSupplier.java b/container/openejb-core/src/main/java/org/apache/openejb/util/CachedSupplier.java new file mode 100644 index 0000000000..8eec3d6444 --- /dev/null +++ b/container/openejb-core/src/main/java/org/apache/openejb/util/CachedSupplier.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.openejb.util; + +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class CachedSupplier<T> implements Supplier<T> { + + private final Duration initialRetryDelay; + private final Duration maxRetryDelay; + private final Duration accessTimeout; + private final Duration refreshInterval; + private final Supplier<T> supplier; + + private final AtomicReference<T> value = new AtomicReference<>(); + private final AtomicReference<Accessor<T>> accessor = new AtomicReference<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory()); + + private CachedSupplier(final Supplier<T> supplier, final Duration initialRetryDelay, + final Duration maxRetryDelay, final Duration accessTimeout, + final Duration refreshInterval) { + + Objects.requireNonNull(supplier, "supplier"); + Objects.requireNonNull(initialRetryDelay, "initialRetryDelay"); + Objects.requireNonNull(maxRetryDelay, "maxRetryDelay"); + Objects.requireNonNull(accessTimeout, "accessTimeout"); + Objects.requireNonNull(refreshInterval, "refreshInterval"); + + this.supplier = supplier; + this.initialRetryDelay = initialRetryDelay; + this.maxRetryDelay = maxRetryDelay; + this.accessTimeout = accessTimeout; + this.refreshInterval = refreshInterval; + + /* + * This must be last as it starts running code + * that uses the above settings + */ + this.accessor.set(new BlockTillInitialized()); + } + + @Override + public T get() { + final Accessor<T> accessor = this.accessor.get(); + return accessor.get(); + } + + public interface Accessor<T> { + T get(); + } + + class BlockTillInitialized implements Accessor<T> { + final CountDownLatch initialized = 
new CountDownLatch(1); + + public BlockTillInitialized() { + executor.execute(new Initialize(1, initialRetryDelay)); + } + + @Override + public T get() { + try { + initialized.await(accessTimeout.getTime(), accessTimeout.getUnit()); + return value.get(); + } catch (InterruptedException e) { + throw new TimeoutException(); + } + } + + class Initialize implements Runnable { + final int attempts; + final Duration delay; + + public Initialize(final int attempts, final Duration delay) { + this.attempts = attempts; + this.delay = delay; + } + + public Initialize retry() { + if (delay.greaterOrEqualTo(maxRetryDelay)) { + return new Initialize(attempts + 1, maxRetryDelay); + } else { + return new Initialize(attempts + 1, Duration.min(maxRetryDelay, delay.multiply(2))); + } + } + + @Override + public void run() { + try { + final T t = supplier.get(); + if (t != null) { + value.set(t); + accessor.set(new Initialized()); + initialized.countDown(); + return; + } + } catch (Throwable e) { + // TODO + e.printStackTrace(); + } + + /* + * Initialization didn't work. 
Let's try again + */ + final Initialize retry = retry(); + executor.schedule(retry, retry.delay.getTime(), retry.delay.getUnit()); + } + } + } + + class Initialized implements Accessor<T> { + public Initialized() { + executor.scheduleAtFixedRate(new Refresh(), refreshInterval.getTime(), refreshInterval.getTime(), refreshInterval.getUnit()); + } + + @Override + public T get() { + return value.get(); + } + + class Refresh implements Runnable { + @Override + public void run() { + try { + final T t = supplier.get(); + if (t != null) { + value.set(t); + } + } catch (Throwable e) { + // TODO + e.printStackTrace(); + } + } + } + } + + public static <T> CachedSupplier<T> of(final Supplier<T> supplier) { + return new Builder<T>().supplier(supplier).build(); + } + + public static <T> Builder<T> builder(final Supplier<T> supplier) { + return new Builder<T>().supplier(supplier); + } + + public static class TimeoutException extends RuntimeException { + // TODO + } + + private static class DaemonThreadFactory implements ThreadFactory { + @Override + public Thread newThread(final Runnable r) { + final Thread thread = new Thread(r); + thread.setName(CachedSupplier.class.getSimpleName() + " Supplier"); + thread.setDaemon(true); + return thread; + } + } + + public static class Builder<T> { + private Duration initialRetryDelay = new Duration(2, TimeUnit.SECONDS); + private Duration maxRetryDelay = new Duration(1, TimeUnit.HOURS); + private Duration accessTimeout = new Duration(30, TimeUnit.SECONDS); + private Duration refreshInterval = new Duration(1, TimeUnit.DAYS); + private Supplier<T> supplier; + + + public Builder<T> initialRetryDelay(final Duration initialRetryDelay) { + this.initialRetryDelay = initialRetryDelay; + return this; + } + + public Builder<T> initialRetryDelay(final int time, final TimeUnit unit) { + return initialRetryDelay(new Duration(time, unit)); + } + + public Builder<T> maxRetryDelay(final Duration maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; + 
return this; + } + + public Builder<T> maxRetryDelay(final int time, final TimeUnit unit) { + return maxRetryDelay(new Duration(time, unit)); + } + + public Builder<T> accessTimeout(final Duration accessTimeout) { + this.accessTimeout = accessTimeout; + return this; + } + + public Builder<T> accessTimeout(final int time, final TimeUnit unit) { + return accessTimeout(new Duration(time, unit)); + } + + + public Builder<T> refreshInterval(final Duration refreshInterval) { + this.refreshInterval = refreshInterval; + return this; + } + + public Builder<T> refreshInterval(final int time, final TimeUnit unit) { + return refreshInterval(new Duration(time, unit)); + } + + public Builder<T> supplier(final Supplier<T> supplier) { + this.supplier = supplier; + return this; + } + + public CachedSupplier<T> build() { + return new CachedSupplier<>(supplier, + initialRetryDelay, + maxRetryDelay, + accessTimeout, + refreshInterval); + } + } +} diff --git a/container/openejb-core/src/main/java/org/apache/openejb/util/Duration.java b/container/openejb-core/src/main/java/org/apache/openejb/util/Duration.java index dd7b1c6af4..9de1d84094 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/util/Duration.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/util/Duration.java @@ -169,6 +169,10 @@ public class Duration { return new Duration(n.a - n.b, n.base); } + public Duration multiply(final long n) { + return new Duration(getTime() * n, getUnit()); + } + public boolean greaterThan(final Duration that) { final Normalize n = new Normalize(this, that); return n.a > n.b; @@ -189,6 +193,16 @@ public class Duration { return n.a <= n.b; } + public static Duration max(final Duration a, final Duration b) { + final Normalize n = new Normalize(a, b); + return (n.a >= n.b) ? a : b; + } + + public static Duration min(final Duration a, final Duration b) { + final Normalize n = new Normalize(a, b); + return (n.a <= n.b) ? 
+ a : b; + } + public static Duration parse(final String text) { return new Duration(text); } diff --git a/container/openejb-core/src/test/java/org/apache/openejb/util/CachedSupplierTest.java b/container/openejb-core/src/test/java/org/apache/openejb/util/CachedSupplierTest.java new file mode 100644 index 0000000000..e179bc3acc --- /dev/null +++ b/container/openejb-core/src/test/java/org/apache/openejb/util/CachedSupplierTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.util; + +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public class CachedSupplierTest { + + /** + * Supplier returns a valid result immediately and there + * are no delays on the first get. + * + * We also assert that calling get multiple times on the + * CachedSupplier returns the cached result and does not get + * updated results before the refresh occurs. + */ + @Test + public void happyPath() { + } + + /** + * Supplier does not immediately return an initial instance, so we + * block till one is available. We assert that we blocked until we get + * a valid result and no timeout or null is returned. 
+ */ + @Test + public void delayedInitialization() { + } + + /** + * Supplier does not immediately return an initial instance + * and the timeout is reached. We assert a TimeoutException + * is thrown. We also assert that when the Supplier eventually + * does return a valid result we no longer get a TimeoutException + * or any blocking. + */ + @Test + public void delayedInitializationTimeout() { + } + + /** + * Supplier returns null on the first three calls to get. On the + * fourth attempt a valid result is returned from get. We assert + * the number of times the supplier get is called as well as the + * time between each call to ensure exponential backoff is working. + */ + @Test + public void initializationRetry() { + } + + /** + * Supplier returns null repeatedly on all initialization attempts. + * We assert that when the max retry time is reached all subsequent + * retries are at that same time interval and do not continue increasing + * exponentially. + */ + @Test + public void initializationRetryTillMax() { + } + + /** + * Supplier returns a valid result on initialization. We expect that even + * when we are not actively calling get() the value will be refreshed + * according to the refreshInterval. We wait for at least 3 refreshes + * to occur and assert the value we get is the most recent value returned + * from the supplier. We intentionally check for this expected value + * while the refresh is currently executing for the fourth time. We do + * that to ensure that there is no time when values are null, even when we're + * concurrently trying to refresh it in the background. + */ + @Test + public void refreshReliablyCalled() { + } + + + /** + * On the first refresh the Supplier returns null indicating there is + * no valid replacement. We assert that the previous valid value is + * still in use. + */ + @Test + public void refreshFailedWithNull() { + } + + /** + * On the first refresh the Supplier throws an exception, therefore there is + * no valid replacement. 
We assert that the previous valid value is + * still in use. + */ + @Test + public void refreshFailedWithException() { + } + +} diff --git a/container/openejb-core/src/test/java/org/apache/openejb/util/DurationTest.java b/container/openejb-core/src/test/java/org/apache/openejb/util/DurationTest.java index 4f370dbac0..f0258da294 100644 --- a/container/openejb-core/src/test/java/org/apache/openejb/util/DurationTest.java +++ b/container/openejb-core/src/test/java/org/apache/openejb/util/DurationTest.java @@ -140,4 +140,21 @@ public class DurationTest extends TestCase { assertTrue(a.lessOrEqualTo(a)); assertFalse(b.lessOrEqualTo(a)); } + + public void testMultiply() { + { + final Duration a = new Duration(1, SECONDS); + final Duration b = a.multiply(3); + + assertEquals(b.getUnit(), SECONDS); + assertEquals(b.getTime(), 3); + } + { + final Duration a = new Duration(3, MINUTES); + final Duration b = a.multiply(3); + + assertEquals(b.getUnit(), MINUTES); + assertEquals(b.getTime(), 9); + } + } }