Repository: hbase
Updated Branches:
  refs/heads/master dadfe7da0 -> ca1048415


HBASE-14340 Add second bulk load option to Spark Bulk Load to send puts as the 
value (Ted Malaska)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca104841
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca104841
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca104841

Branch: refs/heads/master
Commit: ca1048415bd842bb725357c4005da70788a79b02
Parents: dadfe7d
Author: Andrew Purtell <apurt...@apache.org>
Authored: Tue Nov 17 13:48:47 2015 -0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Tue Nov 17 13:48:47 2015 -0800

----------------------------------------------------------------------
 .../hbase/spark/BulkLoadPartitioner.scala       |  14 +-
 .../hadoop/hbase/spark/ByteArrayWrapper.scala   |  45 ++
 .../hbase/spark/FamiliesQualifiersValues.scala  |  55 ++
 .../hadoop/hbase/spark/HBaseContext.scala       | 553 +++++++++++++------
 .../hadoop/hbase/spark/HBaseRDDFunctions.scala  |  52 +-
 .../hadoop/hbase/spark/BulkLoadSuite.scala      | 505 +++++++++++++++--
 6 files changed, 1008 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ca104841/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
index c51a3af..39d3403 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
@@ -36,19 +36,21 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
 
   override def getPartition(key: Any): Int = {
 
+    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
+      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
+        Bytes.compareTo(o1, o2)
+      }
+    }
+
     val rowKey:Array[Byte] =
       key match {
         case qualifier: KeyFamilyQualifier =>
           qualifier.rowKey
+        case wrapper: ByteArrayWrapper =>
+          wrapper.value
         case _ =>
           key.asInstanceOf[Array[Byte]]
       }
-
-    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
-      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
-        Bytes.compareTo(o1, o2)
-      }
-    }
     val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
     if (partition < 0) partition * -1 + -2
     else partition

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca104841/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
new file mode 100644
index 0000000..9167f75
--- /dev/null
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import java.io.Serializable
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * This is a wrapper over a byte array so it can work as
+ * a key in a hashMap
+ *
+ * @param value The Byte Array value
+ */
+class ByteArrayWrapper (var value:Array[Byte])
+  extends Comparable[ByteArrayWrapper] with Serializable {
+
+  // Ordering is delegated to Bytes.compareTo on the wrapped arrays
+  override def compareTo(valueOther: ByteArrayWrapper): Int =
+    Bytes.compareTo(value, valueOther.value)
+
+  // Only another ByteArrayWrapper can be equal; the wrapped arrays are
+  // compared through Bytes.equals rather than by reference
+  override def equals(o2: Any): Boolean = o2 match {
+    case other: ByteArrayWrapper => Bytes.equals(value, other.value)
+    case _ => false
+  }
+
+  // Hash is delegated to Bytes.hashCode so it stays consistent with equals
+  override def hashCode():Int = Bytes.hashCode(value)
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca104841/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
new file mode 100644
index 0000000..33b3609
--- /dev/null
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import java.util
+
+/**
+ * This object is a clean way to store and sort all cells that will be bulk
+ * loaded into a single row
+ */
+class FamiliesQualifiersValues extends Serializable {
+  // family -> (qualifier -> value).  Tree maps are used so that the
+  // entries come back sorted when they are read
+  val familyMap = new util.TreeMap[ByteArrayWrapper,
+    util.TreeMap[ByteArrayWrapper, Array[Byte]]]()
+
+  // Scratch key reused for every family look up, so a new wrapper is only
+  // allocated the first time a family is seen; a row normally has more
+  // columns than column families
+  val reusableWrapper = new ByteArrayWrapper(null)
+
+  /**
+   * Adds a new cell to an existing row
+   * @param family    HBase column family
+   * @param qualifier HBase column qualifier
+   * @param value     HBase cell value
+   */
+  def += (family: Array[Byte],
+          qualifier: Array[Byte],
+          value: Array[Byte]): Unit = {
+    // Point the scratch key at this family for the look up
+    reusableWrapper.value = family
+    val existing = familyMap.get(reusableWrapper)
+
+    val qualifierValues =
+      if (existing != null) {
+        existing
+      } else {
+        // First cell for this family: register it under its own wrapper,
+        // because the scratch key will be re-pointed by the next call
+        val created = new util.TreeMap[ByteArrayWrapper, Array[Byte]]()
+        familyMap.put(new ByteArrayWrapper(family), created)
+        created
+      }
+
+    qualifierValues.put(new ByteArrayWrapper(qualifier), value)
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca104841/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 57ae6b0..f8c24f2 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark
 
 import java.net.InetSocketAddress
 import java.util
+import javax.management.openmbean.KeyAlreadyExistsException
 
 import org.apache.hadoop.hbase.fs.HFileSystem
 import org.apache.hadoop.hbase._
@@ -575,7 +576,8 @@ class HBaseContext(@transient sc: SparkContext,
   def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
 
   /**
-   * A Spark Implementation of HBase Bulk load
+   * Spark Implementation of HBase Bulk load for wide rows or when
+   * values are not already combined at the time of the map process
    *
    * This will take the content from an existing RDD then sort and shuffle
    * it with respect to region splits.  The result of that sort and shuffle
@@ -616,10 +618,10 @@ class HBaseContext(@transient sc: SparkContext,
     val startKeys = regionLocator.getStartKeys
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
-    val defaultCompression = HFileWriterImpl
+    val hfileCompression = HFileWriterImpl
       .compressionByName(defaultCompressionStr)
-    val now = System.currentTimeMillis()
-    val tableNameByteArray = tableName.getName
+    val nowTimeStamp = System.currentTimeMillis()
+    val tableRawName = tableName.getName
 
     val familyHFileWriteOptionsMapInternal =
       new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
@@ -631,53 +633,6 @@ class HBaseContext(@transient sc: SparkContext,
       familyHFileWriteOptionsMapInternal.put(new 
ByteArrayWrapper(entry.getKey), entry.getValue)
     }
 
-    /**
-     *  This will return a new HFile writer when requested
-     *
-     * @param family       column family
-     * @param conf         configuration to connect to HBase
-     * @param favoredNodes nodes that we would like to write too
-     * @param fs           FileSystem object where we will be writing the 
HFiles to
-     * @return WriterLength object
-     */
-    def getNewWriter(family: Array[Byte], conf: Configuration,
-                     favoredNodes: Array[InetSocketAddress],
-                     fs:FileSystem,
-                     familydir:Path): WriterLength = {
-
-
-      var familyOptions = familyHFileWriteOptionsMapInternal.get(new 
ByteArrayWrapper(family))
-
-      if (familyOptions == null) {
-        familyOptions = new 
FamilyHFileWriteOptions(defaultCompression.toString,
-          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, 
DataBlockEncoding.NONE.toString)
-        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), 
familyOptions)
-      }
-
-      val tempConf = new Configuration(conf)
-      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
-      val contextBuilder = new HFileContextBuilder()
-        .withCompression(Algorithm.valueOf(familyOptions.compression))
-        .withChecksumType(HStore.getChecksumType(conf))
-        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-        .withBlockSize(familyOptions.blockSize)
-      contextBuilder.withDataBlockEncoding(DataBlockEncoding.
-        valueOf(familyOptions.dataBlockEncoding))
-      val hFileContext = contextBuilder.build()
-
-      if (null == favoredNodes) {
-        new WriterLength(0, new StoreFile.WriterBuilder(conf, new 
CacheConfig(tempConf), fs)
-          
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-          
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
-      } else {
-        new WriterLength(0,
-          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new 
HFileSystem(fs))
-          
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-          
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-          .withFavoredNodes(favoredNodes).build())
-      }
-    }
-
     val regionSplitPartitioner =
       new BulkLoadPartitioner(startKeys)
 
@@ -695,150 +650,422 @@ class HBaseContext(@transient sc: SparkContext,
       val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
       var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
       var rollOverRequested = false
-
-      /**
-       * This will roll all writers
-       */
-      def rollWriters(): Unit = {
-        writerMap.values.foreach( wl => {
-          if (wl.writer != null) {
-            logDebug("Writer=" + wl.writer.getPath +
-              (if (wl.written == 0) "" else ", wrote=" + wl.written))
-            close(wl.writer)
-          }
-        })
-        writerMap.clear()
-        rollOverRequested = false
-      }
-
-      /**
-       * This function will close a given HFile writer
-       * @param w The writer to close
-       */
-      def close(w:StoreFile.Writer): Unit = {
-        if (w != null) {
-          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-            Bytes.toBytes(System.currentTimeMillis()))
-          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-            Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-            Bytes.toBytes(true))
-          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-            Bytes.toBytes(compactionExclude))
-          w.appendTrackedTimestampsToMetadata()
-          w.close()
-        }
-      }
+      val localTableName = TableName.valueOf(tableRawName)
 
       //Here is where we finally iterate through the data in this partition of 
the
       //RDD that has been sorted and partitioned
       it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
 
-        //This will get a writer for the column family
-        //If there is no writer for a given column family then
-        //it will get created here.
-        val wl = writerMap.getOrElseUpdate(new 
ByteArrayWrapper(keyFamilyQualifier.family), {
-
-          val familyDir = new Path(stagingDir, 
Bytes.toString(keyFamilyQualifier.family))
-
-          fs.mkdirs(familyDir)
-
-          val loc:HRegionLocation = {
-            try {
-              val locator =
-                conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
-              locator.getRegionLocation(keyFamilyQualifier.rowKey)
-            } catch {
-              case e: Throwable =>
-              logWarning("there's something wrong when locating rowkey: " +
-                Bytes.toString(keyFamilyQualifier.rowKey))
-                null
-            }
-          }
-          if (null == loc) {
-            if (log.isTraceEnabled) {
-              logTrace("failed to get region location, so use default writer: 
" +
-                Bytes.toString(keyFamilyQualifier.rowKey))
-            }
-            getNewWriter(family = keyFamilyQualifier.family, conf = conf, 
favoredNodes = null,
-              fs = fs, familydir = familyDir)
-          } else {
-            if (log.isDebugEnabled) {
-              logDebug("first rowkey: [" + 
Bytes.toString(keyFamilyQualifier.rowKey) + "]")
-            }
-            val initialIsa =
-              new InetSocketAddress(loc.getHostname, loc.getPort)
-            if (initialIsa.isUnresolved) {
-              if (log.isTraceEnabled) {
-                logTrace("failed to resolve bind address: " + loc.getHostname 
+ ":"
-                  + loc.getPort + ", so use default writer")
-              }
-              getNewWriter(keyFamilyQualifier.family, conf, null, fs, 
familyDir)
-            } else {
-              if(log.isDebugEnabled) {
-                logDebug("use favored nodes writer: " + 
initialIsa.getHostString)
-              }
-              getNewWriter(keyFamilyQualifier.family, conf,
-                Array[InetSocketAddress](initialIsa), fs, familyDir)
-            }
-          }
-        })
-
-        val keyValue =new KeyValue(keyFamilyQualifier.rowKey,
+        val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
           keyFamilyQualifier.family,
           keyFamilyQualifier.qualifier,
-          now,cellValue)
-
-        wl.writer.append(keyValue)
-        wl.written += keyValue.getLength
+          cellValue,
+          nowTimeStamp,
+          fs,
+          conn,
+          localTableName,
+          conf,
+          familyHFileWriteOptionsMapInternal,
+          hfileCompression,
+          writerMap,
+          stagingDir)
 
         rollOverRequested = rollOverRequested || wl.written > maxSize
 
         //This will only roll if we have at least one column family file that 
is
         //bigger then maxSize and we have finished a given row key
         if (rollOverRequested && Bytes.compareTo(previousRow, 
keyFamilyQualifier.rowKey) != 0) {
-          rollWriters()
+          rollWriters(writerMap,
+            regionSplitPartitioner,
+            previousRow,
+            compactionExclude)
+          rollOverRequested = false
         }
 
         previousRow = keyFamilyQualifier.rowKey
       }
       //We have finished all the data so lets close up the writers
-      rollWriters()
+      rollWriters(writerMap,
+        regionSplitPartitioner,
+        previousRow,
+        compactionExclude)
+      rollOverRequested = false
     })
   }
 
   /**
-   * This is a wrapper class around StoreFile.Writer.  The reason for the
-   * wrapper is to keep the length of the file along side the writer
+   * Spark Implementation of HBase Bulk load for short rows, somewhere less
+   * than 1000 columns.  This bulk load should be faster for tables with thinner
+   * rows than the other spark implementation of bulk load that puts only one
+   * value into a record going into a shuffle
    *
-   * @param written The writer to be wrapped
-   * @param writer  The number of bytes written to the writer
+   * This will take the content from an existing RDD then sort and shuffle
+   * it with respect to region splits.  The result of that sort and shuffle
+   * will be written to HFiles.
+   *
+   * After this function is executed the user will have to call
+   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+   *
+   * In this implementation, only the rowKey is given to the shuffle as the key
+   * and all the columns are already linked to the RowKey before the shuffle
+   * stage.  The sorting of the qualifier is done in memory out side of the
+   * shuffle stage
+   *
+   * Also make sure that incoming RDDs only have one record for every row key.
+   *
+   * @param rdd                            The RDD we are bulk loading from
+   * @param tableName                      The HBase table we are loading into
+   * @param mapFunction                    A function that will convert the 
RDD records to
+   *                                       the key value format used for the 
shuffle to prep
+   *                                       for writing to the bulk loaded 
HFiles
+   * @param stagingDir                     The location on the FileSystem to 
bulk load into
+   * @param familyHFileWriteOptionsMap     Options that will define how the 
HFile for a
+   *                                       column family is written
+   * @param compactionExclude              Compaction excluded for the HFiles
+   * @param maxSize                        Max size for the HFiles before they 
roll
+   * @tparam T                             The Type of values in the original 
RDD
    */
-  class WriterLength(var written:Long, val writer:StoreFile.Writer)
+  def bulkLoadThinRows[T](rdd:RDD[T],
+                  tableName: TableName,
+                  mapFunction: (T) =>
+                    (ByteArrayWrapper, FamiliesQualifiersValues),
+                  stagingDir:String,
+                  familyHFileWriteOptionsMap:
+                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+                  compactionExclude: Boolean = false,
+                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
+  Unit = {
+    //The driver only needs HBase long enough to read the region start keys,
+    //so the locator and the connection are released as soon as that is done
+    //(the executors get their own connection from hbaseForeachPartition)
+    val driverConn = ConnectionFactory.createConnection(config)
+    val startKeys =
+      try {
+        val regionLocator = driverConn.getRegionLocator(tableName)
+        try {
+          regionLocator.getStartKeys
+        } finally {
+          regionLocator.close()
+        }
+      } finally {
+        driverConn.close()
+      }
+    val defaultCompressionStr = config.get("hfile.compression",
+      Compression.Algorithm.NONE.getName)
+    val defaultCompression = HFileWriterImpl
+      .compressionByName(defaultCompressionStr)
+    val nowTimeStamp = System.currentTimeMillis()
+    val tableRawName = tableName.getName
+
+    //Re-key the per family options by ByteArrayWrapper so that look ups
+    //go through the wrapper's equals/hashCode instead of array identity
+    val familyHFileWriteOptionsMapInternal =
+      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+
+    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+
+    while (entrySetIt.hasNext) {
+      val entry = entrySetIt.next()
+      familyHFileWriteOptionsMapInternal.put(
+        new ByteArrayWrapper(entry.getKey), entry.getValue)
+    }
+
+    val regionSplitPartitioner =
+      new BulkLoadPartitioner(startKeys)
+
+    //This is where all the magic happens
+    //Here we are going to do the following things
+    // 1. Map every record in the RDD to a (row key, cells of the row) pair
+    // 2. Then we are going to repartition sort and shuffle
+    // 3. Finally we are going to write out our HFiles
+    rdd.map( r => mapFunction(r)).
+      repartitionAndSortWithinPartitions(regionSplitPartitioner).
+      hbaseForeachPartition(this, (it, conn) => {
+
+      val conf = broadcastedConf.value.value
+      val fs = FileSystem.get(conf)
+      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+      val localTableName = TableName.valueOf(tableRawName)
+
+      //Here is where we finally iterate through the data in this
+      //partition of the RDD that has been sorted and partitioned
+      it.foreach{ case (rowKey:ByteArrayWrapper,
+      familiesQualifiersValues:FamiliesQualifiersValues) =>
+
+        //The partition is sorted by row key, so a repeated key shows up
+        //right after its first occurrence
+        if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
+          throw new KeyAlreadyExistsException(
+            "The following key was sent to the " +
+            "HFile load more then one: " + Bytes.toString(previousRow))
+        }
+
+        //The family map is a tree map so the families will be sorted
+        val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
+        while (familyIt.hasNext) {
+          val familyEntry = familyIt.next()
+
+          val family = familyEntry.getKey.value
+
+          val qualifierIt = familyEntry.getValue.entrySet().iterator()
+
+          //The qualifier map is a tree map so the qualifiers will be sorted
+          while (qualifierIt.hasNext) {
+
+            val qualifierEntry = qualifierIt.next()
+            val qualifier = qualifierEntry.getKey
+            val cellValue = qualifierEntry.getValue
+
+            //This will get a writer for the column family
+            //If there is no writer for a given column family then
+            //it will get created here.
+            writeValueToHFile(rowKey.value,
+              family,
+              qualifier.value, // qualifier
+              cellValue, // value
+              nowTimeStamp,
+              fs,
+              conn,
+              localTableName,
+              conf,
+              familyHFileWriteOptionsMapInternal,
+              defaultCompression,
+              writerMap,
+              stagingDir)
+
+            previousRow = rowKey.value
+          }
+
+          //Once all the cells of this family are written, roll every writer
+          //if any open HFile has grown past maxSize.  The size check is
+          //finished before rollWriters runs, because rollWriters clears
+          //writerMap and the map must not be cleared while it is iterated
+          if (writerMap.values.exists(wl => wl.written > maxSize)) {
+            rollWriters(writerMap,
+              regionSplitPartitioner,
+              previousRow,
+              compactionExclude)
+          }
+        }
+      }
+
+      //We have finished all the data so lets close up the writers
+      rollWriters(writerMap,
+        regionSplitPartitioner,
+        previousRow,
+        compactionExclude)
+    })
+  }
 
   /**
-   * This is a wrapper over a byte array so it can work as
-   * a key in a hashMap
+   *  This will return a new HFile writer when requested
    *
-   * @param o1 The Byte Array value
+   * @param family       column family
+   * @param conf         configuration to connect to HBase
+   * @param favoredNodes nodes that we would like to write to
+   * @param fs           FileSystem object where we will be writing the HFiles 
to
+   * @return WriterLength object
    */
-  class ByteArrayWrapper (val o1:Array[Byte])
-    extends Comparable[ByteArrayWrapper] with Serializable {
-    override def compareTo(o2: ByteArrayWrapper): Int = {
-      Bytes.compareTo(o1,o2.o1)
+  private def getNewHFileWriter(family: Array[Byte], conf: Configuration,
+                   favoredNodes: Array[InetSocketAddress],
+                   fs:FileSystem,
+                   familydir:Path,
+                   familyHFileWriteOptionsMapInternal:
+                   util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
+                   defaultCompression:Compression.Algorithm): WriterLength = {
+
+
+    var familyOptions = familyHFileWriteOptionsMapInternal.get(new 
ByteArrayWrapper(family))
+
+    if (familyOptions == null) {
+      familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
+        BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, 
DataBlockEncoding.NONE.toString)
+      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), 
familyOptions)
     }
-    override def equals(o2: Any): Boolean = {
-      o2 match {
-        case wrapper: ByteArrayWrapper =>
-          Bytes.equals(o1, wrapper.o1)
-        case _ =>
-          false
-      }
+
+    val tempConf = new Configuration(conf)
+    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+    val contextBuilder = new HFileContextBuilder()
+      .withCompression(Algorithm.valueOf(familyOptions.compression))
+      .withChecksumType(HStore.getChecksumType(conf))
+      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+      .withBlockSize(familyOptions.blockSize)
+    contextBuilder.withDataBlockEncoding(DataBlockEncoding.
+      valueOf(familyOptions.dataBlockEncoding))
+    val hFileContext = contextBuilder.build()
+
+    if (null == favoredNodes) {
+      new WriterLength(0, new StoreFile.WriterBuilder(conf, new 
CacheConfig(tempConf), fs)
+        
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+        
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
+    } else {
+      new WriterLength(0,
+        new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new 
HFileSystem(fs))
+          
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+          
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+          .withFavoredNodes(favoredNodes).build())
     }
-    override def hashCode():Int = {
-      Bytes.hashCode(o1)
+  }
+
+  /**
+   * Encompasses the logic to write a value to an HFile
+   *
+   * @param rowKey                             The RowKey for the record
+   * @param family                             HBase column family for the 
record
+   * @param qualifier                          HBase column qualifier for the 
record
+   * @param cellValue                          HBase cell value
+   * @param nowTimeStamp                       The cell time stamp
+   * @param fs                                 Connection to the FileSystem 
for the HFile
+   * @param conn                               Connection to HBase
+   * @param tableName                          HBase TableName object
+   * @param conf                               Configuration to be used when 
making a new HFile
+   * @param familyHFileWriteOptionsMapInternal Extra configs for the HFile
+   * @param hfileCompression                   The compression codec for the 
new HFile
+   * @param writerMap                          HashMap of existing writers and 
their offsets
+   * @param stagingDir                         The staging directory on the 
FileSystem to store
+   *                                           the HFiles
+   * @return                                   The writer for the given HFile
+   *                                           that was written to
+   */
+  private def writeValueToHFile(rowKey: Array[Byte],
+                        family: Array[Byte],
+                        qualifier: Array[Byte],
+                        cellValue:Array[Byte],
+                        nowTimeStamp: Long,
+                        fs: FileSystem,
+                        conn: Connection,
+                        tableName: TableName,
+                        conf: Configuration,
+                        familyHFileWriteOptionsMapInternal:
+                        util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
+                        hfileCompression:Compression.Algorithm,
+                        writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
+                        stagingDir: String
+                         ): WriterLength = {
+
+    //Reuse the writer already open for this family; the block below only
+    //runs the first time a family is seen since the last roll
+    val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
+      val familyDir = new Path(stagingDir, Bytes.toString(family))
+
+      fs.mkdirs(familyDir)
+
+      //Best effort look up of the region that hosts this row.  A failure
+      //is logged with its cause and leaves loc null, which falls back to a
+      //writer without favored nodes.  Fatal errors are not swallowed, and
+      //the locator is always closed so it does not leak per writer
+      val loc:HRegionLocation = {
+        try {
+          val locator = conn.getRegionLocator(tableName)
+          try {
+            locator.getRegionLocation(rowKey)
+          } finally {
+            locator.close()
+          }
+        } catch {
+          case scala.util.control.NonFatal(e) =>
+            logWarning("there's something wrong when locating rowkey: " +
+              Bytes.toString(rowKey), e)
+            null
+        }
+      }
+
+      //null means "no favored nodes": either the region could not be
+      //located or its host name did not resolve
+      val favoredNodes: Array[InetSocketAddress] =
+        if (null == loc) {
+          if (log.isTraceEnabled) {
+            logTrace("failed to get region location, so use default writer: " +
+              Bytes.toString(rowKey))
+          }
+          null
+        } else {
+          if (log.isDebugEnabled) {
+            logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
+          }
+          val initialIsa =
+            new InetSocketAddress(loc.getHostname, loc.getPort)
+          if (initialIsa.isUnresolved) {
+            if (log.isTraceEnabled) {
+              logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+                + loc.getPort + ", so use default writer")
+            }
+            null
+          } else {
+            if(log.isDebugEnabled) {
+              logDebug("use favored nodes writer: " + initialIsa.getHostString)
+            }
+            Array[InetSocketAddress](initialIsa)
+          }
+        }
+
+      getNewHFileWriter(family,
+        conf,
+        favoredNodes,
+        fs,
+        familyDir,
+        familyHFileWriteOptionsMapInternal,
+        hfileCompression)
+    })
+
+    val keyValue =new KeyValue(rowKey,
+      family,
+      qualifier,
+      nowTimeStamp,cellValue)
+
+    //Append the cell and keep the running size used for the roll decision
+    wl.writer.append(keyValue)
+    wl.written += keyValue.getLength
+
+    wl
+  }
+
+  /**
+   * This will roll all Writers
+   * @param writerMap              HashMap that contains all the writers
+   * @param regionSplitPartitioner The partitioner with knowledge of how the
+   *                               Region's are split by row key
+   * @param previousRow            The last row to fill the HFile ending range 
metadata
+   * @param compactionExclude      The exclude compaction metadata flag for 
the HFile
+   */
+  private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
+                  regionSplitPartitioner: BulkLoadPartitioner,
+                  previousRow: Array[Byte],
+                  compactionExclude: Boolean): Unit = {
+    //Close every writer that was actually opened
+    for (wl <- writerMap.values if wl.writer != null) {
+      logDebug("Writer=" + wl.writer.getPath +
+        (if (wl.written == 0) "" else ", wrote=" + wl.written))
+      closeHFileWriter(wl.writer,
+        regionSplitPartitioner,
+        previousRow,
+        compactionExclude)
+    }
+    //Forget the closed writers so the next cell opens fresh HFiles
+    writerMap.clear()
+  }
+
+  /**
+   * Function to close an HFile
+   * @param w                      HFile Writer
+   * @param regionSplitPartitioner The partitioner with knowledge of how the
+   *                               Region's are split by row key
+   * @param previousRow            The last row to fill the HFile ending range 
metadata
+   * @param compactionExclude      The exclude compaction metadata flag for 
the HFile
+   */
+  private def closeHFileWriter(w: StoreFile.Writer,
+            regionSplitPartitioner: BulkLoadPartitioner,
+            previousRow: Array[Byte],
+            compactionExclude: Boolean): Unit = {
+    //A null writer is silently ignored
+    Option(w).foreach { writer =>
+      //Stamp the bulk load metadata before the file is closed
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+        Bytes.toBytes(System.currentTimeMillis()))
+      //The task key is the partition that the last written row maps to
+      writer.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+        Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
+      writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+        Bytes.toBytes(true))
+      writer.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+        Bytes.toBytes(compactionExclude))
+      writer.appendTrackedTimestampsToMetadata()
+      writer.close()
     }
   }
+
+  /**
+   * This is a wrapper class around StoreFile.Writer.  The reason for the
+   * wrapper is to keep the length of the file along side the writer
+   *
+   * @param written The number of bytes written to the writer
+   * @param writer  The writer to be wrapped
+   */
+  class WriterLength(var written:Long, val writer:StoreFile.Writer)
 }
 
 object LatestHBaseContextCache {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca104841/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
index 7c59145..601642a 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
@@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.spark
 
 import java.util
 
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hbase.{HConstants, TableName}
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.spark.rdd.RDD
 
-import scala.collection.immutable.HashMap
 import scala.reflect.ClassTag
 
 /**
@@ -164,8 +162,8 @@ object HBaseRDDFunctions
     }
 
     /**
-     * Implicit method that gives easy access to HBaseContext's
-     * bulkLoad method.
+     * Spark Implementation of HBase Bulk load for wide rows or when
+     * values are not already combined at the time of the map process
      *
      * A Spark Implementation of HBase Bulk load
      *
@@ -203,5 +201,51 @@ object HBaseRDDFunctions
         flatMap, stagingDir, familyHFileWriteOptionsMap,
         compactionExclude, maxSize)
     }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * bulkLoadThinRows method.
+     *
+     * Spark Implementation of HBase Bulk load for short rows, somewhere less 
than
+     * 1000 columns.  This bulk load should be faster for tables with thinner
+     * rows than the other Spark implementation of bulk load that puts only one
+     * value into a record going into a shuffle
+     *
+     * This will take the content from an existing RDD then sort and shuffle
+     * it with respect to region splits.  The result of that sort and shuffle
+     * will be written to HFiles.
+     *
+     * After this function is executed the user will have to call
+     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+     *
+     * In this implementation only the rowKey is given to the shuffle as the 
key
+     * and all the columns are already linked to the RowKey before the shuffle
+     * stage.  The sorting of the qualifier is done in memory outside of the
+     * shuffle stage
+     *
+     * @param tableName                      The HBase table we are loading 
into
+     * @param mapFunction                    A function that will convert the 
RDD records to
+     *                                       the key value format used for the 
shuffle to prep
+     *                                       for writing to the bulk loaded 
HFiles
+     * @param stagingDir                     The location on the FileSystem to 
bulk load into
+     * @param familyHFileWriteOptionsMap     Options that will define how the 
HFile for a
+     *                                       column family is written
+     * @param compactionExclude              Compaction excluded for the HFiles
+     * @param maxSize                        Max size for the HFiles before 
they roll
+     */
+    def hbaseBulkLoadThinRows(hc: HBaseContext,
+                      tableName: TableName,
+                      mapFunction: (T) =>
+                        (ByteArrayWrapper, FamiliesQualifiersValues),
+                      stagingDir:String,
+                      familyHFileWriteOptionsMap:
+                      util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                      new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
+                      compactionExclude: Boolean = false,
+                      maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
+      hc.bulkLoadThinRows(rdd, tableName,
+        mapFunction, stagingDir, familyHFileWriteOptionsMap,
+        compactionExclude, maxSize)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca104841/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index 2e5381a..795ce6d 100644
--- 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -65,8 +65,8 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     sc.stop()
   }
 
-  test("Basic Test multi family and multi column tests " +
-    "with all default HFile Configs") {
+  test("Wide Row Bulk Load: Test multi family and multi column tests " +
+    "with all default HFile Configs.") {
     val config = TEST_UTIL.getConfiguration
 
     logInfo(" - creating table " + tableName)
@@ -81,36 +81,38 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     // 5. There are records will a single qualifier and some with two
     val rdd = sc.parallelize(Array(
       (Bytes.toBytes("1"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.a")))),
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.a"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.b")))),
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.b"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.c")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.c"))),
       (Bytes.toBytes("5"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3"))),
       (Bytes.toBytes("4"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1"))),
       (Bytes.toBytes("4"),
-        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2")))),
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2"))),
       (Bytes.toBytes("2"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1"))),
       (Bytes.toBytes("2"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2"))))))
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2")))))
+
+
 
     val hbaseContext = new HBaseContext(sc, config)
 
     testFolder.create()
     val stagingFolder = testFolder.newFolder()
 
-    hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], 
Array[Byte])])](rdd,
+    hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], 
Array[Byte]))](rdd,
       TableName.valueOf(tableName),
       t => {
         val rowKey = t._1
-        val family:Array[Byte] = t._2(0)._1
-        val qualifier = t._2(0)._2
-        val value = t._2(0)._3
+        val family:Array[Byte] = t._2._1
+        val qualifier = t._2._2
+        val value:Array[Byte] = t._2._3
 
         val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, 
qualifier)
 
@@ -188,7 +190,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     }
   }
 
-  test("bulkLoad to test HBase client: Test Roll Over and " +
+  test("Wide Row Bulk Load: Test HBase client: Test Roll Over and " +
     "using an implicit call to bulk load") {
     val config = TEST_UTIL.getConfiguration
 
@@ -204,23 +206,23 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     // 5. There are records will a single qualifier and some with two
     val rdd = sc.parallelize(Array(
       (Bytes.toBytes("1"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.b")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.b"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.a")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.a"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), 
Bytes.toBytes("foo2.c")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), 
Bytes.toBytes("foo2.c"))),
       (Bytes.toBytes("5"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3"))),
       (Bytes.toBytes("4"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1"))),
       (Bytes.toBytes("4"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2"))),
       (Bytes.toBytes("2"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1"))),
       (Bytes.toBytes("2"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2"))))))
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2")))))
 
     val hbaseContext = new HBaseContext(sc, config)
 
@@ -231,9 +233,9 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
       TableName.valueOf(tableName),
       t => {
         val rowKey = t._1
-        val family:Array[Byte] = t._2(0)._1
-        val qualifier = t._2(0)._2
-        val value = t._2(0)._3
+        val family:Array[Byte] = t._2._1
+        val qualifier = t._2._2
+        val value = t._2._3
 
         val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, 
qualifier)
 
@@ -314,7 +316,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     }
   }
 
-  test("Basic Test multi family and multi column tests" +
+  test("Wide Row Bulk Load: Test multi family and multi column tests" +
     " with one column family with custom configs plus multi region") {
     val config = TEST_UTIL.getConfiguration
 
@@ -335,23 +337,23 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     // 5. There are records will a single qualifier and some with two
     val rdd = sc.parallelize(Array(
       (Bytes.toBytes("1"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.a")))),
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.a"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.b")))),
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.b"))),
       (Bytes.toBytes("3"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.c")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.c"))),
       (Bytes.toBytes("5"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3"))),
       (Bytes.toBytes("4"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1"))),
       (Bytes.toBytes("4"),
-        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2")))),
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2"))),
       (Bytes.toBytes("2"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1")))),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1"))),
       (Bytes.toBytes("2"),
-        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2"))))))
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2")))))
 
     val hbaseContext = new HBaseContext(sc, config)
 
@@ -365,13 +367,13 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
 
     familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
 
-    hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], 
Array[Byte])])](rdd,
+    hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], 
Array[Byte]))](rdd,
       TableName.valueOf(tableName),
       t => {
         val rowKey = t._1
-        val family:Array[Byte] = t._2(0)._1
-        val qualifier = t._2(0)._2
-        val value = t._2(0)._3
+        val family:Array[Byte] = t._2._1
+        val qualifier = t._2._2
+        val value = t._2._3
 
         val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, 
qualifier)
 
@@ -473,7 +475,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     }
   }
 
-  test("bulkLoad partitioner tests") {
+  test("Test partitioner") {
 
     var splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](3)
     splitKeys(0) = Bytes.toBytes("")
@@ -530,8 +532,425 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     assert(5 == partitioner.getPartition(Bytes.toBytes("11")))
     assert(6 == partitioner.getPartition(Bytes.toBytes("12")))
     assert(6 == partitioner.getPartition(Bytes.toBytes("13")))
+  }
+
+  test("Thin Row Bulk Load: Test multi family and multi column tests " +
+    "with all default HFile Configs") {
+    val config = TEST_UTIL.getConfiguration
+
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
+
+    //There are a number of tests in here.
+    // 1. Row keys are not in order
+    // 2. Qualifiers are not in order
+    // 3. Column Families are not in order
+    // 4. There are tests for records in one column family and some in two 
column families
+    // 5. There are records with a single qualifier and some with two
+    val rdd = sc.parallelize(Array(
+      ("1",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
+      ("3",
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.a"))),
+      ("3",
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.b"))),
+      ("3",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.c"))),
+      ("5",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3"))),
+      ("4",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1"))),
+      ("4",
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2"))),
+      ("2",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1"))),
+      ("2",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2"))))).
+      groupByKey()
+
+    val hbaseContext = new HBaseContext(sc, config)
+
+    testFolder.create()
+    val stagingFolder = testFolder.newFolder()
+
+    hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], 
Array[Byte])])](rdd,
+      TableName.valueOf(tableName),
+      t => {
+        val rowKey = Bytes.toBytes(t._1)
+
+        val familyQualifiersValues = new FamiliesQualifiersValues
+        t._2.foreach(f => {
+          val family:Array[Byte] = f._1
+          val qualifier = f._2
+          val value:Array[Byte] = f._3
+
+          familyQualifiersValues +=(family, qualifier, value)
+        })
+        (new ByteArrayWrapper(rowKey), familyQualifiersValues)
+      },
+      stagingFolder.getPath)
+
+    val fs = FileSystem.get(config)
+    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
+
+    val conn = ConnectionFactory.createConnection(config)
+
+    val load = new LoadIncrementalHFiles(config)
+    val table = conn.getTable(TableName.valueOf(tableName))
+    try {
+      load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table,
+        conn.getRegionLocator(TableName.valueOf(tableName)))
+
+      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+      assert(cells5.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+      assert(cells4.size == 2)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+      assert(cells3.size == 3)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
+
+
+      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+      assert(cells2.size == 2)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+      assert(cells1.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+    } finally {
+      table.close()
+      val admin = ConnectionFactory.createConnection(config).getAdmin
+      try {
+        admin.disableTable(TableName.valueOf(tableName))
+        admin.deleteTable(TableName.valueOf(tableName))
+      } finally {
+        admin.close()
+      }
+      fs.delete(new Path(stagingFolder.getPath), true)
+
+      testFolder.delete()
+
+    }
+  }
+
+  test("Thin Row Bulk Load: Test HBase client: Test Roll Over and " +
+    "using an implicit call to bulk load") {
+    val config = TEST_UTIL.getConfiguration
+
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
+
+    //There are a number of tests in here.
+    // 1. Row keys are not in order
+    // 2. Qualifiers are not in order
+    // 3. Column Families are not in order
+    // 4. There are tests for records in one column family and some in two 
column families
+    // 5. There are records with a single qualifier and some with two
+    val rdd = sc.parallelize(Array(
+      ("1",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
+      ("3",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.b"))),
+      ("3",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.a"))),
+      ("3",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), 
Bytes.toBytes("foo2.c"))),
+      ("5",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3"))),
+      ("4",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1"))),
+      ("4",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2"))),
+      ("2",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1"))),
+      ("2",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2"))))).
+      groupByKey()
+
+    val hbaseContext = new HBaseContext(sc, config)
+
+    testFolder.create()
+    val stagingFolder = testFolder.newFolder()
+
+    rdd.hbaseBulkLoadThinRows(hbaseContext,
+      TableName.valueOf(tableName),
+      t => {
+        val rowKey = t._1
+
+        val familyQualifiersValues = new FamiliesQualifiersValues
+        t._2.foreach(f => {
+          val family:Array[Byte] = f._1
+          val qualifier = f._2
+          val value:Array[Byte] = f._3
+
+          familyQualifiersValues +=(family, qualifier, value)
+        })
+        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
+      },
+      stagingFolder.getPath,
+      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+      compactionExclude = false,
+      20)
+
+    val fs = FileSystem.get(config)
+    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1)
+
+    assert(fs.listStatus(new Path(stagingFolder.getPath+ "/f1")).length == 5)
+
+    val conn = ConnectionFactory.createConnection(config)
+
+    val load = new LoadIncrementalHFiles(config)
+    val table = conn.getTable(TableName.valueOf(tableName))
+    try {
+      load.doBulkLoad(new Path(stagingFolder.getPath),
+        conn.getAdmin, table, 
conn.getRegionLocator(TableName.valueOf(tableName)))
+
+      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+      assert(cells5.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+      assert(cells4.size == 2)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+      assert(cells3.size == 3)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c"))
+
+      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+      assert(cells2.size == 2)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+      assert(cells1.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
 
+    } finally {
+      table.close()
+      val admin = ConnectionFactory.createConnection(config).getAdmin
+      try {
+        admin.disableTable(TableName.valueOf(tableName))
+        admin.deleteTable(TableName.valueOf(tableName))
+      } finally {
+        admin.close()
+      }
+      fs.delete(new Path(stagingFolder.getPath), true)
+
+      testFolder.delete()
+    }
   }
 
+  test("Thin Row Bulk Load: Test multi family and multi column tests" +
+    " with one column family with custom configs plus multi region") {
+    val config = TEST_UTIL.getConfiguration
+
+    val splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](2)
+    splitKeys(0) = Bytes.toBytes("2")
+    splitKeys(1) = Bytes.toBytes("4")
+
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)),
+      splitKeys)
+
+    //There are a number of tests in here.
+    // 1. Row keys are not in order
+    // 2. Qualifiers are not in order
+    // 3. Column Families are not in order
+    // 4. There are tests for records in one column family and some in two 
column families
+    // 5. There are records with a single qualifier and some with two
+    val rdd = sc.parallelize(Array(
+      ("1",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
+      ("3",
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.a"))),
+      ("3",
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.b"))),
+      ("3",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo2.c"))),
+      ("5",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo3"))),
+      ("4",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo.1"))),
+      ("4",
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), 
Bytes.toBytes("foo.2"))),
+      ("2",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("bar.1"))),
+      ("2",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("bar.2"))))).
+      groupByKey()
+
+    val hbaseContext = new HBaseContext(sc, config)
+
+    testFolder.create()
+    val stagingFolder = testFolder.newFolder()
+
+    val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], 
FamilyHFileWriteOptions]
+
+    val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128,
+      "PREFIX")
+
+    familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
+
+    hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], 
Array[Byte])])](rdd,
+      TableName.valueOf(tableName),
+      t => {
+        val rowKey = t._1
+
+        val familyQualifiersValues = new FamiliesQualifiersValues
+        t._2.foreach(f => {
+          val family:Array[Byte] = f._1
+          val qualifier = f._2
+          val value:Array[Byte] = f._3
+
+          familyQualifiersValues +=(family, qualifier, value)
+        })
+        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
+      },
+      stagingFolder.getPath,
+      familyHBaseWriterOptions,
+      compactionExclude = false,
+      HConstants.DEFAULT_MAX_FILE_SIZE)
+
+    val fs = FileSystem.get(config)
+    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
+
+    val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1"))
+    for ( i <- 0 until f1FileList.length) {
+      val reader = HFile.createReader(fs, f1FileList(i).getPath,
+        new CacheConfig(config), config)
+      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
+    }
+
+    assert( 3 ==  f1FileList.length)
+
+    val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2"))
+    for ( i <- 0 until f2FileList.length) {
+      val reader = HFile.createReader(fs, f2FileList(i).getPath,
+        new CacheConfig(config), config)
+      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getDataBlockEncoding.name().equals("NONE"))
+    }
+
+    assert( 2 ==  f2FileList.length)
+
+
+    val conn = ConnectionFactory.createConnection(config)
+
+    val load = new LoadIncrementalHFiles(config)
+    val table = conn.getTable(TableName.valueOf(tableName))
+    try {
+      load.doBulkLoad(new Path(stagingFolder.getPath),
+        conn.getAdmin, table, 
conn.getRegionLocator(TableName.valueOf(tableName)))
+
+      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+      assert(cells5.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
 
+      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+      assert(cells4.size == 2)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+      assert(cells3.size == 3)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
+
+
+      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+      assert(cells2.size == 2)
+      
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+      
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+      assert(cells1.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+      
assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+    } finally {
+      table.close()
+      val admin = ConnectionFactory.createConnection(config).getAdmin
+      try {
+        admin.disableTable(TableName.valueOf(tableName))
+        admin.deleteTable(TableName.valueOf(tableName))
+      } finally {
+        admin.close()
+      }
+      fs.delete(new Path(stagingFolder.getPath), true)
+
+      testFolder.delete()
+
+    }
+  }
 }

Reply via email to