Repository: incubator-reef Updated Branches: refs/heads/master 763a6271a -> c41eebde9
[REEF-41] Replace Guava cache usage in core REEF Add Cache interface to reef-utils and basic CacheImpl. Cache interface provides get-if-absent-compute and explicit invalidation and CacheImpl provides an implementation that also includes expire-after-write timeout. JIRA: REEF-41 https://issues.apache.org/jira/browse/REEF-41 Pull Request: This closes #122 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c41eebde Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c41eebde Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c41eebde Branch: refs/heads/master Commit: c41eebde98ddc7d569d140cdff7b4cb600e56c29 Parents: 763a627 Author: Brian Cho <[email protected]> Authored: Thu Mar 26 17:23:05 2015 +0900 Committer: Markus Weimer <[email protected]> Committed: Fri Mar 27 09:11:05 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/reef/io/network/Cache.java | 53 ----- .../reef/io/network/naming/NameCache.java | 14 +- .../reef/io/network/naming/NameClient.java | 2 +- .../io/network/naming/NameLookupClient.java | 2 +- lang/java/reef-utils/pom.xml | 5 + .../java/org/apache/reef/util/cache/Cache.java | 48 +++++ .../org/apache/reef/util/cache/CacheImpl.java | 125 ++++++++++++ .../org/apache/reef/util/cache/CurrentTime.java | 26 +++ .../org/apache/reef/util/cache/SystemTime.java | 33 ++++ .../apache/reef/util/cache/WrappedValue.java | 92 +++++++++ .../util/cache/CacheImplConcurrentTest.java | 198 +++++++++++++++++++ .../apache/reef/util/cache/CacheImplTest.java | 131 ++++++++++++ .../reef/util/cache/ImmediateInteger.java | 37 ++++ .../apache/reef/util/cache/SleepingInteger.java | 46 +++++ .../reef/util/cache/WrappedValueTest.java | 90 +++++++++ 15 files changed, 839 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java deleted file mode 100644 index 718a576..0000000 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.io.network; - -import org.apache.reef.exception.evaluator.NetworkException; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; - -/** - * Cache for network and naming services - */ -public interface Cache<K, V> { - /** - * Constructs with timeout - * key is evicted when it's not used for timeout milli-seconds - */ - - /** - * Returns a value for the key if cached; otherwise creates, caches and returns - * When it creates a value for a key, only one callable for the key is executed - * - * @param key a key - * @param callable a value fetcher - * @return a value - * @throws NetworkException - */ - public V get(K key, Callable<V> valueFetcher) throws ExecutionException; - - /** - * Invalidates a key from the cache - * - * @param key a key - */ - public void invalidate(K key); - -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java index 384bfe4..e32b144 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java @@ -18,31 +18,29 @@ */ package org.apache.reef.io.network.naming; -import com.google.common.cache.CacheBuilder; -import org.apache.reef.io.network.Cache; +import org.apache.reef.util.cache.Cache; +import org.apache.reef.util.cache.CacheImpl; +import org.apache.reef.util.cache.SystemTime; import org.apache.reef.wake.Identifier; import java.net.InetSocketAddress; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; /** * Naming cache implementation */ public class NameCache implements Cache<Identifier, InetSocketAddress> { - private final com.google.common.cache.Cache<Identifier, InetSocketAddress> cache; + private final Cache<Identifier, InetSocketAddress> cache; /** * Constructs a naming cache * - * @param timeout a cache entry timeout after access + * @param timeout a cache entry timeout after write */ public NameCache(long timeout) { - cache = CacheBuilder.newBuilder() - .expireAfterWrite(timeout, TimeUnit.MILLISECONDS) - .build(); + cache = new CacheImpl<>(new SystemTime(), timeout); } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java index 79b4a92..966ac94 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java @@ -19,7 +19,7 @@ package org.apache.reef.io.network.naming; import org.apache.reef.io.naming.Naming; -import org.apache.reef.io.network.Cache; +import org.apache.reef.util.cache.Cache; import org.apache.reef.io.network.naming.exception.NamingRuntimeException; import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; import org.apache.reef.io.network.naming.serialization.NamingMessage; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java index 31b7fca..3a19b99 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java @@ -20,7 +20,7 @@ package org.apache.reef.io.network.naming; import org.apache.reef.io.naming.NameAssignment; import org.apache.reef.io.naming.NamingLookup; -import org.apache.reef.io.network.Cache; +import org.apache.reef.util.cache.Cache; import org.apache.reef.io.network.naming.exception.NamingException; import org.apache.reef.io.network.naming.serialization.NamingLookupRequest; import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/pom.xml b/lang/java/reef-utils/pom.xml index f553be8..6d9c621 100644 --- a/lang/java/reef-utils/pom.xml +++ b/lang/java/reef-utils/pom.xml @@ -34,6 +34,11 @@ under the License. <!-- This module shouldn't have many dependencies to make sure it is broadly usable across reef subprojects --> <dependencies> <dependency> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + <version>1</version> + </dependency> + <dependency> <groupId>net.jcip</groupId> <artifactId>jcip-annotations</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java new file mode 100644 index 0000000..ef78753 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +/** + * Cache with get-if-absent-compute semantics. + * Supports explicit invalidation. + * Implementation may add other features, e.g. eviction on expire-after-write + */ +public interface Cache<K, V> { + /** + * Returns a value for the key if cached; otherwise creates, caches and returns + * When it creates a value for a key, only one callable for the key is executed + * + * @param key a key + * @param valueFetcher a value fetcher + * @return a value + * @throws ExecutionException + */ + public V get(K key, Callable<V> valueFetcher) throws ExecutionException; + + /** + * Invalidates a key from the cache + * + * @param key a key + */ + public void invalidate(K key); + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java new file mode 100644 index 0000000..778fcb3 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import org.apache.reef.util.Optional; + +import javax.inject.Inject; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Implementation that supports expire-after-write. + * Entries that have expired are collected and invalidated on get calls. + * This obviates the need for a separate thread to invalidate expired entries, at + * the cost of some increase in get call latency. + * The invalidation sweep is only initiated after an interval (expireCheckInterval) + * has passed, and at most one invalidation sweep is run at a time. + * + * Operations on a single key are linearizable. The argument is: + * 1. The putIfAbsent call in get guarantees that loadAndGet is called exactly once + * for a WrappedValue instance that is put into the map: All putIfAbsent calls + * that return the WrappedValue instance will return the value loaded by loadAndGet. + * 2. Concurrent putIfAbsent and remove calls on a key have an ordering: if putIfAbsent + * returns null then it happened after the remove (and a new value will be loaded); + * else if it returns non-null then it happened before the remove + * (and the previous value will be returned). + */ +public final class CacheImpl<K, V> implements Cache<K, V> { + private final ConcurrentMap<K, WrappedValue<V>> internalMap; + private final CurrentTime currentTime; + private final long timeoutMillis; + private final long expireCheckInterval; + private final AtomicBoolean expireInProgress; + + private long expireCheckedTime; + + /** + * Construct an expire-after-write cache + * + * @param currentTime class that returns the current time for timeout purposes + * @param timeoutMillis a cache entry timeout after write + */ + @Inject + public CacheImpl(final CurrentTime currentTime, + final long timeoutMillis) { + this.internalMap = new ConcurrentHashMap<>(); + this.currentTime = currentTime; + this.timeoutMillis = timeoutMillis; + this.expireCheckInterval = timeoutMillis / 2; + this.expireInProgress = new AtomicBoolean(false); + + this.expireCheckedTime = currentTime.now(); + } + + @Override + public V get(final K key, final Callable<V> valueFetcher) throws ExecutionException { + // Before get, try to invalidate as many expired as possible + expireEntries(); + + final WrappedValue<V> newWrappedValue = new WrappedValue<>(valueFetcher, currentTime); + final WrappedValue<V> existingWrappedValue = internalMap.putIfAbsent(key, newWrappedValue); + + if (existingWrappedValue == null) { + // If absent, compute and return + return newWrappedValue.loadAndGet(); + } else { + final Optional<V> existingValue = existingWrappedValue.getValue(); + if (existingValue.isPresent()) { + // If value already exists, get (without locking) and return + return existingValue.get(); + } else { + // If value is being computed, wait for computation to complete + return existingWrappedValue.waitAndGet(); + } + } + } + + private void expireEntries() { + if (expireInProgress.compareAndSet(false, true)) { + final long now = currentTime.now(); + if (expireCheckedTime + expireCheckInterval < now) { + expireEntriesAtTime(now); + expireCheckedTime = now; + } + expireInProgress.compareAndSet(true, false); + } + } + + private void expireEntriesAtTime(final long now) { + for (final K key : internalMap.keySet()) { + final WrappedValue<V> wrappedValue = internalMap.get(key); + if (wrappedValue != null) { + final Optional<Long> writeTime = wrappedValue.getWriteTime(); + if (writeTime.isPresent() && + writeTime.get() + timeoutMillis < now) { + invalidate(key); + } + } + } + } + + @Override + public void invalidate(final K key) { + internalMap.remove(key); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java new file mode 100644 index 0000000..6157210 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +/** + * Return the current time + */ +public interface CurrentTime { + long now(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java new file mode 100644 index 0000000..c1d19ce --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +/** + * Return the system time + */ +public final class SystemTime implements CurrentTime { + + /** + * @return System time in milliseconds + */ + @Override + public long now() { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java new file mode 100644 index 0000000..abc2a07 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import org.apache.reef.util.Optional; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +/** + * A representation of a cached entry. + * Contains a value and when it was written. + */ +final class WrappedValue<V> { + private final Callable<V> valueFetcher; + private final CurrentTime currentTime; + + private Optional<V> value; + private Optional<Long> writeTime; + + /** + * Construct a representation of a cached entry. + * The value is written after computing valueFetcher and the time + * it was written is recorded using currentTime. + * + * @param valueFetcher method used to fetch the value + * @param currentTime class that returns the current time + */ + public WrappedValue(final Callable<V> valueFetcher, + final CurrentTime currentTime) { + this.valueFetcher = valueFetcher; + this.currentTime = currentTime; + + this.value = Optional.empty(); + this.writeTime = Optional.empty(); + } + + public Optional<Long> getWriteTime() { + return writeTime; + } + + public Optional<V> getValue() { + return value; + } + + /** + * Must only be called once, by the thread that created this WrappedValue + * @return The value returned by valueFetcher + */ + public synchronized V loadAndGet() throws ExecutionException { + try { + value = Optional.ofNullable(valueFetcher.call()); + } catch (Exception e) { + throw new ExecutionException(e); + } finally { + writeTime = Optional.of(currentTime.now()); + this.notifyAll(); + } + if (!value.isPresent()) { + throw new ExecutionException("valueFetcher returned null", new NullPointerException()); + } else { + return value.get(); + } + } + + public synchronized V waitAndGet() { + while (!value.isPresent()) { + try { + this.wait(); + } catch (InterruptedException e) { + // Ignore, as while loop will be re-entered + } + } + return value.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java new file mode 100644 index 0000000..153a526 --- /dev/null +++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test concurrent access of CacheImpl + */ +public final class CacheImplConcurrentTest { + + private Cache<String, Integer> cache; + private final CurrentTime currentTime = new SystemTime(); + private final long timeoutMillis = 4000; + private final long computationMillis = 2000; + private final int numConcurrentCalls = 10; + + @Before + public void setUp() { + cache = new CacheImpl<>(currentTime, timeoutMillis); + } + + /** + * Test that the value computed on a get is returned for that key + * on the first and subsequent concurrent calls. + * In particular, for this test the first call takes awhile to compute. + */ + @Test + public void testGetReturnsFirstValue() throws ExecutionException, InterruptedException { + final String key = "testGetReturnsFirstValue"; + final int firstValue = 20; + final int secondValue = 40; + + final ExecutorService es = Executors.newFixedThreadPool(numConcurrentCalls); + es.submit(new Runnable() { + @Override + public void run() { + final int getFirstValue1; + try { + // Assert that firstValue is returned, even when other gets are called during the Callable execution + getFirstValue1 = cache.get(key, new SleepingInteger(firstValue, computationMillis)); + assertEquals(firstValue, getFirstValue1); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + }); + + Thread.sleep(500); + + for (int i = 1; i < numConcurrentCalls; i++) { + final int index = i; + es.submit(new Runnable() { + @Override + public void run() { + try { + // The original cached value should be retrieved + final int getFirstValue2 = cache.get(key, new ImmediateInteger(secondValue)); + assertEquals(firstValue, getFirstValue2); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + }); + } + + es.shutdown(); + assertTrue("Tasks should finish before timeout", es.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)); + } + + /** + * Test that the same value computed on a get is returned for that key + * on all concurrent calls. + * In particular, for this test each thread would have computed a distinct value, + * but only one thread "comes first" and all other threads return this value. + */ + @Test + public void testGetReturnsSameValue() throws InterruptedException { + final String key = "testGetReturnsSameValue"; + final int[] values = new int[numConcurrentCalls]; + final int[] getValues = new int[numConcurrentCalls]; + final int nullValue = -1; + for (int i = 0; i < numConcurrentCalls; i++) { + values[i] = i; + getValues[i] = nullValue; + } + + final ExecutorService es = Executors.newFixedThreadPool(numConcurrentCalls); + + for (int i = 0; i < numConcurrentCalls; i++) { + final int index = i; + es.submit(new Runnable() { + @Override + public void run() { + try { + getValues[index] = cache.get(key, new ImmediateInteger(values[index])); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + }); + } + + es.shutdown(); + assertTrue("Tasks should finish before timeout", es.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)); + + assertNotEquals("The value should be set", nullValue, getValues[0]); + for (int i = 1; i < numConcurrentCalls; i++) { + assertEquals(getValues[i-1], getValues[i]); + } + } + + /** + * Test that all gets called before an invalidate returns the same value, and all + * gets called after the invalidate returns a newly computed value. + * In particular, for this test the computation for the initial get is still + * running while the subsequent gets and invalidate are called. + */ + @Test + public void testInvalidateDuringCallableExecution() throws ExecutionException, InterruptedException { + final String key = "testGet"; + final int firstValue = 20; + final int secondValue = 40; + + final ExecutorService es = Executors.newSingleThreadExecutor(); + es.submit(new Runnable() { + @Override + public void run() { + final int getFirstValue1; + try { + // Assert that firstValue is returned, even when it is invalidated during the Callable execution + getFirstValue1 = cache.get(key, new SleepingInteger(firstValue, computationMillis)); + assertEquals(firstValue, getFirstValue1); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + }); + + Thread.sleep(500); + + // In this test, the calls are sequential, but we still run the same number of calls + final int numSequentialCalls = numConcurrentCalls; + + final int indexToInvalidateOn = numSequentialCalls / 2; + for (int i = 1; i < numSequentialCalls; i++) { + final int index = i; + if (index == indexToInvalidateOn) { + cache.invalidate(key); + } else if (index < indexToInvalidateOn) { + try { + // The original cached value should be retrieved, even when it is invalidated during the Callable execution + final int getFirstValue2 = cache.get(key, new ImmediateInteger(secondValue)); + assertEquals(firstValue, getFirstValue2); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } else { + try { + // The second value should be retrieved, because the cache has been invalidated + final int getFirstValue2 = cache.get(key, new ImmediateInteger(secondValue)); + assertEquals(secondValue, getFirstValue2); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + } + + es.shutdown(); + assertTrue("Tasks should finish before timeout", es.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java new file mode 100644 index 0000000..bb244cf --- /dev/null +++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; + +/** + * Test basic access of CacheImpl + */ +public final class CacheImplTest { + + private Cache<String, Integer> cache; + private final CurrentTime currentTime = new SystemTime(); + private final long timeoutMillis = 3000; + + @Before + public void setUp() { + cache = new CacheImpl<>(currentTime, timeoutMillis); + } + + /** + * Test that an immediate get on the same key returns the cached value instead of triggering a new computation + */ + @Test + public void testGet() throws ExecutionException, InterruptedException { + final String key = "testGet"; + final int firstValue = 20; + final int secondValue = 40; + + final int getFirstValue1 = cache.get(key, new ImmediateInteger(firstValue)); + assertEquals(firstValue, getFirstValue1); + + // The original cached value should be retrieved if called immediately (before a timeout) + final int getFirstValue2 = cache.get(key, new ImmediateInteger(secondValue)); + assertEquals(firstValue, getFirstValue2); + + } + + /** + * Test that an invalidate clears the cached value, so the next access triggers a new computation + */ + @Test + public void testInvalidate() throws ExecutionException { + final String key = "testGet"; + final int firstValue = 20; + final int secondValue = 40; + + final int getValue = cache.get(key, new ImmediateInteger(firstValue)); + assertEquals(firstValue, getValue); + + cache.invalidate(key); + + // The second cached value should be retrieved after invalidation + final int getSecondValue = cache.get(key, new ImmediateInteger(secondValue)); + assertEquals(secondValue, getSecondValue); + } + + /** + * Test expire-after-write by sleeping beyond the timeout. + * Also, the test is designed to fail if the cache is actually expire-after-access. + */ + @Test + public void testExpireOnWrite() throws ExecutionException, InterruptedException { + final String key = "testExpireOnWrite"; + final int firstValue = 20; + final int secondValue = 40; + + final int getFirstValue1 = cache.get(key, new ImmediateInteger(firstValue)); + assertEquals(firstValue, getFirstValue1); + + // Sleep less than timeout and do another access; value should be the same + Thread.sleep(timeoutMillis/2); + final int getFirstValue2 = cache.get(key, new ImmediateInteger(firstValue)); + assertEquals(firstValue, getFirstValue2); + + // Sleep enough to trigger expire-after-write timeout + Thread.sleep(timeoutMillis + timeoutMillis/4); + // The next cached value should be retrieved after timeout + final int getSecondValue = cache.get(key, new ImmediateInteger(secondValue)); + assertEquals(secondValue, getSecondValue); + } + + /** + * Test expire-after-write is implemented _per-key_. + * The test is designed to fail if the cache actually resets the timer on a write to a different key. + */ + @Test + public void testExpireOnWritePerKey() throws ExecutionException, InterruptedException { + final String key = "testExpireOnWritePerKey"; + final String differentKey = "differentKey"; + final int firstValue = 20; + final int secondValue = 40; + + final int getFirstValue = cache.get(key, new ImmediateInteger(firstValue)); + assertEquals(firstValue, getFirstValue); + + // Sleep less than timeout and do a write on a different key; it should not affect + // the expiration of the original key + Thread.sleep(timeoutMillis/2); + final int getFirstValueForDifferentKey = cache.get(differentKey, new ImmediateInteger(firstValue)); + assertEquals(firstValue, getFirstValueForDifferentKey); + + // Sleep enough to trigger timeout + Thread.sleep(timeoutMillis + timeoutMillis/4); + + // The next cached value should be retrieved after timeout + final int getSecondValue = cache.get(key, new ImmediateInteger(secondValue)); + assertEquals(secondValue, getSecondValue); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java new file mode 100644 index 0000000..3b09373 --- /dev/null +++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import java.util.concurrent.Callable; + +/** + * A mock computation that immediately returns an integer + */ +class ImmediateInteger implements Callable<Integer> { + private final int returnValue; + + public ImmediateInteger(final int returnValue) { + this.returnValue = returnValue; + } + + @Override + public Integer call() throws Exception { + return returnValue; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java new file mode 100644 index 0000000..223e123 --- /dev/null +++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import java.util.concurrent.Callable; + +/** + * A mock computation that simulates computation time by sleeping and returns an integer + */ +class SleepingInteger implements Callable<Integer> { + private final int returnValue; + private final long sleepMillis; + + /** + * Construct the mock value fetcher + * + * @param returnValue value to return + * @param sleepMillis amount to sleep + */ + public SleepingInteger(final int returnValue, final long sleepMillis) { + this.returnValue = returnValue; + this.sleepMillis = sleepMillis; + } + + @Override + public Integer call() throws Exception { + Thread.sleep(sleepMillis); + return returnValue; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java new file mode 100644 index 0000000..35ff26a --- /dev/null +++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util.cache; + +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.*; + +public final class WrappedValueTest { + private static final CurrentTime systemTime = new SystemTime(); + private static final int numThreads = 10; + + @Test + public void testLoadAndGet() throws ExecutionException { + final Integer value = 5; + final WrappedValue<Integer> wrappedValue = new WrappedValue<>(new ImmediateInteger(value), systemTime); + + assertFalse(wrappedValue.getValue().isPresent()); + + final Integer loadedValue = wrappedValue.loadAndGet(); + assertEquals(value, loadedValue); + assertEquals(value, wrappedValue.getValue().get()); + assertTrue(value == loadedValue); + } + + @Test + public void testWaitAndGetOnPreviouslyLoadedValue() throws ExecutionException { + final Integer value = 5; + final WrappedValue<Integer> wrappedValue = new WrappedValue<>(new ImmediateInteger(value), systemTime); + final Integer loadedValue = wrappedValue.loadAndGet(); + final Integer waitedValue = wrappedValue.waitAndGet(); + + assertEquals(value, waitedValue); + assertTrue(value == waitedValue); + + assertEquals(loadedValue, waitedValue); + assertTrue(loadedValue == waitedValue); + } + + @Test + public void testConcurrentLoadWaitAndGet() throws ExecutionException, InterruptedException { + final Integer value = 5; + final long sleepMillis = 2000; + final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + + final WrappedValue<Integer> wrappedValue = new WrappedValue<>( + new SleepingInteger(value, sleepMillis), systemTime); + final Integer loadedValue = wrappedValue.loadAndGet(); + + final Future<?>[] futures = new Future<?>[numThreads]; + for (int i = 0; i < numThreads; i++) { + futures[i] = executorService.submit(new Runnable() { + @Override + public void run() { + final Integer valueAfterWait = wrappedValue.waitAndGet(); + assertEquals(value, valueAfterWait); + assertTrue(value == valueAfterWait); + } + }); + } + for (int i = 0; i < numThreads; i++) { + futures[i].get(); + } + + assertEquals(value, loadedValue); + assertTrue(value == wrappedValue.getValue().get()); + assertTrue(value == loadedValue); + } +}
