This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch 1.10
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.10 by this push:
     new daa6985  Add a Bulk import to randomwalk MultiTable (#1911)
daa6985 is described below

commit daa6985409618e6242cbaf693a908d94abd55d9e
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Mon Feb 8 12:24:15 2021 -0500

    Add a Bulk import to randomwalk MultiTable (#1911)
    
    * Create new BulkImport test in MultiTable for more realistic case
    * Other improvements to Randomwalk MultiTable
    * Modify randomwalk.conf.example to work with Uno by default
---
 .../test/randomwalk/multitable/BulkImport.java     | 115 +++++++++++++++++++++
 .../randomwalk/multitable/MultiTableFixture.java   |   3 +
 .../test/randomwalk/multitable/OfflineTable.java   |  10 +-
 test/system/randomwalk/conf/modules/MultiTable.xml |   8 +-
 .../system/randomwalk/conf/randomwalk.conf.example |   2 +-
 5 files changed, 131 insertions(+), 7 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/BulkImport.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/BulkImport.java
new file mode 100644
index 0000000..1dc9316
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/BulkImport.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.multitable;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.test.randomwalk.Environment;
+import org.apache.accumulo.test.randomwalk.State;
+import org.apache.accumulo.test.randomwalk.Test;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that bulk imports newly generated RFiles into a randomly chosen table.
+ *
+ * <p>
+ * Each visit writes between 1 and 10 RFiles into a fresh {@code /tmp/bulk_<uuid>} directory.
+ * Every file holds the same {@value #LOTS} UUID-prefixed rows; each row gets the value
+ * {@code "1"} in every column of {@link #COLNAMES}, plus one {@link #MARKER_CF} entry whose
+ * qualifier is a sequence number taken from the static {@code counter}.
+ */
+public class BulkImport extends Test {
+
+  /** Number of rows written to every generated file. */
+  public static final int LOTS = 100000;
+  /** Number of check columns written for every row. */
+  public static final int COLS = 10;
+  /** The {@link #COLS} check columns {@code cf:000}, {@code cf:001}, ... written for every row. */
+  public static final List<Column> COLNAMES = new ArrayList<>();
+  /** Column family shared by all check columns. */
+  public static final Text CHECK_COLUMN_FAMILY = new Text("cf");
+  static {
+    for (int i = 0; i < COLS; i++) {
+      COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i))));
+    }
+  }
+  /** Column family of the marker entry written once per row. */
+  public static final Text MARKER_CF = new Text("marker");
+  /** Source of the marker qualifiers; incremented once for every import this node prepares. */
+  static final AtomicLong counter = new AtomicLong();
+
+  private static final Value ONE = new Value("1".getBytes(StandardCharsets.UTF_8));
+
+  /**
+   * Writes the RFiles, bulk imports them into a randomly chosen table and deletes the temporary
+   * directories afterwards. Only logs and returns when {@code tableList} is empty.
+   *
+   * @param state
+   *          randomwalk state; reads {@code tableList} and {@code fs}, and sets
+   *          {@code bulkImportSuccess} to {@code "false"} if the import leaves failure files
+   * @param env
+   *          randomwalk environment supplying the connector
+   * @param props
+   *          node properties (not used)
+   * @throws Exception
+   *           if writing or importing fails, or if any file lands in the failure directory; in
+   *           that case the failure directory is not deleted
+   */
+  @Override
+  public void visit(final State state, final Environment env, Properties props) throws Exception {
+    @SuppressWarnings("unchecked")
+    List<String> tables = (List<String>) state.get("tableList");
+
+    if (tables.isEmpty()) {
+      log.debug("No tables to ingest into");
+      return;
+    }
+
+    Random rand = new Random();
+    String tableName = tables.get(rand.nextInt(tables.size()));
+
+    String uuid = UUID.randomUUID().toString();
+    final Path dir = new Path("/tmp", "bulk_" + uuid);
+    final Path fail = new Path(dir.toString() + "_fail");
+    final DefaultConfiguration defaultConfiguration =
+        AccumuloConfiguration.getDefaultConfiguration();
+    final FileSystem fs = (FileSystem) state.get("fs");
+    fs.mkdirs(fail);
+    final int parts = rand.nextInt(10) + 1;
+
+    // A TreeSet iterates the rows in sorted order, which the file writer requires for appends.
+    TreeSet<String> rows = new TreeSet<>();
+    for (int i = 0; i < LOTS; i++) {
+      rows.add(uuid + String.format("__%05d", i));
+    }
+
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    final Text markerQualifier = new Text(markerColumnQualifier);
+    log.debug("Preparing bulk import to " + tableName + " start: " + rows.first() + " last: "
+        + rows.last() + " marker: " + markerColumnQualifier);
+
+    for (int i = 0; i < parts; i++) {
+      String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION;
+      FileSKVWriter f = FileOperations.getInstance().newWriterBuilder()
+          .forFile(fileName, fs, fs.getConf()).withTableConfiguration(defaultConfiguration).build();
+      // Close the writer even if an append fails, so the underlying stream is not leaked.
+      try {
+        f.startDefaultLocalityGroup();
+        for (String r : rows) {
+          Text row = new Text(r);
+          for (Column col : COLNAMES) {
+            f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE);
+          }
+          // "marker" sorts after "cf", so this entry follows the check columns as required.
+          f.append(new Key(row, MARKER_CF, markerQualifier), ONE);
+        }
+      } finally {
+        f.close();
+      }
+    }
+
+    // setTime = true: the current time replaces the timestamps of the imported entries.
+    env.getConnector().tableOperations().importDirectory(tableName, dir.toString(),
+        fail.toString(), true);
+    fs.delete(dir, true);
+    FileStatus[] failures = fs.listStatus(fail);
+    if (failures != null && failures.length > 0) {
+      state.set("bulkImportSuccess", "false");
+      throw new Exception(failures.length + " failure files found importing files from " + dir);
+    }
+    fs.delete(fail, true);
+    log.debug("Finished bulk import to " + tableName + " start: " + rows.first() + " last: "
+        + rows.last() + " marker " + markerColumnQualifier);
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java
index 3e0ee87..b04c69e 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java
@@ -24,9 +24,11 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.test.randomwalk.Environment;
 import org.apache.accumulo.test.randomwalk.Fixture;
 import org.apache.accumulo.test.randomwalk.State;
+import org.apache.hadoop.fs.FileSystem;
 
 public class MultiTableFixture extends Fixture {
 
@@ -41,6 +43,7 @@ public class MultiTableFixture extends Fixture {
     state.set("numWrites", Long.valueOf(0));
     state.set("totalWrites", Long.valueOf(0));
     state.set("tableList", new CopyOnWriteArrayList<String>());
+    state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
   }
 
   @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java
index c9e0629..b7df3de 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/OfflineTable.java
@@ -38,10 +38,12 @@ public class OfflineTable extends Test {
 
     Random rand = new Random();
     String tableName = tables.get(rand.nextInt(tables.size()));
+    boolean wait = rand.nextBoolean();
 
-    env.getConnector().tableOperations().offline(tableName, 
rand.nextBoolean());
-    log.debug("Table " + tableName + " offline ");
-    env.getConnector().tableOperations().online(tableName, rand.nextBoolean());
-    log.debug("Table " + tableName + " online ");
+    log.debug("Calling Table " + tableName + " offline with wait = " + wait);
+    env.getConnector().tableOperations().offline(tableName, wait);
+    wait = rand.nextBoolean();
+    log.debug("Calling Table " + tableName + " online with wait = " + wait);
+    env.getConnector().tableOperations().online(tableName, wait);
   }
 }
diff --git a/test/system/randomwalk/conf/modules/MultiTable.xml b/test/system/randomwalk/conf/modules/MultiTable.xml
index 55f6590..8b3de85 100644
--- a/test/system/randomwalk/conf/modules/MultiTable.xml
+++ b/test/system/randomwalk/conf/modules/MultiTable.xml
@@ -29,11 +29,11 @@
 
 <node id="dummy.ToAll">
   <edge id="mt.CreateTable" weight="20"/>
-  <edge id="mt.Write" weight="100"/>
+  <edge id="mt.Write" weight="10"/>
   <edge id="mt.CopyTable" weight="5"/>
+  <edge id="mt.BulkImport" weight="100"/>
   <edge id="mt.OfflineTable" weight="10"/>
   <edge id="mt.DropTable" weight="3"/>
-  <edge id="END" weight="1"/>
 </node>
 
 <node id="mt.Write">
@@ -57,4 +57,8 @@
   <edge id="dummy.ToAll" weight="1"/>
 </node>
 
+<node id="mt.BulkImport">
+  <edge id="dummy.ToAll" weight="1"/>
+</node>
+
 </module>
diff --git a/test/system/randomwalk/conf/randomwalk.conf.example b/test/system/randomwalk/conf/randomwalk.conf.example
index 8626685..32e3382 100644
--- a/test/system/randomwalk/conf/randomwalk.conf.example
+++ b/test/system/randomwalk/conf/randomwalk.conf.example
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 # Basic
-INSTANCE=instance
+INSTANCE=uno
 ZOOKEEPERS=localhost
 USERNAME=root
 PASSWORD=secret

Reply via email to