Author: larsh
Date: Wed Feb  8 18:30:37 2012
New Revision: 1242037

URL: http://svn.apache.org/viewvc?rev=1242037&view=rev
Log:
HBASE-5229 Provide basic building blocks for 'multi-row' local transactions.

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java

Added: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java?rev=1242037&view=auto
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
 (added)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
 Wed Feb  8 18:30:37 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.WrongRegionException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class demonstrates how to implement atomic multi row transactions using
+ * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
+ * and Coprocessor endpoints.
+ * <p>
+ * {@link #mutateRows(List)} walks the supplied mutations once. Each mutation's
+ * row is checked with <code>HRegion.rowIsInRange</code> against the
+ * {@link HRegionInfo} of the region this endpoint is running in and, when in
+ * range, added to a set ordered by {@link Bytes#BYTES_COMPARATOR}. The
+ * mutations and that sorted set of rows are then handed to
+ * <code>mutateRowsWithLocks</code> on the region.
+ * <p>
+ * An out-of-range row is reported in one of two ways:
+ * <ul>
+ * <li>{@link WrongRegionException} if no row has been accepted yet (per the
+ * inline comment: the region might have moved, so the client may retry);</li>
+ * <li>{@link DoNotRetryIOException} otherwise, because the requested rows are
+ * split between regions.</li>
+ * </ul>
+ */
+public class MultiRowMutationEndpoint extends BaseEndpointCoprocessor 
implements
+    MultiRowMutationProtocol {
+
+  @Override
+  public void mutateRows(List<Mutation> mutations) throws IOException {
+    // get the coprocessor environment; the region-level view gives access to the HRegion
+    RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) 
getEnvironment();
+
+    // set of rows to lock, sorted to avoid deadlocks; being a set, a row
+    // touched by several mutations is only recorded once
+    SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+    HRegionInfo regionInfo = env.getRegion().getRegionInfo();
+    for (Mutation m : mutations) {
+      // check whether rows are in range for this region
+      if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
+        String msg = "Requested row out of range '"
+            + Bytes.toStringBinary(m.getRow()) + "'";
+        if (rowsToLock.isEmpty()) {
+          // if this is the first row, region might have moved,
+          // allow client to retry
+          throw new WrongRegionException(msg);
+        } else {
+          // rows are split between regions, do not retry
+          throw new DoNotRetryIOException(msg);
+        }
+      }
+      rowsToLock.add(m.getRow());
+    }
+    // call utility method on region; nothing is passed on unless every row was in range
+    env.getRegion().mutateRowsWithLocks(mutations, rowsToLock);
+  }
+}

Added: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java?rev=1242037&view=auto
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
 (added)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
 Wed Feb  8 18:30:37 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Defines a protocol to perform multi row transactions.
+ * See {@link MultiRowMutationEndpoint} for the implementation.
+ * <br>
+ * See
+ * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
+ * for details and limitations.
+ * <br>
+ * The single method, <code>mutateRows</code>, takes the list of mutations to
+ * apply; the list may contain mutations for more than one row. It returns
+ * nothing and reports failures as {@link IOException}.
+ * <br>
+ * Example (<code>t</code> is the table handle the proxy is obtained from;
+ * presumably the row passed to <code>coprocessorProxy</code> selects the
+ * region whose endpoint executes the call - confirm against the client API):
+ * <code><pre>
+ * List&lt;Mutation&gt; mutations = ...;
+ * Put p1 = new Put(row1);
+ * Put p2 = new Put(row2);
+ * ...
+ * mutations.add(p1);
+ * mutations.add(p2);
+ * MultiRowMutationProtocol mrOp = t.coprocessorProxy(
+ *   MultiRowMutationProtocol.class, row1);
+ * mrOp.mutateRows(mutations);
+ * </pre></code>
+ */
+public interface MultiRowMutationProtocol extends CoprocessorProtocol {
+  public void mutateRows(List<Mutation> mutations) throws IOException;
+}

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
Wed Feb  8 18:30:37 2012
@@ -4148,12 +4148,26 @@ public class HRegion implements HeapSize
     return results;
   }
 
-  public void mutateRow(RowMutation rm,
-      Integer lockid) throws IOException {
+  public void mutateRow(RowMutation rm) throws IOException {
+    mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
+  }
+
+  /**
+   * Perform atomic mutations within the region.
+   * @param mutations The list of mutations to perform.
+   * <code>mutations</code> can contain operations for multiple rows.
+   * Caller has to ensure that all rows are contained in this region.
+   * @param rowsToLock Rows to lock
+   * If multiple rows are locked care should be taken that
+   * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
+   * @throws IOException
+   */
+  public void mutateRowsWithLocks(Collection<Mutation> mutations,
+      Collection<byte[]> rowsToLock) throws IOException {
     boolean flush = false;
 
     startRegionOperation();
-    Integer lid = null;
+    List<Integer> acquiredLocks = null;
     try {
       // 1. run all pre-hooks before the atomic operation
       // if any pre hook indicates "bypass", bypass the entire operation
@@ -4161,7 +4175,7 @@ public class HRegion implements HeapSize
       // one WALEdit is used for all edits.
       WALEdit walEdit = new WALEdit();
       if (coprocessorHost != null) {
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
               // by pass everything
@@ -4178,8 +4192,17 @@ public class HRegion implements HeapSize
         }
       }
 
-      // 2. acquire the row lock
-      lid = getLock(lockid, rm.getRow(), true);
+      // 2. acquire the row lock(s)
+      acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
+      for (byte[] row : rowsToLock) {
+        // attempt to lock all involved rows, fail if one lock times out
+        Integer lid = getLock(null, row, true);
+        if (lid == null) {
+          throw new IOException("Failed to acquire lock on "
+              + Bytes.toStringBinary(row));
+        }
+        acquiredLocks.add(lid);
+      }
 
       // 3. acquire the region lock
       this.updatesLock.readLock().lock();
@@ -4191,7 +4214,7 @@ public class HRegion implements HeapSize
       byte[] byteNow = Bytes.toBytes(now);
       try {
         // 5. Check mutations and apply edits to a single WALEdit
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
             checkFamilies(familyMap.keySet());
@@ -4218,7 +4241,7 @@ public class HRegion implements HeapSize
 
         // 7. apply to memstore
         long addedSize = 0;
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
         }
         flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
@@ -4231,7 +4254,7 @@ public class HRegion implements HeapSize
       }
       // 10. run all coprocessor post hooks, after region lock is released
       if (coprocessorHost != null) {
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
           } else if (m instanceof Delete) {
@@ -4240,9 +4263,11 @@ public class HRegion implements HeapSize
         }
       }
     } finally {
-      if (lid != null) {
+      if (acquiredLocks != null) {
         // 11. release the row lock
-        releaseRowLock(lid);
+        for (Integer lid : acquiredLocks) {
+          releaseRowLock(lid);
+        }
       }
       if (flush) {
         // 12. Flush cache if needed. Do it outside update lock.

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 Wed Feb  8 18:30:37 2012
@@ -3160,7 +3160,7 @@ public class HRegionServer implements HR
       throws IOException {
     checkOpen();
     if (regionName == null) {
-      throw new IOException("Invalid arguments to atomicMutation " +
+      throw new IOException("Invalid arguments to mutateRow " +
       "regionName is null");
     }
     requestCount.incrementAndGet();
@@ -3169,7 +3169,7 @@ public class HRegionServer implements HR
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      region.mutateRow(rm, null);
+      region.mutateRow(rm);
     } catch (IOException e) {
       checkFileSystem();
       throw e;

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 Wed Feb  8 18:30:37 2012
@@ -36,7 +36,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -65,6 +64,9 @@ import org.apache.hadoop.hbase.LargeTest
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationProtocol;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -112,6 +114,9 @@ public class TestFromClientSide {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        MultiRowMutationEndpoint.class.getName());
     // We need more than one region server in this test
     TEST_UTIL.startMiniCluster(SLAVES);
   }
@@ -4041,6 +4046,31 @@ public class TestFromClientSide {
   }
 
   @Test
+  public void testMultiRowMutation() throws Exception { // two Puts on different rows via the endpoint, then read back
+    LOG.info("Starting testMultiRowMutation");
+    final byte [] TABLENAME = Bytes.toBytes("testMultiRowMutation");
+    final byte [] ROW1 = Bytes.toBytes("testRow1"); // second row, mutated in the same call as ROW
+
+    HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    List<Mutation> mrm = new ArrayList<Mutation>(); // collects the mutations for both rows
+    Put p = new Put(ROW);
+    p.add(FAMILY, QUALIFIER, VALUE);
+    mrm.add(p);
+    p = new Put(ROW1);
+    p.add(FAMILY, QUALIFIER, VALUE);
+    mrm.add(p);
+    // relies on MultiRowMutationEndpoint being registered as a region
+    // coprocessor in setUpBeforeClass
+    MultiRowMutationProtocol mr = t.coprocessorProxy(
+        MultiRowMutationProtocol.class, ROW);
+    mr.mutateRows(mrm); // one endpoint call carrying both Puts
+    Get g = new Get(ROW);
+    Result r = t.get(g);
+    assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER))); // ROW now holds VALUE
+    g = new Get(ROW1);
+    r = t.get(g);
+    assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER))); // and so does ROW1
+  }
+
+  @Test
   public void testRowMutation() throws Exception {
     LOG.info("Starting testRowMutation");
     final byte [] TABLENAME = Bytes.toBytes("testRowMutation");

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
 Wed Feb  8 18:30:37 2012
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionse
 
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,11 +32,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -246,11 +251,11 @@ public class TestAtomicOperation extends
   }
 
   /**
-   * Test multi-threaded increments.
+   * Test multi-threaded row mutations.
    */
   public void testRowMutationMultiThreads() throws IOException {
 
-    LOG.info("Starting test testMutationMultiThreads");
+    LOG.info("Starting test testRowMutationMultiThreads");
     initHRegion(tableName, getName(), fam1);
 
     // create 100 threads, each will alternate between adding and
@@ -263,7 +268,52 @@ public class TestAtomicOperation extends
     AtomicInteger failures = new AtomicInteger(0);
     // create all threads
     for (int i = 0; i < numThreads; i++) {
-      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) 
{
+        @Override
+        public void run() {
+          boolean op = true;
+          for (int i=0; i<numOps; i++) {
+            try {
+              // throw in some flushes
+              if (r.nextFloat() < 0.001) {
+                LOG.debug("flushing");
+                region.flushcache();
+              }
+              long ts = timeStamps.incrementAndGet();
+              RowMutation rm = new RowMutation(row);
+              if (op) {
+                Put p = new Put(row, ts);
+                p.add(fam1, qual1, value1);
+                rm.add(p);
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual2, ts);
+                rm.add(d);
+              } else {
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual1, ts);
+                rm.add(d);
+                Put p = new Put(row, ts);
+                p.add(fam1, qual2, value2);
+                rm.add(p);
+              }
+              region.mutateRow(rm);
+              op ^= true;
+              // check: should always see exactly one column
+              Get g = new Get(row);
+              Result r = region.get(g, null);
+              if (r.size() != 1) {
+                LOG.debug(r);
+                failures.incrementAndGet();
+                fail();
+              }
+            } catch (IOException e) {
+              e.printStackTrace();
+              failures.incrementAndGet();
+              fail();
+            }
+          }
+        }
+      };
     }
 
     // run all threads
@@ -282,62 +332,104 @@ public class TestAtomicOperation extends
   }
 
 
+  /**
+   * Test multi-threaded multi-row mutations.
+   * <p>
+   * Each worker thread alternates between two operations issued through
+   * <code>HRegion.mutateRowsWithLocks</code>, both locking <code>row</code>
+   * and <code>row2</code>: (a) put <code>qual1</code> into <code>row2</code>
+   * and delete it from <code>row</code>, (b) delete <code>qual1</code> from
+   * <code>row2</code> and put it into <code>row</code>. After every operation
+   * a scan starting at <code>row</code> is expected to yield exactly one
+   * KeyValue; anything else, or an IOException, counts as a failure.
+   *
+   * @throws IOException if the region cannot be initialized
+   */
+  public void testMultiRowMutationMultiThreads() throws IOException {
+
+    LOG.info("Starting test testMultiRowMutationMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+
+    // create 100 threads, each will alternate between adding and
+    // removing a column
+    int numThreads = 100;
+    int opsPerThread = 1000;
+    AtomicOperation[] all = new AtomicOperation[numThreads];
+
+    AtomicLong timeStamps = new AtomicLong(0);
+    AtomicInteger failures = new AtomicInteger(0);
+    // the same two rows are locked, in the same order, by every operation
+    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
+        @Override
+        public void run() {
+          boolean op = true;
+          for (int i=0; i<numOps; i++) {
+            try {
+              // throw in some flushes
+              if (r.nextFloat() < 0.001) {
+                LOG.debug("flushing");
+                region.flushcache();
+              }
+              long ts = timeStamps.incrementAndGet();
+              List<Mutation> mrm = new ArrayList<Mutation>();
+              if (op) {
+                Put p = new Put(row2, ts);
+                p.add(fam1, qual1, value1);
+                mrm.add(p);
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual1, ts);
+                mrm.add(d);
+              } else {
+                Delete d = new Delete(row2);
+                d.deleteColumns(fam1, qual1, ts);
+                mrm.add(d);
+                Put p = new Put(row, ts);
+                p.add(fam1, qual1, value2);
+                mrm.add(p);
+              }
+              region.mutateRowsWithLocks(mrm, rowsToLock);
+              op ^= true;
+              // check: should always see exactly one column
+              Scan s = new Scan(row);
+              RegionScanner rs = region.getScanner(s);
+              // named "results" so it no longer shadows the Random field "r"
+              List<KeyValue> results = new ArrayList<KeyValue>();
+              try {
+                while(rs.next(results));
+              } finally {
+                // always release the scanner; one is opened per iteration
+                rs.close();
+              }
+              if (results.size() != 1) {
+                LOG.debug(results);
+                failures.incrementAndGet();
+                fail();
+              }
+            } catch (IOException e) {
+              e.printStackTrace();
+              failures.incrementAndGet();
+              fail();
+            }
+          }
+        }
+      };
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+        // deliberately ignored: keep joining the remaining workers
+      }
+    }
+    assertEquals(0, failures.get());
+  }
+
+
   public static class AtomicOperation extends Thread {
-    private final HRegion region;
-    private final int numOps;
-    private final AtomicLong timeStamps;
-    private final AtomicInteger failures;
-    private final Random r = new Random();
-    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, 
AtomicInteger failures) {
+    protected final HRegion region;
+    protected final int numOps;
+    protected final AtomicLong timeStamps;
+    protected final AtomicInteger failures;
+    protected final Random r = new Random();
+
+    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
+        AtomicInteger failures) {
       this.region = region;
       this.numOps = numOps;
       this.timeStamps = timeStamps;
       this.failures = failures;
     }
-    @Override
-    public void run() {
-      boolean op = true;
-      for (int i=0; i<numOps; i++) {
-        try {
-          // throw in some flushes
-          if (r.nextFloat() < 0.001) {
-            LOG.debug("flushing");
-            region.flushcache();
-          }
-          long ts = timeStamps.incrementAndGet();
-          RowMutation arm = new RowMutation(row);
-          if (op) {
-            Put p = new Put(row, ts);
-            p.add(fam1, qual1, value1);
-            arm.add(p);
-            Delete d = new Delete(row);
-            d.deleteColumns(fam1, qual2, ts);
-            arm.add(d);
-          } else {
-            Delete d = new Delete(row);
-            d.deleteColumns(fam1, qual1, ts);
-            arm.add(d);
-            Put p = new Put(row, ts);
-            p.add(fam1, qual2, value2);
-            arm.add(p);
-          }
-          region.mutateRow(arm, null);
-          op ^= true;
-          // check: should always see exactly one column
-          Get g = new Get(row);
-          Result r = region.get(g, null);
-          if (r.size() != 1) {
-            LOG.debug(r);
-            failures.incrementAndGet();
-            fail();
-          }
-        } catch (IOException e) {
-          e.printStackTrace();
-          failures.incrementAndGet();
-          fail();
-        }
-      }
-    }
   }
 
   @org.junit.Rule


Reply via email to