javeme commented on code in PR #358:
URL: 
https://github.com/apache/incubator-hugegraph-toolchain/pull/358#discussion_r1017893990


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import org.apache.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, 
KeyValue> {
+
+    private SinkToHBase sinkToHBase;
+    private LoadDistributeMetrics loadDistributeMetrics;
+
+    public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+    public HBaseDirectLoader(LoadOptions loadOptions,
+                             InputStruct struct,
+                             LoadDistributeMetrics loadDistributeMetrics) {
+        super(loadOptions,struct);
+        this.loadDistributeMetrics = loadDistributeMetrics;
+        this.sinkToHBase = new SinkToHBase(loadOptions);
+
+    }
+
+    public String getTableName() {
+
+        String tableName = null;
+        if (struct.edges().size() > 0) {
+            tableName = this.loadOptions.edgeTablename;
+
+        } else if (struct.vertices().size() > 0) {
+            tableName = this.loadOptions.vertexTablename;
+
+        }
+        return tableName;
+    }
+
+    public Integer getTablePartitions () {
+        return struct.edges().size() > 0 ?
+               loadOptions.edgePartitions :
+               loadOptions.vertexPartitions;
+    }
+
+    public JavaPairRDD<ImmutableBytesWritable, KeyValue> 
buildVertexAndEdge(Dataset<Row> ds) {
+        LOG.info("Start build vertexs and edges");
+        JavaPairRDD<ImmutableBytesWritable, KeyValue> 
tuple2KeyValueJavaPairRDD =
+                ds.toJavaRDD().mapPartitionsToPair(
+                new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, 
KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> 
call(
+                           Iterator<Row> rowIterator) throws Exception {
+
+                            HBaseSerializer serializer = new HBaseSerializer(
+                                    HugeClientHolder.create(loadOptions),
+                                    
loadOptions.vertexPartitions,loadOptions.edgePartitions);
+                            List<ElementBuilder> buildersForGraphElement = 
getElementBuilders();
+                            List<Tuple2<ImmutableBytesWritable,  KeyValue>> 
result =
+                                    new LinkedList<>();
+                            while (rowIterator.hasNext()) {
+                                Row row = rowIterator.next();
+                                List<Tuple2<ImmutableBytesWritable, KeyValue>> 
serList =
+                                        buildAndSer(serializer, 
row,buildersForGraphElement);
+                                result.addAll(serList);
+                            }
+                                serializer.close();
+                                return result.iterator();
+                    }
+                }
+        );
+        return tuple2KeyValueJavaPairRDD;
+    }
+
+    @Override
+    String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> 
buildAndSerRdd) {
+        LOG.info("Start to generate hfile");
+        try {
+            Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+                    
sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+            Partitioner partitioner = (Partitioner) tuple._1;
+            TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+            JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+                    
buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+            Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+            Job job = Job.getInstance(conf);
+            HFileOutputFormat2.configureIncrementalLoadMap(job, 
tableDescriptor);
+            conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+                    tableDescriptor.getTableName().getNameAsString());
+            String path = getHFilePath(job.getConfiguration());
+            repartitionedRdd.saveAsNewAPIHadoopFile(
+                    path,
+                    ImmutableBytesWritable.class,
+                    KeyValue.class,
+                    HFileOutputFormat2.class,
+                    conf
+            );
+            LOG.info("Saved HFiles to: '{}'", path);
+            flushPermission(conf,path);
+            return path;
+        } catch (IOException e) {
+            LOG.error("Failed to generate files", e);
+        }
+        return Constants.EMPTY_STR;
+    }
+
+    public String   getHFilePath (Configuration conf) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        long timeStr = System.currentTimeMillis();
+        String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + 
"/" + timeStr + "/";
+        Path hfileGenPath = new Path(pathStr);
+        if (fs.exists(hfileGenPath)) {
+            LOG.info("\n Delete the path where the hfile is generated,path {} 
", pathStr);
+            fs.delete(hfileGenPath,true);
+        }
+        return pathStr;
+    }
+
+    @Override
+    public void loadFiles(String path) {
+        try {
+            sinkToHBase.loadHfiles(path, getTableName());// BulkLoad HFile to 
HBase
+        } catch (Exception e) {
+            LOG.error(" Failed to load hfiles", e);
+        }
+    }
+
+    private void flushPermission (Configuration conf, String path) {
+        FsShell shell = new FsShell(conf);
+        try {
+            LOG.info("Chmod hfile directory permission");
+            shell.run(new String[]{"-chmod", "-R", "777", path});
+            shell.close();
+        } catch (Exception e) {
+            LOG.error("Couldnt change the file permissions " + e +
+                    " Please run command:" +
+                    "hbase 
org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
+                    " '" + "test" + "'\n" + " to load generated HFiles into 
HBase table");
+        }
+    }
+
+    List<Tuple2<ImmutableBytesWritable,  KeyValue>> buildAndSer 
(HBaseSerializer serializer,
+                                                                 Row row,
+                                                                 
List<ElementBuilder> builders) {
+        List<GraphElement> elementsElement;
+
+        List<Tuple2<ImmutableBytesWritable,  KeyValue>> result = new 
LinkedList<>();
+
+        for (ElementBuilder builder : builders) {
+            ElementMapping elementMapping = builder.mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+            if ("".equals(row.mkString())) {
+                break;
+            }
+            switch (struct.input().type()) {
+                case FILE:
+                case HDFS:
+                    elementsElement = builder.build(row);
+                    break;
+                default:
+                    throw new AssertionError(String.format(
+                            "Unsupported input source '%s'",
+                            struct.input().type()));
+            }
+
+            boolean isVertex = builder.mapping().type().isVertex();
+            if (isVertex) {
+                for (Vertex vertex : (List<Vertex>) (Object) elementsElement) {
+                    LOG.debug("vertex already build done {} ", 
vertex.toString());
+                    Tuple2<ImmutableBytesWritable, KeyValue> tuple2 =
+                            vertexSerialize(serializer,vertex);
+                    
loadDistributeMetrics.increaseDisVertexInsertSuccess(builder.mapping());
+                    result.add(tuple2);
+                }
+            } else {
+                for (Edge edge : (List<Edge>) (Object) elementsElement) {
+                    LOG.debug("edge already build done {}", edge.toString());
+                    Tuple2<ImmutableBytesWritable, KeyValue> tuple2 =
+                            edgeSerialize(serializer,edge);
+                    
loadDistributeMetrics.increaseDisEdgeInsertSuccess(builder.mapping());
+                    result.add(tuple2);
+
+                }
+            }
+        }
+        return result;
+    }
+
+    private Tuple2<ImmutableBytesWritable, KeyValue> edgeSerialize 
(HBaseSerializer serializer,
+                                                                    Edge edge) 
{
+        LOG.debug("edge start serialize {}", edge.toString());
+        byte[] rowkey = serializer.getKeyBytes(edge);
+        byte[] values = serializer.getValueBytes(edge);
+        ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+        rowKey.set(rowkey);
+        KeyValue keyValue = new KeyValue(rowkey,
+                Bytes.toBytes(Constants.HBASE_COL_FAMILY),
+                Bytes.toBytes(Constants.EMPTY_STR),
+                values);
+        return new Tuple2<>(rowKey,keyValue);
+    }
+
+    private Tuple2<ImmutableBytesWritable, KeyValue> vertexSerialize 
(HBaseSerializer serializer,
+                                                                      Vertex 
vertex) {
+        LOG.debug("vertex start serialize {}", vertex.toString());
+        byte[] rowkey = serializer.getKeyBytes(vertex);
+        byte[] values = serializer.getValueBytes(vertex);
+        ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+        rowKey.set(rowkey);
+        KeyValue keyValue = new KeyValue(rowkey,
+                Bytes.toBytes(Constants.HBASE_COL_FAMILY),
+                Bytes.toBytes(Constants.EMPTY_STR),
+                values);
+        return new Tuple2<>(rowKey,keyValue);
+    }
+

Review Comment:
   remove this blank line



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+
+import scala.Tuple2;
+
+public class SinkToHBase implements Serializable {
+
+    private LoadOptions loadOptions;
+    public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+    public SinkToHBase(LoadOptions loadOptions) {
+        this.loadOptions = loadOptions;
+    }
+
+    public Optional<Configuration> getHBaseConfiguration() {
+        Configuration baseConf = HBaseConfiguration.create();
+        baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+        baseConf.set("hbase.zookeeper.property.clientPort", 
this.loadOptions.hbaseZKPort);
+        baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+        return Optional.ofNullable(baseConf);
+    }
+
+    private Optional<Connection> getConnection() {
+        Optional<Configuration> baseConf = getHBaseConfiguration();
+        Connection conn = null;
+        try {
+            conn = ConnectionFactory.createConnection(baseConf.get());
+        } catch (IOException e) {
+            LOG.error("get hbase connection failed",e);
+        }
+        return Optional.ofNullable(conn);
+    }
+
+    public Tuple2<IntPartitioner, TableDescriptor>
+            getPartitionerByTableName (int numPartitions, String tableName) 
throws IOException {
+        Optional<Connection> optionalConnection = getConnection();
+        TableDescriptor descriptor = optionalConnection
+                .get()
+                .getTable(TableName.valueOf(tableName))
+                .getDescriptor();
+        LOG.debug("getPartitionerByTableName get TableDescriptor " +
+                descriptor.getTableName());
+        optionalConnection.get().close();
+        return new Tuple2<IntPartitioner,TableDescriptor>(
+                new IntPartitioner(numPartitions, tableName),descriptor);
+    }
+
+    public void loadHfiles (String path, String tableName) throws Exception {
+        Connection conn = getConnection().get();
+        Table table = conn.getTable(TableName.valueOf(tableName));
+        Configuration conf = conn.getConfiguration();
+        BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+        bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+        table.close();
+        conn.close();
+    }
+
+    public class IntPartitioner extends Partitioner {
+
+        private final int numPartitions;
+        public Map<List<String>, Integer> rangeMap = new HashMap<>();
+        private String tableName;
+
+        public IntPartitioner(int numPartitions, String tableName) throws 
IOException {
+            this.numPartitions = numPartitions;
+            this.rangeMap = getRangeMap(tableName);
+            this.tableName = tableName;
+        }
+
+        private Map<List<String>, Integer> getRangeMap(String tableName) 
throws IOException {
+            Connection conn = getConnection().get();
+            HRegionLocator locator =
+                    (HRegionLocator) 
conn.getRegionLocator(TableName.valueOf(tableName));
+            Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
+            Map<List<String>, Integer> rangeMap = new HashMap<>();
+            for (int i = 0; i < startEndKeys.getFirst().length; i++) {
+                String startKey = Bytes.toString(startEndKeys.getFirst()[i]);
+                String endKey = Bytes.toString(startEndKeys.getSecond()[i]);
+                rangeMap.put(new ArrayList<>(Arrays.asList(startKey, endKey)), 
i);
+            }
+            conn.close();
+            return rangeMap;
+        }
+
+        @Override
+        public int numPartitions() {
+            return numPartitions;
+        }
+
+        @Override
+        public int getPartition(Object key) {
+            if (key instanceof ImmutableBytesWritable) {
+
+                try {
+                    ImmutableBytesWritable immutableBytesWritableKey = 
(ImmutableBytesWritable) key;
+
+                    if (rangeMap == null || rangeMap.isEmpty()) {
+                        rangeMap = getRangeMap(this.tableName);
+                    }
+
+                    String keyString = 
Bytes.toString(immutableBytesWritableKey.get());
+                    for (List<String> range : rangeMap.keySet()) {
+                        if (keyString.compareToIgnoreCase(range.get(0)) >= 0 &&
+                           ((keyString.compareToIgnoreCase(range.get(1)) < 0) 
||
+                           range.get(1).equals(""))) {
+                            return rangeMap.get(range);
+                        }
+                    }
+                    LOG.error("Didn't find proper key in rangeMap, so 
returning 0 ...");
+                    return 0;
+                } catch (Exception e) {
+                    LOG.error("When trying to get partitionID, encountered 
exception: " + e +

Review Comment:
   format log message with "{}"



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java:
##########
@@ -81,6 +87,14 @@ public LoadMapping(@JsonProperty("structs") 
List<InputStruct> structs) {
         this.structs = structs;
     }
 
+    @JsonCreator
+    public LoadMapping(@JsonProperty("structs") List<InputStruct> structs,
+                          @JsonProperty("backendStoreInfo") BackendStoreInfo 
backendStoreInfo) {

Review Comment:
   align with `@JsonProperty("structs")`



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -263,6 +292,15 @@ public static LoadOptions parseOptions(String[] args) {
         return options;
     }
 
+    public void copyBackendStoreInfo (BackendStoreInfo backendStoreInfo) {
+        E.checkArgument(null != backendStoreInfo,"the backendStoreInfo is null 
");

Review Comment:
   "the backendStoreInfo is null " => "The backendStoreInfo can't be null"



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,95 @@ public HugeGraphSparkLoader(String[] args) {
     public void load() {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if(!sinkType){
+            
this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+        }
+        SparkConf conf = new SparkConf()
+                .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+                .set("spark.kryo.registrationRequired", "true");
+        try {
+            conf.registerKryoClasses(
+                    new Class[]
+                    {
+                        
org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+                        org.apache.hadoop.hbase.KeyValue.class,
+                        org.apache.spark.sql.types.StructType.class,
+                        StructField[].class,
+                        StructField.class,
+                        org.apache.spark.sql.types.LongType$.class,
+                        org.apache.spark.sql.types.Metadata.class,
+                        org.apache.spark.sql.types.StringType$.class,
+                Class.forName(
+                    
"org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+                Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("scala.collection.immutable.Set$EmptySet$"),
+                Class.forName("org.apache.spark.sql.types.DoubleType$")
+                    });
+        } catch (ClassNotFoundException e) {
+            LOG.error("spark kryo serialized registration failed");
+        }
+        SparkSession session = SparkSession.builder()
+                                           .config(conf)
+                                           .getOrCreate();
+        SparkContext sc = session.sparkContext();
 
-        SparkSession session = SparkSession.builder().getOrCreate();
+        LongAccumulator totalInsertSuccess = 
sc.longAccumulator("totalInsertSuccess");
         for (InputStruct struct : structs) {
+            LOG.info("\n Initializes the accumulator corresponding to the  {} 
",
+                    struct.input().asFileSource().path());
+            LoadDistributeMetrics loadDistributeMetrics = new 
LoadDistributeMetrics(struct);
+            loadDistributeMetrics.init(sc);
+            LOG.info("\n   Start to load data, data info is: \t {} ",
+                    struct.input().asFileSource().path());

Review Comment:
   align with `"`



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,95 @@ public HugeGraphSparkLoader(String[] args) {
     public void load() {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if(!sinkType){
+            
this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+        }
+        SparkConf conf = new SparkConf()
+                .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+                .set("spark.kryo.registrationRequired", "true");
+        try {
+            conf.registerKryoClasses(
+                    new Class[]
+                    {
+                        
org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+                        org.apache.hadoop.hbase.KeyValue.class,
+                        org.apache.spark.sql.types.StructType.class,
+                        StructField[].class,
+                        StructField.class,
+                        org.apache.spark.sql.types.LongType$.class,
+                        org.apache.spark.sql.types.Metadata.class,
+                        org.apache.spark.sql.types.StringType$.class,
+                Class.forName(
+                    
"org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+                Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("scala.collection.immutable.Set$EmptySet$"),
+                Class.forName("org.apache.spark.sql.types.DoubleType$")
+                    });
+        } catch (ClassNotFoundException e) {
+            LOG.error("spark kryo serialized registration failed");
+        }
+        SparkSession session = SparkSession.builder()
+                                           .config(conf)
+                                           .getOrCreate();
+        SparkContext sc = session.sparkContext();
 
-        SparkSession session = SparkSession.builder().getOrCreate();
+        LongAccumulator totalInsertSuccess = 
sc.longAccumulator("totalInsertSuccess");
         for (InputStruct struct : structs) {
+            LOG.info("\n Initializes the accumulator corresponding to the  {} 
",
+                    struct.input().asFileSource().path());

Review Comment:
   align with `"`



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,95 @@ public HugeGraphSparkLoader(String[] args) {
     public void load() {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if(!sinkType){

Review Comment:
   add a space to "){"



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+
+import scala.Tuple2;
+
+public class SinkToHBase implements Serializable {
+
+    private LoadOptions loadOptions;
+    public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+    public SinkToHBase(LoadOptions loadOptions) {
+        this.loadOptions = loadOptions;
+    }
+
+    public Optional<Configuration> getHBaseConfiguration() {
+        Configuration baseConf = HBaseConfiguration.create();
+        baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+        baseConf.set("hbase.zookeeper.property.clientPort", 
this.loadOptions.hbaseZKPort);
+        baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+        return Optional.ofNullable(baseConf);
+    }
+
+    private Optional<Connection> getConnection() {
+        Optional<Configuration> baseConf = getHBaseConfiguration();
+        Connection conn = null;
+        try {
+            conn = ConnectionFactory.createConnection(baseConf.get());
+        } catch (IOException e) {
+            LOG.error("get hbase connection failed",e);
+        }
+        return Optional.ofNullable(conn);
+    }
+
+    public Tuple2<IntPartitioner, TableDescriptor>
+            getPartitionerByTableName (int numPartitions, String tableName) 
throws IOException {
+        Optional<Connection> optionalConnection = getConnection();
+        TableDescriptor descriptor = optionalConnection
+                .get()
+                .getTable(TableName.valueOf(tableName))
+                .getDescriptor();
+        LOG.debug("getPartitionerByTableName get TableDescriptor " +
+                descriptor.getTableName());
+        optionalConnection.get().close();
+        return new Tuple2<IntPartitioner,TableDescriptor>(
+                new IntPartitioner(numPartitions, tableName),descriptor);
+    }
+
+    public void loadHfiles (String path, String tableName) throws Exception {
+        Connection conn = getConnection().get();
+        Table table = conn.getTable(TableName.valueOf(tableName));
+        Configuration conf = conn.getConfiguration();
+        BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+        bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+        table.close();
+        conn.close();
+    }
+
+    public class IntPartitioner extends Partitioner {
+
+        private final int numPartitions;
+        public Map<List<String>, Integer> rangeMap = new HashMap<>();
+        private String tableName;
+
+        public IntPartitioner(int numPartitions, String tableName) throws 
IOException {
+            this.numPartitions = numPartitions;
+            this.rangeMap = getRangeMap(tableName);
+            this.tableName = tableName;
+        }
+
+        private Map<List<String>, Integer> getRangeMap(String tableName) 
throws IOException {
+            Connection conn = getConnection().get();
+            HRegionLocator locator =
+                    (HRegionLocator) 
conn.getRegionLocator(TableName.valueOf(tableName));
+            Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
+            Map<List<String>, Integer> rangeMap = new HashMap<>();
+            for (int i = 0; i < startEndKeys.getFirst().length; i++) {
+                String startKey = Bytes.toString(startEndKeys.getFirst()[i]);
+                String endKey = Bytes.toString(startEndKeys.getSecond()[i]);
+                rangeMap.put(new ArrayList<>(Arrays.asList(startKey, endKey)), 
i);
+            }
+            conn.close();
+            return rangeMap;
+        }
+
+        @Override
+        public int numPartitions() {
+            return numPartitions;
+        }
+
+        @Override
+        public int getPartition(Object key) {
+            if (key instanceof ImmutableBytesWritable) {
+

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -263,6 +292,15 @@ public static LoadOptions parseOptions(String[] args) {
         return options;
     }
 
+    public void copyBackendStoreInfo (BackendStoreInfo backendStoreInfo) {
+        E.checkArgument(null != backendStoreInfo,"the backendStoreInfo is null 
");

Review Comment:
   add a space after `,"`



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+
+import scala.Tuple2;
+
+public class SinkToHBase implements Serializable {
+
+    private LoadOptions loadOptions;
+    public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+    public SinkToHBase(LoadOptions loadOptions) {
+        this.loadOptions = loadOptions;
+    }
+
+    public Optional<Configuration> getHBaseConfiguration() {
+        Configuration baseConf = HBaseConfiguration.create();
+        baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+        baseConf.set("hbase.zookeeper.property.clientPort", 
this.loadOptions.hbaseZKPort);
+        baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+        return Optional.ofNullable(baseConf);
+    }
+
+    private Optional<Connection> getConnection() {
+        Optional<Configuration> baseConf = getHBaseConfiguration();
+        Connection conn = null;
+        try {
+            conn = ConnectionFactory.createConnection(baseConf.get());
+        } catch (IOException e) {
+            LOG.error("get hbase connection failed",e);
+        }
+        return Optional.ofNullable(conn);
+    }
+
+    public Tuple2<IntPartitioner, TableDescriptor>
+            getPartitionerByTableName (int numPartitions, String tableName) 
throws IOException {
+        Optional<Connection> optionalConnection = getConnection();
+        TableDescriptor descriptor = optionalConnection
+                .get()
+                .getTable(TableName.valueOf(tableName))
+                .getDescriptor();
+        LOG.debug("getPartitionerByTableName get TableDescriptor " +
+                descriptor.getTableName());
+        optionalConnection.get().close();
+        return new Tuple2<IntPartitioner,TableDescriptor>(
+                new IntPartitioner(numPartitions, tableName),descriptor);
+    }
+
+    public void loadHfiles (String path, String tableName) throws Exception {

Review Comment:
   remove the space in "loadHfiles ("



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,95 @@ public HugeGraphSparkLoader(String[] args) {
     public void load() {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if(!sinkType){
+            
this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+        }
+        SparkConf conf = new SparkConf()
+                .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+                .set("spark.kryo.registrationRequired", "true");
+        try {
+            conf.registerKryoClasses(

Review Comment:
   prefer the following style
   ```java
   conf.registerKryoClasses(new Class[] {
           org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
           org.apache.hadoop.hbase.KeyValue.class,
           org.apache.spark.sql.types.StructType.class,
           StructField[].class,
           StructField.class,
           org.apache.spark.sql.types.LongType$.class,
           org.apache.spark.sql.types.Metadata.class,
           org.apache.spark.sql.types.StringType$.class,
           
Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
           Class.forName("scala.reflect.ClassTag$$anon$1"),
           Class.forName("scala.collection.immutable.Set$EmptySet$"),
           Class.forName("org.apache.spark.sql.types.DoubleType$")
       });
   ```



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java:
##########
@@ -129,7 +129,7 @@ private static LoadMapping parseV1(String json) {
             inputStruct.id(String.valueOf(++id));
             inputStructs.add(inputStruct);
         }
-        return new LoadMapping(inputStructs);
+        return new LoadMapping(inputStructs,graphStruct.getBackendStoreInfo());

Review Comment:
   expect a space after a comma ", "



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,95 @@ public HugeGraphSparkLoader(String[] args) {
     public void load() {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if(!sinkType){
+            
this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+        }
+        SparkConf conf = new SparkConf()
+                .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+                .set("spark.kryo.registrationRequired", "true");
+        try {
+            conf.registerKryoClasses(
+                    new Class[]
+                    {
+                        
org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+                        org.apache.hadoop.hbase.KeyValue.class,
+                        org.apache.spark.sql.types.StructType.class,
+                        StructField[].class,
+                        StructField.class,
+                        org.apache.spark.sql.types.LongType$.class,
+                        org.apache.spark.sql.types.Metadata.class,
+                        org.apache.spark.sql.types.StringType$.class,
+                Class.forName(
+                    
"org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+                Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("scala.collection.immutable.Set$EmptySet$"),
+                Class.forName("org.apache.spark.sql.types.DoubleType$")
+                    });
+        } catch (ClassNotFoundException e) {
+            LOG.error("spark kryo serialized registration failed");
+        }
+        SparkSession session = SparkSession.builder()
+                                           .config(conf)
+                                           .getOrCreate();
+        SparkContext sc = session.sparkContext();
 
-        SparkSession session = SparkSession.builder().getOrCreate();
+        LongAccumulator totalInsertSuccess = 
sc.longAccumulator("totalInsertSuccess");
         for (InputStruct struct : structs) {
+            LOG.info("\n Initializes the accumulator corresponding to the  {} 
",
+                    struct.input().asFileSource().path());
+            LoadDistributeMetrics loadDistributeMetrics = new 
LoadDistributeMetrics(struct);
+            loadDistributeMetrics.init(sc);
+            LOG.info("\n   Start to load data, data info is: \t {} ",
+                    struct.input().asFileSource().path());
             Dataset<Row> ds = read(session, struct);
-            ds.foreachPartition((Iterator<Row> p) -> {
-                LoadContext context = initPartition(this.loadOptions, struct);
-                p.forEachRemaining((Row row) -> {
-                    loadRow(struct, row, p, context);
+            if (sinkType) {
+                LOG.info("\n  Start to load data using spark apis  \n");
+                ds.foreachPartition((Iterator<Row> p) -> {
+                    LoadContext context = initPartition(this.loadOptions, 
struct);
+                    p.forEachRemaining((Row row) -> {
+                        loadRow(struct, row, p, context);
+                    });
+                    context.close();
                 });
-                context.close();
-            });
+
+            } else {
+                LOG.info("\n Start to load data using spark bulkload     \n");
+                // gen-hfile
+                HBaseDirectLoader directLoader = new 
HBaseDirectLoader(loadOptions,
+                        struct,loadDistributeMetrics);
+                directLoader.bulkload(ds);
+
+            }
+            collectLoadMetrics(loadDistributeMetrics,totalInsertSuccess);
+            LOG.info("    \n   Finished  load {}  data ",
+                    struct.input().asFileSource().path());
         }
+        Long totalInsertSuccessCnt = totalInsertSuccess.value();
+        LOG.info("\n ------------The data load task is 
complete-------------------\n" +
+                "\n  insertSuccesscnt:\t {}" +

Review Comment:
   align with `"`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to