This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 8ef963c Improved IT and added sanity check 8ef963c is described below commit 8ef963cbd923dbd22c21329a68601a27d237d1a7 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Apr 13 10:37:20 2021 -0400 Improved IT and added sanity check --- .../accumulo/tserver/ThriftClientHandler.java | 7 +- .../tserver/compactions/CompactionManager.java | 14 +-- .../apache/accumulo/test/ExternalCompactionIT.java | 120 +++++++++++++++------ 3 files changed, 100 insertions(+), 41 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java index abb9968..2ac0d65 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java @@ -1703,8 +1703,8 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe } server.getCompactionManager().commitExternalCompaction( - ExternalCompactionId.of(externalCompactionId), extent, server.getOnlineTablets(), fileSize, - entries); + ExternalCompactionId.of(externalCompactionId), KeyExtent.fromThrift(extent), + server.getOnlineTablets(), fileSize, entries); } @Override @@ -1716,7 +1716,8 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe } server.getCompactionManager().externalCompactionFailed( - ExternalCompactionId.of(externalCompactionId), extent, server.getOnlineTablets()); + ExternalCompactionId.of(externalCompactionId), KeyExtent.fromThrift(extent), + server.getOnlineTablets()); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 44f0ccf..8228d12 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -33,7 +33,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionKind; @@ -50,6 +49,7 @@ import org.apache.accumulo.tserver.tablet.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; public class CompactionManager { @@ -447,12 +447,12 @@ public class CompactionManager { } public void commitExternalCompaction(ExternalCompactionId extCompactionId, - TKeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize, + KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize, long entries) { KeyExtent extent = runningExternalCompactions.get(extCompactionId); - // CBUG Use extentCompacted to perform additional validation that the extent has not - // merged, split, or otherwise changed. if (extent != null) { + Preconditions.checkState(extent.equals(extentCompacted), + "Unexpected extent seen on compaction commit %s %s", extent, extentCompacted); Tablet tablet = currentTablets.get(extent); if (tablet != null) { tablet.asCompactable().commitExternalCompaction(extCompactionId, fileSize, entries); @@ -469,12 +469,12 @@ public class CompactionManager { return (null != extent && extent.compareTo(ke) == 0); } - public void externalCompactionFailed(ExternalCompactionId ecid, TKeyExtent extentCompacted, + public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets) { - // CBUG Use extentCompacted to perform additional validation that the extent has not - // merged, split, or otherwise changed. KeyExtent extent = runningExternalCompactions.get(ecid); if (extent != null) { + Preconditions.checkState(extent.equals(extentCompacted), + "Unexpected extent seen on compaction commit %s %s", extent, extentCompacted); Tablet tablet = currentTablets.get(extent); if (tablet != null) { tablet.asCompactable().externalCompactionFailed(ecid); diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index 1b47f58..be208a1 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -28,9 +28,13 @@ import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.coordinator.CompactionCoordinator; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.data.Key; @@ -38,6 +42,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; @@ -57,10 +62,16 @@ public class ExternalCompactionIT extends ConfigurableMacBase { DefaultCompactionPlanner.class.getName()); cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors", "[{'name':'all','externalQueue':'DCQ1'}]"); + cfg.setProperty("tserver.compaction.major.service.cs2.planner", + DefaultCompactionPlanner.class.getName()); + cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors", + "[{'name':'all','externalQueue':'DCQ2'}]"); } public static class TestFilter extends Filter { + int modulus = 1; + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { @@ -71,11 +82,22 @@ public class ExternalCompactionIT extends ConfigurableMacBase { Preconditions.checkArgument(!cienv.getQueueName().isEmpty()); Preconditions .checkArgument(options.getOrDefault("expectedQ", "").equals(cienv.getQueueName())); + + Preconditions.checkArgument(cienv.isFullMajorCompaction()); + Preconditions.checkArgument(cienv.isUserCompaction()); + Preconditions.checkArgument(cienv.getIteratorScope() == IteratorScope.majc); + Preconditions.checkArgument(!cienv.isSamplingEnabled()); + + // if the init function is never called at all, then not setting the modulus option should + // cause the test to fail + if (options.containsKey("modulus")) { + modulus = Integer.parseInt(options.get("modulus")); + } } @Override public boolean accept(Key k, Value v) { - return Integer.parseInt(v.toString()) % 2 == 0; + return Integer.parseInt(v.toString()) % modulus == 0; } } @@ -83,44 +105,80 @@ public class ExternalCompactionIT extends ConfigurableMacBase { @Test public void testExternalCompaction() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { - Map<String,String> props = - Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), - "table.compaction.dispatcher.opts.service", "cs1"); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); - - String tableName = "ectt"; - client.tableOperations().create(tableName, ntc); + String table1 = "ectt1"; + createTable(client, table1, "cs1"); - try (BatchWriter bw = client.createBatchWriter(tableName)) { - for (int i = 0; i < 10; i++) { - Mutation m = new Mutation("r:" + i); - m.put("", "", "" + i); - bw.addMutation(m); - } - } + String table2 = "ectt2"; + createTable(client, table2, "cs2"); - client.tableOperations().flush(tableName); + wrtieData(client, table1); + wrtieData(client, table2); cluster.exec(Compactor.class, "-q", "DCQ1"); + cluster.exec(Compactor.class, "-q", "DCQ2"); cluster.exec(CompactionCoordinator.class); - IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); - // make sure iterator options make it to compactor process - iterSetting.addOption("expectedQ", "DCQ1"); - CompactionConfig config = - new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true); - client.tableOperations().compact(tableName, config); - - try (Scanner scanner = client.createScanner(tableName)) { - int count = 0; - for (Entry<Key,Value> entry : scanner) { - Assert.assertTrue(Integer.parseInt(entry.getValue().toString()) % 2 == 0); - count++; - } - - Assert.assertEquals(5, count); + compact(client, table1, 2, "DCQ1"); + verify(client, table1, 2); + + compact(client, table2, 3, "DCQ2"); + verify(client, table2, 3); + + } + } + + private void verify(AccumuloClient client, String table1, int modulus) + throws TableNotFoundException, AccumuloSecurityException, AccumuloException { + try (Scanner scanner = client.createScanner(table1)) { + int count = 0; + for (Entry<Key,Value> entry : scanner) { + Assert.assertTrue(Integer.parseInt(entry.getValue().toString()) % modulus == 0); + count++; + } + + int expectedCount = 0; + for (int i = 0; i < 10; i++) { + if (i % modulus == 0) + expectedCount++; } + + Assert.assertEquals(expectedCount, count); } } + + private void compact(AccumuloClient client, String table1, int modulus, String expectedQueue) + throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); + // make sure iterator options make it to compactor process + iterSetting.addOption("expectedQ", expectedQueue); + iterSetting.addOption("modulus", modulus + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true); + client.tableOperations().compact(table1, config); + } + + private void createTable(AccumuloClient client, String tableName, String service) + throws Exception { + Map<String,String> props = + Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), + "table.compaction.dispatcher.opts.service", service); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + + client.tableOperations().create(tableName, ntc); + + } + + private void wrtieData(AccumuloClient client, String table1) throws MutationsRejectedException, + TableNotFoundException, AccumuloException, AccumuloSecurityException { + try (BatchWriter bw = client.createBatchWriter(table1)) { + for (int i = 0; i < 10; i++) { + Mutation m = new Mutation("r:" + i); + m.put("", "", "" + i); + bw.addMutation(m); + } + } + + client.tableOperations().flush(table1); + } }