Repository: deltaspike Updated Branches: refs/heads/master 4523b5f35 -> 2b630819a
DELTASPIKE-1099 @Locked interceptor Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/2b630819 Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/2b630819 Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/2b630819 Branch: refs/heads/master Commit: 2b630819a6392e52b8ff6c5d306c21aa8cd50e90 Parents: 4523b5f Author: Romain manni-Bucau <[email protected]> Authored: Wed Mar 23 10:34:04 2016 +0100 Committer: Romain manni-Bucau <[email protected]> Committed: Wed Mar 23 10:34:04 2016 +0100 ---------------------------------------------------------------------- .../apache/deltaspike/core/api/lock/Locked.java | 88 ++++++++++ .../core/impl/future/FutureableInterceptor.java | 15 +- .../core/impl/lock/LockedInterceptor.java | 170 +++++++++++++++++++ .../impl/throttling/ThrottledInterceptor.java | 15 +- .../core/impl/util/AnnotatedMethods.java | 49 ++++++ .../impl/src/main/resources/META-INF/beans.xml | 1 + .../test/core/impl/lock/LockedTest.java | 161 ++++++++++++++++++ .../deltaspike/test/core/impl/lock/Service.java | 54 ++++++ 8 files changed, 527 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/lock/Locked.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/lock/Locked.java b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/lock/Locked.java new file mode 100644 index 0000000..c866327 --- /dev/null +++ b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/lock/Locked.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.api.lock; + +import javax.enterprise.inject.spi.AnnotatedMethod; +import javax.enterprise.util.Nonbinding; +import javax.interceptor.InterceptorBinding; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * The access to the method is protected by a read/write lock. + */ +@InterceptorBinding +@Retention(RUNTIME) +@Target({ TYPE, METHOD }) +public @interface Locked +{ + /** + * @return if the lock is fair. + */ + @Nonbinding + boolean fair() default false; + + /** + * @return the operation used on the lock, default to read but you can use write. + */ + @Nonbinding + Operation operation() default Operation.READ; + + /** + * @return how to retrieve the lock for this method. Default uses a lock per class. + */ + @Nonbinding + Class<? extends LockFactory> factory() default LockFactory.class; + + /** + * @return the access timeout for this method. Ignored by default since it is 0. + */ + @Nonbinding + long timeout() default 0L; + + /** + * @return the timeout unit (default ms). + */ + @Nonbinding + TimeUnit timeoutUnit() default TimeUnit.MILLISECONDS; + + enum Operation + { + READ, WRITE + } + + /** + * Provide a way to switch the ReadWriteLock implementation for @Locked. + */ + interface LockFactory + { + /** + * @param method the intercepted method. + * @param fair is the lock fair. + * @return a read/write lock used for @Locked implementation. + */ + ReadWriteLock newLock(AnnotatedMethod<?> method, boolean fair); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java index a9119ed..6aec388 100644 --- a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java @@ -20,6 +20,7 @@ package org.apache.deltaspike.core.impl.future; import org.apache.deltaspike.core.api.config.ConfigResolver; import org.apache.deltaspike.core.api.future.Futureable; +import org.apache.deltaspike.core.impl.util.AnnotatedMethods; import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; @@ -174,19 +175,7 @@ public class FutureableInterceptor implements Serializable if (executorService == null) { final AnnotatedType<?> annotatedType = beanManager.createAnnotatedType(method.getDeclaringClass()); - AnnotatedMethod<?> annotatedMethod = null; - for (final AnnotatedMethod<?> am : annotatedType.getMethods()) - { - if (am.getJavaMember().equals(method)) - { - annotatedMethod = am; - break; - } - } - if (annotatedMethod == null) - { - throw new IllegalStateException("No annotated method for " + method); - } + final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType, method); final Futureable methodConfig = annotatedMethod.getAnnotation(Futureable.class); final ExecutorService instance = manager.find( (methodConfig == null ? annotatedType.getAnnotation(Futureable.class) : methodConfig).value()); http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/lock/LockedInterceptor.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/lock/LockedInterceptor.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/lock/LockedInterceptor.java new file mode 100644 index 0000000..aff1efc --- /dev/null +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/lock/LockedInterceptor.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.lock; + +import org.apache.deltaspike.core.api.lock.Locked; +import org.apache.deltaspike.core.impl.util.AnnotatedMethods; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Typed; +import javax.enterprise.inject.spi.AnnotatedMethod; +import javax.enterprise.inject.spi.AnnotatedType; +import javax.enterprise.inject.spi.BeanManager; +import javax.inject.Inject; +import javax.interceptor.AroundInvoke; +import javax.interceptor.Interceptor; +import javax.interceptor.InvocationContext; +import java.io.Serializable; +import java.lang.reflect.Method; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.deltaspike.core.api.lock.Locked.Operation.READ; + +@Locked +@Interceptor +public class LockedInterceptor implements Serializable +{ + @Inject + private Locks locks; + + @AroundInvoke + public Object invoke(final InvocationContext ic) throws Exception + { + final Lock l = locks.lockAcquirer(ic).get(); + try + { + return ic.proceed(); + } + finally + { + l.unlock(); + } + } + + @ApplicationScoped + @Typed(Locks.class) + static class Locks implements Locked.LockFactory + { + private final ConcurrentMap<String, ReadWriteLock> locks = new ConcurrentHashMap<String, ReadWriteLock>(); + + // read or write + private final ConcurrentMap<Method, LockSupplier> lockSuppliers = new ConcurrentHashMap<Method, LockSupplier>(); + + @Inject + private BeanManager beanManager; + + LockSupplier lockAcquirer(final InvocationContext ic) + { + final Method key = ic.getMethod(); + LockSupplier operation = lockSuppliers.get(key); + if (operation == null) + { + final Class declaringClass = key.getDeclaringClass(); + final AnnotatedType<Object> annotatedType = beanManager.createAnnotatedType(declaringClass); + final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType, key); + + Locked config = annotatedMethod.getAnnotation(Locked.class); + if (config == null) + { + config = annotatedType.getAnnotation(Locked.class); + } + final Locked.LockFactory factory = config.factory() != Locked.LockFactory.class ? + Locked.LockFactory.class.cast( + beanManager.getReference(beanManager.resolve( + beanManager.getBeans( + config.factory())), + Locked.LockFactory.class, null)) : this; + + final ReadWriteLock writeLock = factory.newLock(annotatedMethod, config.fair()); + final long timeout = config.timeoutUnit().toMillis(config.timeout()); + final Lock lock = config.operation() == READ ? writeLock.readLock() : writeLock.writeLock(); + + if (timeout > 0) + { + operation = new LockSupplier() + { + @Override + public Lock get() + { + try + { + if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS)) + { + throw new IllegalStateException("Can't lock for " + key + " in " + timeout + "ms"); + } + } + catch (final InterruptedException e) + { + Thread.interrupted(); + throw new IllegalStateException("Locking interrupted", e); + } + return lock; + } + }; + } + else + { + operation = new LockSupplier() + { + @Override + public Lock get() + { + lock.lock(); + return lock; + } + }; + } + + final LockSupplier existing = lockSuppliers.putIfAbsent(key, operation); + if (existing != null) + { + operation = existing; + } + } + return operation; + } + + @Override + public ReadWriteLock newLock(final AnnotatedMethod<?> method, final boolean fair) + { + final String name = method.getJavaMember().getDeclaringClass().getName(); + ReadWriteLock lock = locks.get(name); + if (lock == null) + { + lock = new ReentrantReadWriteLock(fair); + final ReadWriteLock existing = locks.putIfAbsent(name, lock); + if (existing != null) + { + lock = existing; + } + } + return lock; + } + } + + private interface LockSupplier // yes we miss a bit java 8 there + { + Lock get(); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java index 6b9e1f8..f02b14f 100644 --- a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java @@ -20,6 +20,7 @@ package org.apache.deltaspike.core.impl.throttling; import org.apache.deltaspike.core.api.throttling.Throttled; import org.apache.deltaspike.core.api.throttling.Throttling; +import org.apache.deltaspike.core.impl.util.AnnotatedMethods; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.Typed; @@ -74,19 +75,7 @@ public class ThrottledInterceptor implements Serializable { final Class declaringClass = method.getDeclaringClass(); final AnnotatedType<Object> annotatedType = beanManager.createAnnotatedType(declaringClass); - AnnotatedMethod<?> annotatedMethod = null; - for (final AnnotatedMethod<?> am : annotatedType.getMethods()) - { - if (am.getJavaMember().equals(method)) - { - annotatedMethod = am; - break; - } - } - if (annotatedMethod == null) - { - throw new IllegalStateException("No annotated method for " + method); - } + final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType, method); Throttled config = annotatedMethod.getAnnotation(Throttled.class); if (config == null) http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/util/AnnotatedMethods.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/util/AnnotatedMethods.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/util/AnnotatedMethods.java new file mode 100644 index 0000000..fc0c980 --- /dev/null +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/util/AnnotatedMethods.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.util; + +import javax.enterprise.inject.spi.AnnotatedMethod; +import javax.enterprise.inject.spi.AnnotatedType; +import java.lang.reflect.Method; + +public final class AnnotatedMethods +{ + private AnnotatedMethods() + { + // no-op + } + + public static AnnotatedMethod<?> findMethod(final AnnotatedType<?> type, final Method method) + { + AnnotatedMethod<?> annotatedMethod = null; + for (final AnnotatedMethod<?> am : type.getMethods()) + { + if (am.getJavaMember().equals(method)) + { + annotatedMethod = am; + break; + } + } + if (annotatedMethod == null) + { + throw new IllegalStateException("No annotated method for " + method); + } + return annotatedMethod; + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/impl/src/main/resources/META-INF/beans.xml ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/resources/META-INF/beans.xml b/deltaspike/core/impl/src/main/resources/META-INF/beans.xml index 9994be5..5d96bf1 100644 --- a/deltaspike/core/impl/src/main/resources/META-INF/beans.xml +++ b/deltaspike/core/impl/src/main/resources/META-INF/beans.xml @@ -22,6 +22,7 @@ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/beans_1_0.xsd"> <interceptors> <class>org.apache.deltaspike.core.impl.throttling.ThrottledInterceptor</class> + <class>org.apache.deltaspike.core.impl.lock.LockedInterceptor</class> <class>org.apache.deltaspike.core.impl.future.FutureableInterceptor</class> </interceptors> </beans> http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/LockedTest.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/LockedTest.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/LockedTest.java new file mode 100644 index 0000000..c0f43ce --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/LockedTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.test.core.impl.lock; + +import org.apache.deltaspike.test.util.ArchiveUtils; +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.inject.Inject; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(Arquillian.class) +public class LockedTest { + @Deployment + public static WebArchive deploy() + { + JavaArchive testJar = ShrinkWrap.create(JavaArchive.class, "FutureableTest.jar") + .addPackage(Service.class.getPackage().getName()) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"); + + return ShrinkWrap.create(WebArchive.class, "FutureableTest.war") + .addAsLibraries(ArchiveUtils.getDeltaSpikeCoreArchive()) + .addAsLibraries(testJar) + .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml"); + } + + @Inject + private Service service; + + @Test + public void simpleNotConcurrent() + { + final CountDownLatch synchro = new CountDownLatch(1); + final Thread writer = new Thread() + { + @Override + public void run() + { + service.write("test", "value"); + synchro.countDown(); + } + }; + + final CountDownLatch end = new CountDownLatch(1); + final AtomicReference<String> val = new AtomicReference<String>(); + final Thread reader = new Thread() + { + @Override + public void run() + { + try + { + synchro.await(1, TimeUnit.MINUTES); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + val.set(service.read("test")); + end.countDown(); + } + }; + + reader.start(); + writer.start(); + try + { + end.await(1, TimeUnit.MINUTES); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + assertEquals("value", val.get()); + } + + @Test + public void concurrentTimeout() + { + final AtomicBoolean doAgain = new AtomicBoolean(true); + final CountDownLatch endWriter = new CountDownLatch(1); + final Thread writer = new Thread() + { + @Override + public void run() + { + while (doAgain.get()) + { + service.write("test", "value"); + service.force(); + } + endWriter.countDown(); + } + }; + + final CountDownLatch endReader = new CountDownLatch(1); + final Thread reader = new Thread() + { + @Override + public void run() + { + while (doAgain.get()) + { + try + { + service.read("test"); + } + catch (final IllegalStateException e) + { + doAgain.set(false); + } + } + endReader.countDown(); + } + }; + + reader.start(); + writer.start(); + try + { + endReader.await(1, TimeUnit.MINUTES); + endWriter.await(1, TimeUnit.MINUTES); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + assertEquals("value", service.read("test")); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/2b630819/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/Service.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/Service.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/Service.java new file mode 100644 index 0000000..e2c7894 --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/lock/Service.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.test.core.impl.lock; + +import org.apache.deltaspike.core.api.lock.Locked; + +import javax.enterprise.context.ApplicationScoped; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.deltaspike.core.api.lock.Locked.Operation.WRITE; +import static org.junit.Assert.fail; + +@ApplicationScoped +public class Service { + private final Map<String, String> entries = new HashMap<String, String>(); + + @Locked(timeout = 1, timeoutUnit = TimeUnit.SECONDS) + public String read(final String k) { + return entries.get(k); + } + + @Locked(timeout = 1, timeoutUnit = TimeUnit.SECONDS, operation = WRITE) + public void write(final String k, final String v) { + entries.put(k, v); + } + + @Locked(operation = WRITE) + public void force() { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } catch (final InterruptedException e) { + Thread.interrupted(); + fail(); + } + } +}
