HBASE-17933: [hbase-spark] Support Java API for bulkload

Signed-off-by: Sean Busbey <bus...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/49f707fb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/49f707fb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/49f707fb

Branch: refs/heads/hbase-12439
Commit: 49f707fba7c6a9f0210f387e31d1be9f108991f8
Parents: 9a1aff4
Author: Yi Liang <easyliang...@gmail.com>
Authored: Fri Apr 21 18:10:03 2017 -0700
Committer: Sean Busbey <bus...@apache.org>
Committed: Mon Apr 24 11:48:29 2017 -0500

----------------------------------------------------------------------
 .../hbasecontext/JavaHBaseBulkLoadExample.java | 102 ++++++++++
 .../hbase/spark/FamiliesQualifiersValues.scala |  12 +-
 .../hadoop/hbase/spark/JavaHBaseContext.scala  |  68 ++++++-
 .../hbase/spark/TestJavaHBaseContext.java      | 201 ++++++++++++++++++-
 4 files changed, 371 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
new file mode 100644
index 0000000..040546d
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.spark.KeyFamilyQualifier;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Run this example using the command below:
+ *
+ *  SPARK_HOME/bin/spark-submit --master local[2] --class org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkLoadExample
+ *  path/to/hbase-spark.jar {path/to/output/HFiles}
+ *
+ * This example will output HFiles in {path/to/output/HFiles}, and the user can then run
+ * 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load the HFiles into the table and verify this example.
+ */
+final public class JavaHBaseBulkLoadExample {
+  private JavaHBaseBulkLoadExample() {}
+
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      System.out.println("JavaHBaseBulkLoadExample " + "{outputPath}");
+      return;
+    }
+
+    String tableName = "bulkload-table-test";
+    String columnFamily1 = "f1";
+    String columnFamily2 = "f2";
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      List<String> list = new ArrayList<String>();
+      // row1
+      list.add("1," + columnFamily1 + ",b,1");
+      // row3
+      list.add("3," + columnFamily1 + ",a,2");
+      list.add("3," + columnFamily1 + ",b,1");
+      list.add("3," + columnFamily2 + ",a,1");
+      // row2
+      list.add("2," + columnFamily2 + ",a,3");
+      list.add("2," + columnFamily2 + ",b,3");
+
+      JavaRDD<String> rdd = jsc.parallelize(list);
+
+      Configuration conf = HBaseConfiguration.create();
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), args[0],
+          new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
+
+    @Override
+    public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
+      if (v1 == null)
+        return null;
+      String[] strs = v1.split(",");
+      if (strs.length != 4)
+        return null;
+      KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
+          Bytes.toBytes(strs[2]));
+      return new Pair(kfq, Bytes.toBytes(strs[3]));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
index 92bb3b7..7733802 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
@@ -55,4 +55,14 @@ class FamiliesQualifiersValues extends Serializable {
     qualifierValues.put(new ByteArrayWrapper(qualifier), value)
   }
 
-}
+
+  /**
+   * A wrapper for the "+=" method above that can be called from Java
+   * @param family HBase column family
+   * @param qualifier HBase column qualifier
+   * @param value HBase cell value
+   */
+  def add(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
+    this += (family, qualifier, value)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
index 253b386..57029f3 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
@@ -17,9 +17,12 @@
 
 package org.apache.hadoop.hbase.spark
 
+import java.util.Map
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair
+import org.apache.hadoop.hbase.classification.InterfaceAudience
 import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
@@ -268,7 +271,6 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
    * generates a new DStream based on Gets and the results
    * they bring back from HBase
    *
-   * @param tableName The name of the table to get from
    * @param batchSize The number of gets to be batched together
    * @param javaDStream Original DStream with data to iterate over
@@ -292,6 +294,67 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
   }
 
   /**
+   * A simple abstraction over the HBaseContext.bulkLoad method.
+   * It allows a user to take a JavaRDD and
+   * convert it into a new JavaRDD[Pair] based on a map function;
+   * HFiles will be generated in stagingDir for the bulk load
+   *
+   * @param javaRdd                    The javaRDD we are bulk loading from
+   * @param tableName                  The HBase table we are loading into
+   * @param mapFunc                    A Function that will convert a value in JavaRDD
+   *                                   to Pair(KeyFamilyQualifier, Array[Byte])
+   * @param stagingDir                 The location on the FileSystem to bulk load into
+   * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+   *                                   column family is written
+   * @param compactionExclude          Whether the generated HFiles should be excluded from compaction
+   * @param maxSize                    Max size for the HFiles before they roll
+   */
+  def bulkLoad[T](javaRdd: JavaRDD[T],
+                  tableName: TableName,
+                  mapFunc: Function[T, Pair[KeyFamilyQualifier, Array[Byte]]],
+                  stagingDir: String,
+                  familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
+                  compactionExclude: Boolean,
+                  maxSize: Long):
+  Unit = {
+    hbaseContext.bulkLoad[Pair[KeyFamilyQualifier, Array[Byte]]](javaRdd.map(mapFunc).rdd, tableName, t => {
+      val keyFamilyQualifier = t.getFirst
+      val value = t.getSecond
+      Seq((keyFamilyQualifier, value)).iterator
+    }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.bulkLoadThinRows method.
+   * It allows a user to take a JavaRDD and
+   * convert it into a new JavaRDD[Pair] based on a map function;
+   * HFiles will be generated in stagingDir for the bulk load
+   *
+   * @param javaRdd                    The javaRDD we are bulk loading from
+   * @param tableName                  The HBase table we are loading into
+   * @param mapFunc                    A Function that will convert a value in JavaRDD
+   *                                   to Pair(ByteArrayWrapper, FamiliesQualifiersValues)
+   * @param stagingDir                 The location on the FileSystem to bulk load into
+   * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+   *                                   column family is written
+   * @param compactionExclude          Whether the generated HFiles should be excluded from compaction
+   * @param maxSize                    Max size for the HFiles before they roll
+   */
+  def bulkLoadThinRows[T](javaRdd: JavaRDD[T],
+                          tableName: TableName,
+                          mapFunc: Function[T, Pair[ByteArrayWrapper, FamiliesQualifiersValues]],
+                          stagingDir: String,
+                          familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
+                          compactionExclude: Boolean,
+                          maxSize: Long):
+  Unit = {
+    hbaseContext.bulkLoadThinRows[Pair[ByteArrayWrapper, FamiliesQualifiersValues]](javaRdd.map(mapFunc).rdd,
+      tableName, t => {
+      (t.getFirst, t.getSecond)
+    }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
+  }
+
+  /**
    * This function will use the native HBase TableInputFormat with the
    * given scan object to generate a new JavaRDD
    *
@@ -341,4 +404,5 @@
    */
   private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
 
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index da6b724..c3f1bcb 100644
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.spark;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -38,17 +44,24 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.api.java.*;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.junit.*;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import scala.Tuple2;
 
-
 import com.google.common.io.Files;
 
 @Category({MiscTests.class, MediumTests.class})
@@ -58,19 +71,22 @@ public class TestJavaHBaseContext implements Serializable {
 
   protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
 
+
   byte[] tableName = Bytes.toBytes("t1");
   byte[] columnFamily = Bytes.toBytes("c");
+  byte[] columnFamily1 = Bytes.toBytes("d");
   String columnFamilyStr = Bytes.toString(columnFamily);
+  String columnFamilyStr1 = Bytes.toString(columnFamily1);
+
 
   @Before
   public void setUp() {
     jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
-    jsc.addJar("spark.jar");
 
     File tempDir = Files.createTempDir();
     tempDir.deleteOnExit();
 
-    htu = HBaseTestingUtility.createLocalHTU();
+    htu = new HBaseTestingUtility();
 
     try {
@@ -91,7 +107,7 @@ public class TestJavaHBaseContext implements Serializable {
       LOG.info(" - creating table " + Bytes.toString(tableName));
       htu.createTable(TableName.valueOf(tableName),
-          columnFamily);
+          new byte[][]{columnFamily, columnFamily1});
       LOG.info(" - created table");
     } catch (Exception e1) {
       throw new RuntimeException(e1);
@@ -278,6 +294,173 @@ public class TestJavaHBaseContext implements Serializable {
     Assert.assertEquals(stringJavaRDD.count(), 5);
   }
 
+  @Test
+  public void testBulkLoad() throws Exception {
+
+    Path output = htu.getDataTestDir("testBulkLoad");
+    // Add cell as String: "row,family,qualifier,value"
+    List<String> list = new ArrayList<String>();
+    // row1
+    list.add("1," + columnFamilyStr + ",b,1");
+    // row3
+    list.add("3," + columnFamilyStr + ",a,2");
+    list.add("3," + columnFamilyStr + ",b,1");
+    list.add("3," + columnFamilyStr1 + ",a,1");
+    // row2
+    list.add("2," + columnFamilyStr + ",a,3");
list.add("2," + columnFamilyStr + ",b,3"); + + JavaRDD<String> rdd = jsc.parallelize(list); + + Configuration conf = htu.getConfiguration(); + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + + + hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), output.toUri().getPath(), + new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE); + + try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) { + Table table = conn.getTable(TableName.valueOf(tableName)); + // Do bulk load + LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); + load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName))); + + + + // Check row1 + List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells(); + Assert.assertEquals(cell1.size(), 1); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell1.get(0))), columnFamilyStr); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))), "b"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell1.get(0))), "1"); + + // Check row3 + List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells(); + Assert.assertEquals(cell3.size(), 3); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(0))), columnFamilyStr); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))), "a"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(0))), "2"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(1))), columnFamilyStr); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))), "b"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(1))), "1"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(2))), columnFamilyStr1); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))), "a"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(2))), "1"); + + // Check row2 + List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells(); + Assert.assertEquals(cell2.size(), 2); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(0))), columnFamilyStr); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))), "a"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(0))), "3"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(1))), columnFamilyStr); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))), "b"); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(1))), "3"); + } + } + + @Test + public void testBulkLoadThinRows() throws Exception { + Path output = htu.getDataTestDir("testBulkLoadThinRows"); + // because of the limitation of scala bulkLoadThinRows API + // we need to provide data as <row, all cells in that row> + List<List<String>> list= new ArrayList<List<String>>(); + // row1 + List<String> list1 = new ArrayList<String>(); + list1.add("1," + columnFamilyStr + ",b,1"); + list.add(list1); + // row3 + List<String> list3 = new ArrayList<String>(); + list3.add("3," + columnFamilyStr + ",a,2"); + list3.add("3," + columnFamilyStr + ",b,1"); + list3.add("3," + columnFamilyStr1 + ",a,1"); + list.add(list3); + //row2 + List<String> list2 = new ArrayList<String>(); + list2.add("2," + columnFamilyStr + ",a,3"); + list2.add("2," + columnFamilyStr + ",b,3"); + list.add(list2); + + JavaRDD<List<String>> rdd = jsc.parallelize(list); + 
+
+    Configuration conf = htu.getConfiguration();
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf(tableName), new BulkLoadThinRowsFunction(), output.toString(),
+        new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+
+    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
+      Table table = conn.getTable(TableName.valueOf(tableName));
+      // Do bulk load
+      LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
+      load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
+
+      // Check row1
+      List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
+      Assert.assertEquals(cell1.size(), 1);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell1.get(0))), columnFamilyStr);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))), "b");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell1.get(0))), "1");
+
+      // Check row3
+      List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
+      Assert.assertEquals(cell3.size(), 3);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(0))), columnFamilyStr);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))), "a");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(0))), "2");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(1))), columnFamilyStr);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))), "b");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(1))), "1");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(2))), columnFamilyStr1);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))), "a");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(2))), "1");
+
+      // Check row2
+      List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
+      Assert.assertEquals(cell2.size(), 2);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(0))), columnFamilyStr);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))), "a");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(0))), "3");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(1))), columnFamilyStr);
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))), "b");
+      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(1))), "3");
+    }
+
+  }
+
+  public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
+
+    @Override
+    public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
+      if (v1 == null)
+        return null;
+      String[] strs = v1.split(",");
+      if (strs.length != 4)
+        return null;
+      KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
+          Bytes.toBytes(strs[2]));
+      return new Pair(kfq, Bytes.toBytes(strs[3]));
+    }
+  }
+
+  public static class BulkLoadThinRowsFunction implements Function<List<String>, Pair<ByteArrayWrapper, FamiliesQualifiersValues>> {
+
+    @Override
+    public Pair<ByteArrayWrapper, FamiliesQualifiersValues> call(List<String> list) throws Exception {
+      if (list == null)
+        return null;
+      ByteArrayWrapper rowKey = null;
+      FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
+      for (String cell : list) {
+        String[] strs = cell.split(",");
+        if (rowKey == null) {
+          rowKey = new ByteArrayWrapper(Bytes.toBytes(strs[0]));
+        }
+        fqv.add(Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]), Bytes.toBytes(strs[3]));
+      }
+      return new Pair(rowKey, fqv);
+    }
+  }
+
   public static class GetFunction implements Function<byte[], Get> {
 
     private static final long serialVersionUID = 1L;
@@ -335,4 +518,4 @@ public class TestJavaHBaseContext implements Serializable {
     }
   }
 
-}
\ No newline at end of file
+}
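
----------------------------------------------------------------------

For reference, the committed example above exercises only the bulkLoad path; the new bulkLoadThinRows API is covered only by the test. Below is a minimal, self-contained Java sketch of driving bulkLoadThinRows, mirroring the test's BulkLoadThinRowsFunction. The table name "thin-rows-table" and staging directory "/tmp/hfiles-staging" are illustrative placeholders, not part of this commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.spark.ByteArrayWrapper;
import org.apache.hadoop.hbase.spark.FamiliesQualifiersValues;
import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public final class JavaBulkLoadThinRowsSketch {
  private JavaBulkLoadThinRowsSketch() {}

  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaBulkLoadThinRowsSketch");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      // One RDD element per row: all cells of that row as "row,family,qualifier,value".
      List<String> row1 = new ArrayList<String>();
      row1.add("1,f1,b,1");
      List<String> row2 = new ArrayList<String>();
      row2.add("2,f1,a,2");
      row2.add("2,f1,b,3");
      List<List<String>> rows = new ArrayList<List<String>>();
      rows.add(row1);
      rows.add(row2);
      JavaRDD<List<String>> rdd = jsc.parallelize(rows);

      Configuration conf = HBaseConfiguration.create();
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      // Generates HFiles for whole rows at once; "thin-rows-table" and the
      // staging directory are placeholders for this sketch.
      hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf("thin-rows-table"),
          new ThinRowsFunction(), "/tmp/hfiles-staging",
          new HashMap<byte[], FamilyHFileWriteOptions>(), false,
          HConstants.DEFAULT_MAX_FILE_SIZE);
    } finally {
      jsc.stop();
    }
  }

  public static class ThinRowsFunction
      implements Function<List<String>, Pair<ByteArrayWrapper, FamiliesQualifiersValues>> {

    @Override
    public Pair<ByteArrayWrapper, FamiliesQualifiersValues> call(List<String> cells) {
      ByteArrayWrapper rowKey = null;
      FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
      for (String cell : cells) {
        String[] parts = cell.split(",");
        if (rowKey == null) {
          rowKey = new ByteArrayWrapper(Bytes.toBytes(parts[0]));
        }
        // add(...) is the Java-friendly alias for "+=" introduced by this change.
        fqv.add(Bytes.toBytes(parts[1]), Bytes.toBytes(parts[2]), Bytes.toBytes(parts[3]));
      }
      return new Pair<ByteArrayWrapper, FamiliesQualifiersValues>(rowKey, fqv);
    }
  }
}

As in the test, the generated HFiles can then be loaded into the table with 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' or the LoadIncrementalHFiles API.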