This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch 1.10 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.10 by this push: new daa6985 Add a Bulk import to randomwalk MultiTable (#1911) daa6985 is described below commit daa6985409618e6242cbaf693a908d94abd55d9e Author: Mike Miller <mmil...@apache.org> AuthorDate: Mon Feb 8 12:24:15 2021 -0500 Add a Bulk import to randomwalk MultiTable (#1911) * Create new BulkImport test in MultiTable for more realistic case * Other improvements to Randomwalk MultiTable * Modify randomwalk.conf.example to work with Uno by default --- .../test/randomwalk/multitable/BulkImport.java | 115 +++++++++++++++++++++ .../randomwalk/multitable/MultiTableFixture.java | 3 + .../test/randomwalk/multitable/OfflineTable.java | 10 +- test/system/randomwalk/conf/modules/MultiTable.xml | 8 +- .../system/randomwalk/conf/randomwalk.conf.example | 2 +- 5 files changed, 131 insertions(+), 7 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/BulkImport.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/BulkImport.java new file mode 100644 index 0000000..1dc9316 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/BulkImport.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.randomwalk.multitable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.client.IteratorSetting.Column; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.test.randomwalk.Environment; +import org.apache.accumulo.test.randomwalk.State; +import org.apache.accumulo.test.randomwalk.Test; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +public class BulkImport extends Test { + + public static final int LOTS = 100000; + public static final int COLS = 10; + public static final List<Column> COLNAMES = new ArrayList<>(); + public static final Text CHECK_COLUMN_FAMILY = new Text("cf"); + static { + for (int i = 0; i < COLS; i++) { + COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i)))); + } + } + public static final Text MARKER_CF = new Text("marker"); + static final AtomicLong counter = new AtomicLong(); + + private static final Value ONE = new Value("1".getBytes()); + + public void visit(final State state, final Environment env, Properties props) throws Exception { + @SuppressWarnings("unchecked") + List<String> tables = (List<String>) state.get("tableList"); + + if (tables.isEmpty()) { + log.debug("No tables to ingest into"); + return; + } + + Random rand = new Random(); + String tableName = tables.get(rand.nextInt(tables.size())); + + String uuid = UUID.randomUUID().toString(); + final Path dir = new Path("/tmp", "bulk_" + uuid); + final Path fail = new Path(dir.toString() + "_fail"); + final DefaultConfiguration defaultConfiguration = + AccumuloConfiguration.getDefaultConfiguration(); + final FileSystem fs = (FileSystem) state.get("fs"); + fs.mkdirs(fail); + final int parts = rand.nextInt(10) + 1; + + TreeSet<String> rows = new TreeSet<>(); + for (int i = 0; i < LOTS; i++) + rows.add(uuid + String.format("__%05d", i)); + + String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); + log.debug("Preparing bulk import to " + tableName + " start: " + rows.first() + " last: " + + rows.last() + " marker: " + markerColumnQualifier); + + for (int i = 0; i < parts; i++) { + String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION; + FileSKVWriter f = FileOperations.getInstance().newWriterBuilder() + .forFile(fileName, fs, fs.getConf()).withTableConfiguration(defaultConfiguration).build(); + f.startDefaultLocalityGroup(); + for (String r : rows) { + Text row = new Text(r); + for (Column col : COLNAMES) { + f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE); + } + f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE); + } + f.close(); + } + env.getConnector().tableOperations().importDirectory(tableName, dir.toString(), fail.toString(), + true); + fs.delete(dir, true); + FileStatus[] failures = fs.listStatus(fail); + if (failures != null && failures.length > 0) { + state.set("bulkImportSuccess", "false"); + throw new Exception(failures.length + " failure files found importing files from " + dir); + } + fs.delete(fail, true); + log.debug("Finished bulk import to " + tableName + " start: " + rows.first() + " last: " + + rows.last() + " marker " + markerColumnQualifier); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java index 3e0ee87..b04c69e 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java @@ -24,9 +24,11 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.test.randomwalk.Environment; import org.apache.accumulo.test.randomwalk.Fixture; import org.apache.accumulo.test.randomwalk.State; +import org.apache.hadoop.fs.FileSystem; public class MultiTableFixture extends Fixture { @@ -41,6 +43,7 @@ public class MultiTableFixture extends Fixture { state.set("numWrites", Long.valueOf(0)); state.set("totalWrites", Long.valueOf(0)); state.set("tableList", new CopyOnWriteArrayList<String>()); + state.set("fs", FileSystem.get(CachedConfiguration.getInstance())); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java index c9e0629..b7df3de 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java @@ -38,10 +38,12 @@ public class OfflineTable extends Test { Random rand = new Random(); String tableName = tables.get(rand.nextInt(tables.size())); + boolean wait = rand.nextBoolean(); - env.getConnector().tableOperations().offline(tableName, rand.nextBoolean()); - log.debug("Table " + tableName + " offline "); - env.getConnector().tableOperations().online(tableName, rand.nextBoolean()); - log.debug("Table " + tableName + " online "); + log.debug("Calling Table " + tableName + " offline with wait = " + wait); + env.getConnector().tableOperations().offline(tableName, wait); + wait = rand.nextBoolean(); + log.debug("Calling Table " + tableName + " online with wait = " + wait); + env.getConnector().tableOperations().online(tableName, wait); } } diff --git a/test/system/randomwalk/conf/modules/MultiTable.xml b/test/system/randomwalk/conf/modules/MultiTable.xml index 55f6590..8b3de85 100644 --- a/test/system/randomwalk/conf/modules/MultiTable.xml +++ b/test/system/randomwalk/conf/modules/MultiTable.xml @@ -29,11 +29,11 @@ <node id="dummy.ToAll"> <edge id="mt.CreateTable" weight="20"/> - <edge id="mt.Write" weight="100"/> + <edge id="mt.Write" weight="10"/> <edge id="mt.CopyTable" weight="5"/> + <edge id="mt.BulkImport" weight="100"/> <edge id="mt.OfflineTable" weight="10"/> <edge id="mt.DropTable" weight="3"/> - <edge id="END" weight="1"/> </node> <node id="mt.Write"> @@ -57,4 +57,8 @@ <edge id="dummy.ToAll" weight="1"/> </node> +<node id="mt.BulkImport"> + <edge id="dummy.ToAll" weight="1"/> +</node> + </module> diff --git a/test/system/randomwalk/conf/randomwalk.conf.example b/test/system/randomwalk/conf/randomwalk.conf.example index 8626685..32e3382 100644 --- a/test/system/randomwalk/conf/randomwalk.conf.example +++ b/test/system/randomwalk/conf/randomwalk.conf.example @@ -14,7 +14,7 @@ # limitations under the License. # Basic -INSTANCE=instance +INSTANCE=uno ZOOKEEPERS=localhost USERNAME=root PASSWORD=secret