DELTASPIKE-1118 refactoring to ThrottledStrategy
Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/18e1af70 Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/18e1af70 Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/18e1af70 Branch: refs/heads/master Commit: 18e1af703e06e629caadcc949bad0fd566b3d24c Parents: ac3d582 Author: gpetracek <[email protected]> Authored: Tue Apr 12 00:11:53 2016 +0200 Committer: gpetracek <[email protected]> Committed: Tue Apr 12 01:02:11 2016 +0200 ---------------------------------------------------------------------- .../core/spi/throttling/ThrottledStrategy.java | 25 ++++ .../throttling/DefaultThrottledStrategy.java | 38 +++++ .../core/impl/throttling/Invoker.java | 83 ++++++++++ .../core/impl/throttling/InvokerStorage.java | 111 ++++++++++++++ .../impl/throttling/ThrottledInterceptor.java | 150 +------------------ 5 files changed, 260 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java new file mode 100644 index 0000000..e1a290a --- /dev/null +++ b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.spi.throttling; + +import org.apache.deltaspike.core.spi.InterceptorStrategy; + +public interface ThrottledStrategy extends InterceptorStrategy +{ +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java new file mode 100644 index 0000000..e31241f --- /dev/null +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.throttling; + +import org.apache.deltaspike.core.spi.throttling.ThrottledStrategy; + +import javax.enterprise.context.Dependent; +import javax.inject.Inject; +import javax.interceptor.InvocationContext; + +@Dependent +public class DefaultThrottledStrategy implements ThrottledStrategy +{ + @Inject + private InvokerStorage metadata; + + @Override + public Object execute(InvocationContext ic) throws Exception + { + return metadata.getOrCreateInvoker(ic).invoke(ic); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java new file mode 100644 index 0000000..0d4e49f --- /dev/null +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.throttling; + +import org.apache.deltaspike.core.util.ExceptionUtils; + +import javax.interceptor.InvocationContext; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +class Invoker +{ + private final int weight; + private final Semaphore semaphore; + private final long timeout; + + Invoker(final Semaphore semaphore, final int weight, final long timeout) + { + this.semaphore = semaphore; + this.weight = weight; + this.timeout = timeout; + } + + public Object invoke(final InvocationContext context) throws Exception + { + if (timeout > 0) + { + try + { + if (!semaphore.tryAcquire(weight, timeout, TimeUnit.MILLISECONDS)) + { + throw new IllegalStateException( + "Can't acquire " + weight + " permits for " + context.getMethod() + " in " + timeout + "ms"); + } + } + catch (final InterruptedException e) + { + return onInterruption(e); + } + } + else + { + try + { + semaphore.acquire(weight); + } + catch (final InterruptedException e) + { + return onInterruption(e); + } + } + try + { + return context.proceed(); + } + finally + { + semaphore.release(weight); + } + } + + private static Semaphore onInterruption(final InterruptedException e) + { + Thread.interrupted(); + throw ExceptionUtils.throwAsRuntimeException(e); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java new file mode 100644 index 0000000..3f1d69d --- /dev/null +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.throttling; + + +import org.apache.deltaspike.core.api.throttling.Throttled; +import org.apache.deltaspike.core.api.throttling.Throttling; +import org.apache.deltaspike.core.impl.util.AnnotatedMethods; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Typed; +import javax.enterprise.inject.spi.AnnotatedMethod; +import javax.enterprise.inject.spi.AnnotatedType; +import javax.enterprise.inject.spi.BeanManager; +import javax.inject.Inject; +import javax.interceptor.InvocationContext; +import java.lang.reflect.Method; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; + +@ApplicationScoped +@Typed(InvokerStorage.class) +public class InvokerStorage implements Throttling.SemaphoreFactory +{ + private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>(); + private final ConcurrentMap<Method, Invoker> providers = new ConcurrentHashMap<Method, Invoker>(); + + @Inject + private BeanManager beanManager; + + Invoker getOrCreateInvoker(final InvocationContext ic) + { + final Method method = ic.getMethod(); + Invoker i = providers.get(method); + if (i == null) + { + final Class declaringClass = method.getDeclaringClass(); + final AnnotatedType<Object> annotatedType = beanManager.createAnnotatedType(declaringClass); + final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType, method); + + Throttled config = annotatedMethod.getAnnotation(Throttled.class); + if (config == null) + { + config = annotatedType.getAnnotation(Throttled.class); + } + Throttling sharedConfig = annotatedMethod.getAnnotation(Throttling.class); + if (sharedConfig == null) + { + sharedConfig = annotatedType.getAnnotation(Throttling.class); + } + + final Throttling.SemaphoreFactory factory = + sharedConfig != null && sharedConfig.factory() != Throttling.SemaphoreFactory.class ? + Throttling.SemaphoreFactory.class.cast( + beanManager.getReference(beanManager.resolve( + beanManager.getBeans( + sharedConfig.factory())), + Throttling.SemaphoreFactory.class, null)) : this; + + final Semaphore semaphore = factory.newSemaphore( + annotatedMethod, + sharedConfig != null && !sharedConfig.name().isEmpty() ? + sharedConfig.name() : declaringClass.getName(), + sharedConfig != null && sharedConfig.fair(), + sharedConfig != null ? sharedConfig.permits() : 1); + final long timeout = config.timeoutUnit().toMillis(config.timeout()); + final int weigth = config.weight(); + i = new Invoker(semaphore, weigth, timeout); + final Invoker existing = providers.putIfAbsent(ic.getMethod(), i); + if (existing != null) + { + i = existing; + } + } + return i; + } + + @Override + public Semaphore newSemaphore(final AnnotatedMethod<?> method, final String name, + final boolean fair, final int permits) + { + Semaphore semaphore = semaphores.get(name); + if (semaphore == null) + { + semaphore = new Semaphore(permits, fair); + final Semaphore existing = semaphores.putIfAbsent(name, semaphore); + if (existing != null) + { + semaphore = existing; + } + } + return semaphore; + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java index f02b14f..19ddd5c 100644 --- a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java @@ -19,168 +19,24 @@ package org.apache.deltaspike.core.impl.throttling; import org.apache.deltaspike.core.api.throttling.Throttled; -import org.apache.deltaspike.core.api.throttling.Throttling; -import org.apache.deltaspike.core.impl.util.AnnotatedMethods; +import org.apache.deltaspike.core.spi.throttling.ThrottledStrategy; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Typed; -import javax.enterprise.inject.spi.AnnotatedMethod; -import javax.enterprise.inject.spi.AnnotatedType; -import javax.enterprise.inject.spi.BeanManager; import javax.inject.Inject; import javax.interceptor.AroundInvoke; import javax.interceptor.Interceptor; import javax.interceptor.InvocationContext; import java.io.Serializable; -import java.lang.reflect.Method; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; @Throttled @Interceptor public class ThrottledInterceptor implements Serializable { @Inject - private LocalCache metadata; + private ThrottledStrategy throttledStrategy; @AroundInvoke public Object invoke(final InvocationContext ic) throws Exception { - return metadata.getOrCreateInvocation(ic).invoke(ic); - } - - private static Semaphore onInterruption(final InterruptedException e) - { - Thread.interrupted(); - throw new IllegalStateException("acquire() interrupted", e); - } - - @ApplicationScoped - @Typed(LocalCache.class) - static class LocalCache implements Throttling.SemaphoreFactory - { - private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>(); - private final ConcurrentMap<Method, Invocation> providers = new ConcurrentHashMap<Method, Invocation>(); - - @Inject - private BeanManager beanManager; - - Invocation getOrCreateInvocation(final InvocationContext ic) - { - final Method method = ic.getMethod(); - Invocation i = providers.get(method); - if (i == null) - { - final Class declaringClass = method.getDeclaringClass(); - final AnnotatedType<Object> annotatedType = beanManager.createAnnotatedType(declaringClass); - final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType, method); - - Throttled config = annotatedMethod.getAnnotation(Throttled.class); - if (config == null) - { - config = annotatedType.getAnnotation(Throttled.class); - } - Throttling sharedConfig = annotatedMethod.getAnnotation(Throttling.class); - if (sharedConfig == null) - { - sharedConfig = annotatedType.getAnnotation(Throttling.class); - } - - final Throttling.SemaphoreFactory factory = - sharedConfig != null && sharedConfig.factory() != Throttling.SemaphoreFactory.class ? - Throttling.SemaphoreFactory.class.cast( - beanManager.getReference(beanManager.resolve( - beanManager.getBeans( - sharedConfig.factory())), - Throttling.SemaphoreFactory.class, null)) : this; - - final Semaphore semaphore = factory.newSemaphore( - annotatedMethod, - sharedConfig != null && !sharedConfig.name().isEmpty() ? - sharedConfig.name() : declaringClass.getName(), - sharedConfig != null && sharedConfig.fair(), - sharedConfig != null ? sharedConfig.permits() : 1); - final long timeout = config.timeoutUnit().toMillis(config.timeout()); - final int weigth = config.weight(); - i = new Invocation(semaphore, weigth, timeout); - final Invocation existing = providers.putIfAbsent(ic.getMethod(), i); - if (existing != null) - { - i = existing; - } - } - return i; - } - - @Override - public Semaphore newSemaphore(final AnnotatedMethod<?> method, final String name, - final boolean fair, final int permits) - { - Semaphore semaphore = semaphores.get(name); - if (semaphore == null) - { - semaphore = new Semaphore(permits, fair); - final Semaphore existing = semaphores.putIfAbsent(name, semaphore); - if (existing != null) - { - semaphore = existing; - } - } - return semaphore; - } - } - - private static final class Invocation - { - private final int weight; - private final Semaphore semaphore; - private final long timeout; - - private Invocation(final Semaphore semaphore, final int weight, final long timeout) - { - this.semaphore = semaphore; - this.weight = weight; - this.timeout = timeout; - } - - Object invoke(final InvocationContext context) throws Exception - { - if (timeout > 0) - { - try - { - if (!semaphore.tryAcquire(weight, timeout, TimeUnit.MILLISECONDS)) - { - throw new IllegalStateException("Can't acquire " + weight + - " permits for " + context.getMethod() + " in " + timeout + "ms"); - } - } - catch (final InterruptedException e) - { - return onInterruption(e); - } - } - else - { - try - { - semaphore.acquire(weight); - } - catch (final InterruptedException e) - { - return onInterruption(e); - } - } - try - { - return context.proceed(); - } - finally - { - semaphore.release(weight); - } - } + return throttledStrategy.execute(ic); } }
