Repository: geode Updated Branches: refs/heads/develop 4a5c56eb8 -> ca2f20b86
GEODE-3448: Implement and expose parallel snapshot import This closes #721 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/ca2f20b8 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/ca2f20b8 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/ca2f20b8 Branch: refs/heads/develop Commit: ca2f20b86cb574a9fbc67146efd40c8246d8f685 Parents: 4a5c56e Author: Nick Reich <nre...@pivotal.io> Authored: Thu Aug 17 16:29:45 2017 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Mon Aug 28 15:21:11 2017 -0700 ---------------------------------------------------------------------- .../geode/cache/snapshot/SnapshotIterator.java | 2 +- .../snapshot/CacheSnapshotServiceImpl.java | 2 +- .../snapshot/ParallelSnapshotFileMapper.java | 3 +- .../snapshot/RegionSnapshotServiceImpl.java | 50 ++++----- .../cache/snapshot/CacheSnapshotJUnitTest.java | 22 ++-- .../snapshot/ParallelSnapshotDUnitTest.java | 98 +++++++++++------ .../cache/snapshot/RegionSnapshotJUnitTest.java | 109 ++++++++----------- .../geode/cache/snapshot/SnapshotTestCase.java | 56 ++++------ .../cache/snapshot/TestSnapshotFileMapper.java | 14 +-- .../cache/snapshot/WanSnapshotJUnitTest.java | 17 +-- .../ParallelSnapshotFileMapperTest.java | 7 +- 11 files changed, 185 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java index 767e63d..7022531 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java +++ b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java @@ -29,7 +29,7 @@ import java.util.Map.Entry; * * @since GemFire 7.0 */ -public interface SnapshotIterator<K, V> { +public interface SnapshotIterator<K, V> extends AutoCloseable { /** * Returns true if there are more elements in the iteration. * http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java index c35b413..b3c920c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java @@ -42,7 +42,7 @@ public class CacheSnapshotServiceImpl implements CacheSnapshotService { @Override public SnapshotOptions<Object, Object> createOptions() { - return new SnapshotOptionsImpl<Object, Object>(); + return new SnapshotOptionsImpl<>(); } @Override http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java index 8e7a4c2..e86dcc2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java @@ -38,8 +38,7 @@ public class ParallelSnapshotFileMapper implements SnapshotFileMapper { @Override public File[] mapImportPath(DistributedMember member, File snapshot) { - // parallel import is not yet supported - throw new UnsupportedOperationException(); + return new File[] {snapshot}; } private String getBaseName(File snapshot) { http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java index c5e67ac..fb21594 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java @@ -46,7 +46,6 @@ import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord; import org.apache.geode.internal.i18n.LocalizedStrings; import java.io.File; -import java.io.FileFilter; import java.io.IOException; import java.io.InterruptedIOException; import java.io.Serializable; @@ -60,6 +59,8 @@ import java.util.concurrent.Future; import static org.apache.geode.distributed.internal.InternalDistributedSystem.getLoggerI18n; +import org.apache.logging.log4j.LogManager; + /** * Provides an implementation for region snapshots. * @@ -90,12 +91,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, return new File[] {snapshot}; } - return snapshot.listFiles(new FileFilter() { - @Override - public boolean accept(File pathname) { - return !pathname.isDirectory(); - } - }); + return snapshot.listFiles(pathname -> !pathname.isDirectory()); } }; @@ -141,7 +137,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, @Override public SnapshotOptions<K, V> createOptions() { - return new SnapshotOptionsImpl<K, V>(); + return new SnapshotOptionsImpl<>(); } @Override @@ -158,7 +154,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, } if (shouldRunInParallel(options)) { - snapshotInParallel(new ParallelArgs<K, V>(snapshot, format, options), + snapshotInParallel(new ParallelArgs<>(snapshot, format, options), new ParallelExportFunction<K, V>()); } else { exportOnMember(snapshot, format, options); @@ -176,9 +172,8 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, throws IOException, ClassNotFoundException { if (shouldRunInParallel(options)) { - snapshotInParallel(new ParallelArgs<K, V>(snapshot, format, options), - new ParallelImportFunction<K, V>()); - return; + snapshotInParallel(new ParallelArgs<>(snapshot, format, options), + new ParallelImportFunction<>()); } else { importOnMember(snapshot, format, options); @@ -241,12 +236,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, // Would be interesting to use a PriorityQueue ordered on isDone() // but this is probably close enough in practice. - LinkedList<Future<?>> puts = new LinkedList<Future<?>>(); + LinkedList<Future<?>> puts = new LinkedList<>(); GFSnapshotImporter in = new GFSnapshotImporter(snapshot); try { int bufferSize = 0; - Map<K, V> buffer = new HashMap<K, V>(); + Map<K, V> buffer = new HashMap<>(); SnapshotRecord record; while ((record = in.readSnapshotRecord()) != null) { @@ -286,14 +281,10 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, puts.removeFirst().get(); } - final Map<K, V> copy = new HashMap<K, V>(buffer); + final Map<K, V> copy = new HashMap<>(buffer); Future<?> f = GemFireCacheImpl.getExisting("Importing region from snapshot") - .getDistributionManager().getWaitingThreadPool().submit(new Runnable() { - @Override - public void run() { - local.basicImportPutAll(copy, !options.shouldInvokeCallbacks()); - } - }); + .getDistributionManager().getWaitingThreadPool().submit((Runnable) () -> local + .basicImportPutAll(copy, !options.shouldInvokeCallbacks())); puts.addLast(f); buffer.clear(); @@ -400,12 +391,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, static <K, V> Exporter<K, V> createExporter(Region<?, ?> region, SnapshotOptions<K, V> options) { String pool = region.getAttributes().getPoolName(); if (pool != null) { - return new ClientExporter<K, V>(PoolManager.find(pool)); + return new ClientExporter<>(PoolManager.find(pool)); } else if (InternalDistributedSystem.getAnyInstance().isLoner() || region.getAttributes().getDataPolicy().equals(DataPolicy.NORMAL) || region.getAttributes().getDataPolicy().equals(DataPolicy.PRELOADED) - || region instanceof LocalDataSet || (((SnapshotOptionsImpl<K, V>) options).isParallelMode() + || region instanceof LocalDataSet || (options.isParallelMode() && region.getAttributes().getDataPolicy().withPartitioning())) { // Avoid function execution: @@ -413,10 +404,10 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, // for NORMAL/PRELOAD since they don't support fn execution // for LocalDataSet since we're already running a fn // for parallel ops since we're already running a fn - return new LocalExporter<K, V>(); + return new LocalExporter<>(); } - return new WindowedExporter<K, V>(); + return new WindowedExporter<>(); } static LocalRegion getLocalRegion(Region<?, ?> region) { @@ -566,11 +557,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K, if (files != null) { for (File f : files) { - if (f.isDirectory() || !f.exists()) { - throw new IOException( - LocalizedStrings.Snapshot_INVALID_IMPORT_FILE.toLocalizedString(f)); + if (f.exists()) { + local.getSnapshotService().load(f, args.getFormat(), args.getOptions()); + } else { + LogManager.getLogger(RegionSnapshotServiceImpl.class) + .info("Nothing to import as location does not exist: " + f.getAbsolutePath()); } - local.getSnapshotService().load(f, args.getFormat(), args.getOptions()); } } context.getResultSender().lastResult(Boolean.TRUE); http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java index e310042..8999cda 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java @@ -35,14 +35,14 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase { public void testExportAndImport() throws Exception { for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { - Region<Integer, MyObject> region = - rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name()); + Region<Integer, MyObject> region = regionGenerator.createRegion(cache, diskStore.getName(), + rt, "test-" + rt.name() + "-" + st.name()); region.putAll(createExpected(st)); } } // save all regions - cache.getSnapshotService().save(snaps, SnapshotFormat.GEMFIRE); + cache.getSnapshotService().save(getSnapshotDirectory(), SnapshotFormat.GEMFIRE); for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { @@ -51,12 +51,12 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase { Region<Integer, MyObject> region = cache.getRegion(name); region.destroyRegion(); - rgen.createRegion(cache, ds.getName(), rt, name); + regionGenerator.createRegion(cache, diskStore.getName(), rt, name); } } // load all regions - cache.getSnapshotService().load(snaps, SnapshotFormat.GEMFIRE); + cache.getSnapshotService().load(getSnapshotDirectory(), SnapshotFormat.GEMFIRE); for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { @@ -73,8 +73,8 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase { public void testFilter() throws Exception { for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { - Region<Integer, MyObject> region = - rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name()); + Region<Integer, MyObject> region = regionGenerator.createRegion(cache, diskStore.getName(), + rt, "test-" + rt.name() + "-" + st.name()); region.putAll(createExpected(st)); } } @@ -88,18 +88,20 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase { // save even entries CacheSnapshotService css = cache.getSnapshotService(); SnapshotOptions<Object, Object> options = css.createOptions().setFilter(even); - cache.getSnapshotService().save(snaps, SnapshotFormat.GEMFIRE, options); + cache.getSnapshotService().save(getSnapshotDirectory(), SnapshotFormat.GEMFIRE, options); for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { Region region = cache.getRegion("test-" + rt.name() + "-" + st.name()); region.destroyRegion(); - rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name()); + regionGenerator.createRegion(cache, diskStore.getName(), rt, + "test-" + rt.name() + "-" + st.name()); } } // load odd entries - File[] snapshots = snaps.listFiles(pathname -> pathname.getName().startsWith("snapshot-")); + File[] snapshots = + getSnapshotDirectory().listFiles(pathname -> pathname.getName().startsWith("snapshot-")); options = css.createOptions().setFilter(odd); css.load(snapshots, SnapshotFormat.GEMFIRE, options); http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java index 541a603..5b59674 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java @@ -17,6 +17,8 @@ package org.apache.geode.cache.snapshot; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.junit.Before; +import org.junit.Rule; import org.junit.experimental.categories.Category; import org.junit.Test; @@ -28,6 +30,7 @@ import java.io.IOException; import java.util.Arrays; import com.examples.snapshot.MyPdxSerializer; + import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; @@ -36,43 +39,79 @@ import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat; import org.apache.geode.internal.cache.snapshot.SnapshotOptionsImpl; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.SerializableCallable; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; @Category(DistributedTest.class) public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase { private static final byte[] ffff = new byte[] {0xf, 0xf, 0xf, 0xf}; private static final byte[] eeee = new byte[] {0xe, 0xe, 0xe, 0xe}; + private static final int DATA_POINTS = 100; + + private File directory; + + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + + @Before + public void setup() throws IOException { + directory = temporaryFolder.newFolder(); + } @Test public void testExportImport() throws Exception { + loadCache(); doExport(false); doImport(false); } @Test public void testExportWithSequentialImport() throws Exception { + loadCache(); doExport(false); doSequentialImport(); } @Test public void testExportImportErrors() throws Exception { + loadCache(); try { doExport(true); - fail(); + fail("Expected exception not thrown"); } catch (Exception e) { + // do nothing on expected exception from test } doExport(false); try { doImport(true); - fail(); + fail("Expected exception not thrown"); } catch (Exception e) { + // do nothing on expected exception from test } } + /** + * This test ensures that parallel import succeeds even when each node does not have a file to + * import (import cluster larger than export one) + * + * @throws Exception + */ + @Test + public void testImportOnLargerCluster() throws Exception { + loadCache(2); + doExport(false, 2); + getCache().getRegion("test").destroyRegion(); + loadCache(); + doImport(false); + } + private void doExport(boolean explode) throws Exception { + doExport(explode, Host.getHost(0).getVMCount()); + } + + private void doExport(boolean explode, int nodes) throws Exception { Region region = getCache().getRegion("test"); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < DATA_POINTS; i++) { region.put(i, ffff); } @@ -85,7 +124,7 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase { opt.setParallelMode(true); opt.setMapper(mapper); - File f = new File("mysnap.gfd").getAbsoluteFile(); + File f = new File(directory, "mysnap.gfd").getAbsoluteFile(); rss.save(f, SnapshotFormat.GEMFIRE, opt); mapper.setShouldExplode(false); @@ -100,7 +139,7 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase { } }; - forEachVm(check, true); + forEachVm(check, true, nodes); } private void doImport(boolean explode) throws ClassNotFoundException, IOException { @@ -114,14 +153,12 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase { opt.setParallelMode(true); opt.setMapper(mapper); - final File f = new File("mysnap.gfd").getAbsoluteFile(); - - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < DATA_POINTS; i++) { region.put(i, eeee); } - rss.load(f, SnapshotFormat.GEMFIRE, opt); - for (int i = 0; i < 1000; i++) { + rss.load(directory, SnapshotFormat.GEMFIRE, opt); + for (int i = 0; i < DATA_POINTS; i++) { assertTrue(Arrays.equals(ffff, (byte[]) region.get(i))); } } @@ -132,48 +169,43 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase { SnapshotOptionsImpl opt = (SnapshotOptionsImpl) rss.createOptions(); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < DATA_POINTS; i++) { region.put(i, eeee); } - - final File file = new File("").getAbsoluteFile(); - rss.load(file, SnapshotFormat.GEMFIRE, opt); - for (int i = 0; i < 1000; i++) { + int vmCount = Host.getHost(0).getVMCount(); + for (int i = 0; i <= vmCount; i++) { + rss.load(new File(directory, Integer.toString(i)), SnapshotFormat.GEMFIRE, opt); + } + for (int i = 0; i < DATA_POINTS; i++) { assertTrue(Arrays.equals(ffff, (byte[]) region.get(i))); } } - public Object forEachVm(SerializableCallable call, boolean local) throws Exception { + private void forEachVm(SerializableCallable call, boolean local) throws Exception { + this.forEachVm(call, local, Integer.MAX_VALUE); + } + + private void forEachVm(SerializableCallable call, boolean local, int maxNodes) throws Exception { Host host = Host.getHost(0); - int vms = host.getVMCount(); + int vms = Math.min(host.getVMCount(), maxNodes); for (int i = 0; i < vms; ++i) { host.getVM(i).invoke(call); } if (local) { - return call.call(); + call.call(); } - return null; } @Override - public final void postSetUp() throws Exception { - loadCache(); - } - - @Override - public final void postTearDownCacheTestCase() throws Exception { - File[] snaps = new File(".").listFiles((dir, name) -> name.startsWith("mysnap")); + public final void postSetUp() throws Exception {} - if (snaps != null) { - for (File f : snaps) { - f.delete(); - } - } + private void loadCache() throws Exception { + this.loadCache(Integer.MAX_VALUE); } - public void loadCache() throws Exception { + private void loadCache(int maxNodes) throws Exception { SerializableCallable setup = new SerializableCallable() { @Override public Object call() throws Exception { @@ -187,6 +219,6 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase { } }; - forEachVm(setup, true); + forEachVm(setup, true, maxNodes); } } http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java index 706067a..46c491c 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java @@ -40,29 +40,27 @@ import static org.junit.Assert.*; @Category(IntegrationTest.class) public class RegionSnapshotJUnitTest extends SnapshotTestCase { - private File f; + private File snapshotFile; @Test public void testExportAndReadSnapshot() throws Exception { - for (final RegionType rt : RegionType.values()) { + for (final RegionType type : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { - String name = "test-" + rt.name() + "-" + st.name(); - Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name); + String name = "test-" + type.name() + "-" + st.name(); + Region<Integer, MyObject> region = + regionGenerator.createRegion(cache, diskStore.getName(), type, name); final Map<Integer, MyObject> expected = createExpected(st); region.putAll(expected); - region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE); + region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE); - final Map<Integer, Object> read = new HashMap<Integer, Object>(); - SnapshotIterator<Integer, Object> iter = SnapshotReader.read(f); - try { + final Map<Integer, Object> read = new HashMap<>(); + try (SnapshotIterator<Integer, Object> iter = SnapshotReader.read(snapshotFile)) { while (iter.hasNext()) { Entry<Integer, Object> entry = iter.next(); read.put(entry.getKey(), entry.getValue()); } - assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected, read); - } finally { - iter.close(); + assertEquals("Comparison failure for " + type.name() + "/" + st.name(), expected, read); } } } @@ -73,14 +71,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { String name = "test-" + rt.name() + "-" + st.name(); - Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name); + Region<Integer, MyObject> region = + regionGenerator.createRegion(cache, diskStore.getName(), rt, name); final Map<Integer, MyObject> expected = createExpected(st); region.putAll(expected); - region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE); + region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE); region.destroyRegion(); - region = rgen.createRegion(cache, ds.getName(), rt, name); + region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name); region.getAttributesMutator().setCacheWriter(new CacheWriterAdapter<Integer, MyObject>() { @Override @@ -98,7 +97,7 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { } }); - region.getSnapshotService().load(f, SnapshotFormat.GEMFIRE); + region.getSnapshotService().load(snapshotFile, SnapshotFormat.GEMFIRE); assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected.entrySet(), region.entrySet()); @@ -109,37 +108,30 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { @Test public void testFilter() throws Exception { - SnapshotFilter<Integer, MyObject> even = new SnapshotFilter<Integer, MyObject>() { - @Override - public boolean accept(Entry<Integer, MyObject> entry) { - return entry.getKey() % 2 == 0; - } - }; + SnapshotFilter<Integer, MyObject> even = + (SnapshotFilter<Integer, MyObject>) entry -> entry.getKey() % 2 == 0; - SnapshotFilter<Integer, MyObject> odd = new SnapshotFilter<Integer, MyObject>() { - @Override - public boolean accept(Entry<Integer, MyObject> entry) { - return entry.getKey() % 2 == 1; - } - }; + SnapshotFilter<Integer, MyObject> odd = + (SnapshotFilter<Integer, MyObject>) entry -> entry.getKey() % 2 == 1; for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { String name = "test-" + rt.name() + "-" + st.name(); - Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name); + Region<Integer, MyObject> region = + regionGenerator.createRegion(cache, diskStore.getName(), rt, name); final Map<Integer, MyObject> expected = createExpected(st); region.putAll(expected); RegionSnapshotService<Integer, MyObject> rss = region.getSnapshotService(); SnapshotOptions<Integer, MyObject> options = rss.createOptions().setFilter(even); - rss.save(f, SnapshotFormat.GEMFIRE, options); + rss.save(snapshotFile, SnapshotFormat.GEMFIRE, options); region.destroyRegion(); - region = rgen.createRegion(cache, ds.getName(), rt, name); + region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name); rss = region.getSnapshotService(); options = rss.createOptions().setFilter(odd); - rss.load(f, SnapshotFormat.GEMFIRE, options); + rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options); assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), 0, region.size()); } @@ -148,17 +140,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { @Test public void testFilterExportException() throws Exception { - SnapshotFilter<Integer, MyObject> oops = new SnapshotFilter<Integer, MyObject>() { - @Override - public boolean accept(Entry<Integer, MyObject> entry) { - throw new RuntimeException(); - } + SnapshotFilter<Integer, MyObject> oops = (SnapshotFilter<Integer, MyObject>) entry -> { + throw new RuntimeException(); }; for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { String name = "test-" + rt.name() + "-" + st.name(); - Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name); + Region<Integer, MyObject> region = + regionGenerator.createRegion(cache, diskStore.getName(), rt, name); final Map<Integer, MyObject> expected = createExpected(st); region.putAll(expected); @@ -167,17 +157,17 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { boolean caughtException = false; try { - rss.save(f, SnapshotFormat.GEMFIRE, options); + rss.save(snapshotFile, SnapshotFormat.GEMFIRE, options); } catch (RuntimeException e) { caughtException = true; } assertTrue(caughtException); region.destroyRegion(); - region = rgen.createRegion(cache, ds.getName(), rt, name); + region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name); rss = region.getSnapshotService(); - rss.load(f, SnapshotFormat.GEMFIRE, options); + rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options); assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), 0, region.size()); } @@ -186,32 +176,30 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { @Test public void testFilterImportException() throws Exception { - SnapshotFilter<Integer, MyObject> oops = new SnapshotFilter<Integer, MyObject>() { - @Override - public boolean accept(Entry<Integer, MyObject> entry) { - throw new RuntimeException(); - } + SnapshotFilter<Integer, MyObject> oops = (SnapshotFilter<Integer, MyObject>) entry -> { + throw new RuntimeException(); }; for (final RegionType rt : RegionType.values()) { for (final SerializationType st : SerializationType.values()) { String name = "test-" + rt.name() + "-" + st.name(); - Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name); + Region<Integer, MyObject> region = + regionGenerator.createRegion(cache, diskStore.getName(), rt, name); final Map<Integer, MyObject> expected = createExpected(st); region.putAll(expected); RegionSnapshotService<Integer, MyObject> rss = region.getSnapshotService(); - rss.save(f, SnapshotFormat.GEMFIRE); + rss.save(snapshotFile, SnapshotFormat.GEMFIRE); region.destroyRegion(); - region = rgen.createRegion(cache, ds.getName(), rt, name); + region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name); rss = region.getSnapshotService(); SnapshotOptions<Integer, MyObject> options = rss.createOptions().setFilter(oops); boolean caughtException = false; try { - rss.load(f, SnapshotFormat.GEMFIRE, options); + rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options); } catch (RuntimeException e) { caughtException = true; } @@ -225,14 +213,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { @Test public void testInvalidate() throws Exception { Region<Integer, MyObject> region = - rgen.createRegion(cache, ds.getName(), RegionType.REPLICATE, "test"); - MyObject obj = rgen.createData(SerializationType.SERIALIZABLE, 1, "invalidated value"); + regionGenerator.createRegion(cache, diskStore.getName(), RegionType.REPLICATE, "test"); + MyObject obj = + regionGenerator.createData(SerializationType.SERIALIZABLE, 1, "invalidated value"); region.put(1, obj); region.invalidate(1); - region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE); - region.getSnapshotService().load(f, SnapshotFormat.GEMFIRE); + region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE); + region.getSnapshotService().load(snapshotFile, SnapshotFormat.GEMFIRE); assertTrue(region.containsKey(1)); assertFalse(region.containsValueForKey(1)); @@ -251,11 +240,12 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { SerializationType st = SerializationType.PDX_SERIALIZER; String name = "test-" + rt.name() + "-" + st.name() + "-dsid"; - Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name); + Region<Integer, MyObject> region = + regionGenerator.createRegion(cache, diskStore.getName(), rt, name); final Map<Integer, MyObject> expected = createExpected(st); region.putAll(expected); - region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE); + region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE); cache.close(); @@ -264,22 +254,19 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase { .setPdxSerializer(new MyPdxSerializer()).set(DISTRIBUTED_SYSTEM_ID, "100"); cache = cf2.create(); - final Map<Integer, Object> read = new HashMap<Integer, Object>(); - SnapshotIterator<Integer, Object> iter = SnapshotReader.read(f); - try { + final Map<Integer, Object> read = new HashMap<>(); + try (SnapshotIterator<Integer, Object> iter = SnapshotReader.read(snapshotFile)) { while (iter.hasNext()) { Entry<Integer, Object> entry = iter.next(); read.put(entry.getKey(), entry.getValue()); } assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected, read); - } finally { - iter.close(); } } @Before public void setUp() throws Exception { super.setUp(); - f = new File(snaps, "test.snapshot.gfd"); + snapshotFile = new File(getSnapshotDirectory(), "test.snapshot.gfd"); } } http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java index 3898925..d0040f7 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java +++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java @@ -21,69 +21,55 @@ import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DiskStore; import org.apache.geode.cache.snapshot.RegionGenerator.SerializationType; + import org.junit.After; import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FilenameFilter; import java.util.HashMap; import java.util.Map; -import java.util.Random; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; public class SnapshotTestCase { - protected File store; - protected File snaps; + private File snapshotDirectory; protected Cache cache; - protected RegionGenerator rgen; - protected DiskStore ds; + RegionGenerator regionGenerator; + DiskStore diskStore; + + @Rule + public TemporaryFolder baseDir = new TemporaryFolder(); @Before public void setUp() throws Exception { - store = new File("store-" + Math.abs(new Random().nextInt())); - store.mkdir(); - - snaps = new File("snapshots-" + Math.abs(new Random().nextInt())); - snaps.mkdir(); + File storeDirectory = baseDir.newFolder("store"); + snapshotDirectory = baseDir.newFolder("snapshots"); - rgen = new RegionGenerator(); + regionGenerator = new RegionGenerator(); CacheFactory cf = new CacheFactory().set(MCAST_PORT, "0").set(LOG_LEVEL, "error"); cache = cf.create(); - ds = cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {store}) - .create("snapshotTest"); + diskStore = cache.createDiskStoreFactory().setMaxOplogSize(1) + .setDiskDirs(new File[] {storeDirectory}).create("snapshotTest"); } @After public void tearDown() throws Exception { cache.close(); - deleteFiles(store); - deleteFiles(snaps); } - public Map<Integer, MyObject> createExpected(SerializationType type) { - Map<Integer, MyObject> expected = new HashMap<Integer, MyObject>(); - for (int i = 0; i < 1000; i++) { - expected.put(i, rgen.createData(type, i, "The number is " + i)); - } - return expected; + File getSnapshotDirectory() { + return snapshotDirectory; } - public static void deleteFiles(File dir) { - File[] deletes = dir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return true; - } - }); - - if (deletes != null) { - for (File f : deletes) { - f.delete(); - } + Map<Integer, MyObject> createExpected(SerializationType type) { + Map<Integer, MyObject> expected = new HashMap<>(); + for (int i = 0; i < 1000; i++) { + expected.put(i, regionGenerator.createData(type, i, "The number is " + i)); } - dir.delete(); + return expected; } } http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java index 953721f..ba89630 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java +++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java @@ -32,7 +32,9 @@ public class TestSnapshotFileMapper implements SnapshotFileMapper { if (shouldExplode) { throw new RuntimeException(); } - return new File(snapshot.getParentFile(), mapFilename(snapshot)); + File directory = new File(snapshot.getParent(), Integer.toString(1 + VM.getCurrentVMNum())); + directory.mkdirs(); + return new File(directory, mapFilename(snapshot)); } @Override @@ -40,15 +42,11 @@ public class TestSnapshotFileMapper implements SnapshotFileMapper { if (shouldExplode) { throw new RuntimeException(); } - - File f = new File(snapshot.getParentFile(), mapFilename(snapshot)); - return new File[] {f}; + File directory = new File(snapshot, Integer.toString(1 + VM.getCurrentVMNum())); + return new File[] {directory}; } private String mapFilename(File snapshot) { - String filename = snapshot.getName(); - int suffixLocation = filename.indexOf(RegionSnapshotService.SNAPSHOT_FILE_EXTENSION); - return filename.substring(0, suffixLocation) + "-" + VM.getCurrentVMNum() - + RegionSnapshotService.SNAPSHOT_FILE_EXTENSION; + return snapshot.getName(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java index 46245c3..b62b7bf 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java @@ -15,12 +15,13 @@ package org.apache.geode.cache.snapshot; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; import java.io.File; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -37,7 +38,6 @@ import org.apache.geode.test.junit.categories.IntegrationTest; public class WanSnapshotJUnitTest extends SnapshotTestCase { private Region<Integer, MyObject> region; private WanListener wan; - private static final long MAX_WAIT = 5 * 60 * 1000; // 6 minutes @Test public void testWanCallback() throws Exception { @@ -46,18 +46,12 @@ public class WanSnapshotJUnitTest extends SnapshotTestCase { region.put(i, new MyObject(i, "clienttest " + i)); } - File snapshot = new File("wan.snapshot.gfd"); + File snapshot = new File(getSnapshotDirectory(), "wan.snapshot.gfd"); region.getSnapshotService().save(snapshot, SnapshotFormat.GEMFIRE); region.clear(); - long start = System.currentTimeMillis(); - // wait for the events to drain out - while (!wan.ticker.compareAndSet(count, 0)) { - Thread.sleep(100); - if (System.currentTimeMillis() - start > MAX_WAIT) { - fail("Event did not drain in 5 minutes"); - } - } + Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> wan.ticker.compareAndSet(count, 0)); region.getSnapshotService().load(snapshot, SnapshotFormat.GEMFIRE); @@ -92,4 +86,3 @@ public class WanSnapshotJUnitTest extends SnapshotTestCase { } } - http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java index 2f045fc..371fc6e 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java @@ -69,9 +69,10 @@ public class ParallelSnapshotFileMapperTest { } @Test - public void mapImportPathIsUnsupported() throws Exception { - thrown.expect(UnsupportedOperationException.class); - mapper.mapImportPath(null, null); + public void mapImportReturnsUnchangedPath() { + File file = new File(BASE_LOCATION + FILE_TYPE); + File[] mappedFiles = mapper.mapImportPath(null, file); + assertEquals(file, mappedFiles[0]); } @Test