javeme commented on code in PR #358:
URL: 
https://github.com/apache/incubator-hugegraph-toolchain/pull/358#discussion_r1016725314


##########
hugegraph-loader/assembly/static/example/spark/vertex_person.json:
##########
@@ -3,5 +3,4 @@
 {"name": "josh", "age": "32",  "city": "Beijing"}
 {"name": "peter", "age": "35",  "city": "Shanghai"}
 {"name": "li,nary", "age": "26",  "city": "Wu,han"}
-{"name": "tom", "age": "null",  "city": "NULL"}
-
+{"name": "tom", "age": "null",  "city": "NULL"}

Review Comment:
   did you delete 2 blank lines?



##########
hugegraph-loader/assembly/static/example/spark/vertex_software.json:
##########
@@ -1,3 +1,2 @@
 { "name": "lop",  "lang": "java","price": "328","ISBN": 
"ISBN978-7-107-18618-5"}
-{ "name": "ripple",  "lang": "java","price": "199","ISBN": 
"ISBN978-7-100-13678-5"}
-
+{ "name": "ripple",  "lang": "java","price": "199","ISBN": 
"ISBN978-7-100-13678-5"}

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -171,7 +171,7 @@ public void loadFiles(String path) {
         try {
             sinkToHBase.loadHfiles(path, getTableName());// BulkLoad HFile to 
HBase
         } catch (Exception e) {
-            e.printStackTrace();
+            LOG.error("hfile failed to be loaded",e);

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java:
##########
@@ -83,4 +83,6 @@ public final class Constants {
     public static final String LOAD_DATA_PARSE_SUFFIX = "parse";
     public static final String LOAD_DATA_SER_SUFFIX = "ser";
     public static final String LOAD_DATA_INSERT_SUFFIX = "insert";
+    public static final String BYPASS_HFILE_PATH_ = "insert";
+    ///hfile-gen

Review Comment:
   remove unused code



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -143,11 +143,11 @@ String generateFiles(JavaPairRDD<ImmutableBytesWritable, 
KeyValue> buildAndSerRd
                     HFileOutputFormat2.class,
                     conf
             );
-            LOG.info("Saved to HFiles to: " + path);
+            LOG.info("Saved HFiles to: '{}'", path);
             flushPermission(conf,path);
             return path;
         } catch (IOException e) {
-            e.printStackTrace();
+            LOG.error("generateFiles failed",e);

Review Comment:
   add a space after ","



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -143,11 +143,11 @@ String generateFiles(JavaPairRDD<ImmutableBytesWritable, 
KeyValue> buildAndSerRd
                     HFileOutputFormat2.class,
                     conf
             );
-            LOG.info("Saved to HFiles to: " + path);
+            LOG.info("Saved HFiles to: '{}'", path);
             flushPermission(conf,path);
             return path;
         } catch (IOException e) {
-            e.printStackTrace();
+            LOG.error("generateFiles failed",e);

Review Comment:
   Please improve the message, e.g. `Failed to generate files`. Could you also 
update the other messages?



##########
hugegraph-loader/assembly/static/example/spark/schema.groovy:
##########
@@ -0,0 +1,36 @@
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+
+schema.vertexLabel("person")
+      .properties("name", "age", "city")
+      .primaryKeys("name")
+      .nullableKeys("age", "city")
+      .ifNotExist()
+      .create();
+schema.vertexLabel("software")
+        .properties("name", "lang", "price")
+        .primaryKeys("name")
+      .ifNotExist()
+      .create();
+
+

Review Comment:
   one blank line is ok



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, 
KeyValue> {
+
+    private SinkToHBase sinkToHBase;
+    private LoadDistributeMetrics loadDistributeMetrics;
+
+    public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+    public HBaseDirectLoader(LoadOptions loadOptions,
+                             InputStruct struct,
+                             LoadDistributeMetrics loadDistributeMetrics) {
+        super(loadOptions,struct);
+        this.loadDistributeMetrics = loadDistributeMetrics;
+        this.sinkToHBase = new SinkToHBase(loadOptions);
+
+    }
+
+    public String getTableName() {
+
+        String tableName = null;
+        if (struct.edges().size() > 0) {
+            tableName = this.loadOptions.edgeTablename;
+
+        } else if (struct.vertices().size() > 0) {
+            tableName = this.loadOptions.vertexTablename;
+
+        }
+        return tableName;
+    }
+
+    public Integer getTablePartitions () {
+        return   struct.edges().size() > 0 ?
+                                            loadOptions.edgePartitions :
+                                            loadOptions.vertexPartitions;

Review Comment:
   ```java
   return struct.edges().size() > 0 ?
          loadOptions.edgePartitions :
          loadOptions.vertexPartitions;
   ```



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -162,5 +162,4 @@ public int getPartition(Object key) {
             }
         }
     }
-
 }

Review Comment:
   expect a blank line at the end of the file



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, 
KeyValue> {
+
+    private SinkToHBase sinkToHBase;
+    private LoadDistributeMetrics loadDistributeMetrics;
+
+    public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+    public HBaseDirectLoader(LoadOptions loadOptions,
+                             InputStruct struct,
+                             LoadDistributeMetrics loadDistributeMetrics) {
+        super(loadOptions,struct);
+        this.loadDistributeMetrics = loadDistributeMetrics;
+        this.sinkToHBase = new SinkToHBase(loadOptions);
+
+    }
+
+    public String getTableName() {
+
+        String tableName = null;
+        if (struct.edges().size() > 0) {
+            tableName = this.loadOptions.edgeTablename;
+
+        } else if (struct.vertices().size() > 0) {
+            tableName = this.loadOptions.vertexTablename;
+
+        }
+        return tableName;
+    }
+
+    public Integer getTablePartitions () {
+        return   struct.edges().size() > 0 ?

Review Comment:
   trim spaces in `"return    struct"`



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, 
KeyValue> {
+
+    private SinkToHBase sinkToHBase;
+    private LoadDistributeMetrics loadDistributeMetrics;
+
+    public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+    public HBaseDirectLoader(LoadOptions loadOptions,
+                             InputStruct struct,
+                             LoadDistributeMetrics loadDistributeMetrics) {
+        super(loadOptions,struct);
+        this.loadDistributeMetrics = loadDistributeMetrics;
+        this.sinkToHBase = new SinkToHBase(loadOptions);
+
+    }
+
+    public String getTableName() {
+
+        String tableName = null;
+        if (struct.edges().size() > 0) {
+            tableName = this.loadOptions.edgeTablename;
+
+        } else if (struct.vertices().size() > 0) {
+            tableName = this.loadOptions.vertexTablename;
+
+        }
+        return tableName;
+    }
+
+    public Integer getTablePartitions () {
+        return   struct.edges().size() > 0 ?
+                                            loadOptions.edgePartitions :
+                                            loadOptions.vertexPartitions;
+    }
+
+    public JavaPairRDD<ImmutableBytesWritable, KeyValue> 
buildVertexAndEdge(Dataset<Row> ds) {
+        LOG.info("buildAndSer start execute >>>>");
+        JavaPairRDD<ImmutableBytesWritable, KeyValue> 
tuple2KeyValueJavaPairRDD =
+                ds.toJavaRDD().mapPartitionsToPair(
+                new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, 
KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> 
call(
+                           Iterator<Row> rowIterator) throws Exception {
+
+                            HBaseSerializer serializer = new HBaseSerializer(
+                                    HugeClientHolder.create(loadOptions),
+                                    
loadOptions.vertexPartitions,loadOptions.edgePartitions);
+                            List<ElementBuilder> buildersForGraphElement = 
getElementBuilders();
+                            List<Tuple2<ImmutableBytesWritable,  KeyValue>> 
result =
+                                    new LinkedList<>();
+                            while (rowIterator.hasNext()) {
+                                Row row = rowIterator.next();
+                                List<Tuple2<ImmutableBytesWritable, KeyValue>> 
serList =
+                                        buildAndSer(serializer, 
row,buildersForGraphElement);
+                                result.addAll(serList);
+                            }
+                                serializer.close();
+                                return result.iterator();
+                    }
+                }
+        );
+        return tuple2KeyValueJavaPairRDD;
+    }
+
+    @Override
+    String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> 
buildAndSerRdd) {
+        LOG.info("bulkload start execute>>>");
+        try {
+            Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+                    
sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+            Partitioner partitioner = (Partitioner) tuple._1;
+            TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+            JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+                    
buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+            Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+            Job job = Job.getInstance(conf);
+            HFileOutputFormat2.configureIncrementalLoadMap(job, 
tableDescriptor);
+            conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+                    tableDescriptor.getTableName().getNameAsString());
+            String path = getHFilePath(job.getConfiguration());
+            repartitionedRdd.saveAsNewAPIHadoopFile(
+                    path,
+                    ImmutableBytesWritable.class,
+                    KeyValue.class,
+                    HFileOutputFormat2.class,
+                    conf
+            );
+            LOG.info("Saved HFiles to: '{}'", path);
+            flushPermission(conf,path);
+            return path;
+        } catch (IOException e) {
+            LOG.error("generateFiles failed",e);
+        }
+
+        return Constants.EMPTY_STR;
+

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -263,17 +293,27 @@ public static LoadOptions parseOptions(String[] args) {
         return options;
     }
 
+    public void copyBackendStoreInfo (BackendStoreInfo backendStoreInfo) {
+        E.checkArgument(null != backendStoreInfo,"the backendStoreInfo is null 
");
+        this.edgeTablename = backendStoreInfo.getEdgeTablename();
+        this.vertexTablename = backendStoreInfo.getVertexTablename();
+        this.hbaseZKParent = backendStoreInfo.getHbaseZKParent();
+        this.hbaseZKPort = backendStoreInfo.getHbaseZKPort();
+        this.hbaseZKQuorum = backendStoreInfo.getHbaseZKQuorum();
+

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, 
KeyValue> {
+
+    private SinkToHBase sinkToHBase;
+    private LoadDistributeMetrics loadDistributeMetrics;
+
+    public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+    public HBaseDirectLoader(LoadOptions loadOptions,
+                             InputStruct struct,
+                             LoadDistributeMetrics loadDistributeMetrics) {
+        super(loadOptions,struct);
+        this.loadDistributeMetrics = loadDistributeMetrics;
+        this.sinkToHBase = new SinkToHBase(loadOptions);
+
+    }
+
+    public String getTableName() {
+
+        String tableName = null;
+        if (struct.edges().size() > 0) {
+            tableName = this.loadOptions.edgeTablename;
+
+        } else if (struct.vertices().size() > 0) {
+            tableName = this.loadOptions.vertexTablename;
+
+        }
+        return tableName;
+    }
+
+    public Integer getTablePartitions () {
+        return   struct.edges().size() > 0 ?
+                                            loadOptions.edgePartitions :
+                                            loadOptions.vertexPartitions;
+    }
+
+    public JavaPairRDD<ImmutableBytesWritable, KeyValue> 
buildVertexAndEdge(Dataset<Row> ds) {
+        LOG.info("buildAndSer start execute >>>>");
+        JavaPairRDD<ImmutableBytesWritable, KeyValue> 
tuple2KeyValueJavaPairRDD =
+                ds.toJavaRDD().mapPartitionsToPair(
+                new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, 
KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> 
call(
+                           Iterator<Row> rowIterator) throws Exception {
+
+                            HBaseSerializer serializer = new HBaseSerializer(
+                                    HugeClientHolder.create(loadOptions),
+                                    
loadOptions.vertexPartitions,loadOptions.edgePartitions);
+                            List<ElementBuilder> buildersForGraphElement = 
getElementBuilders();
+                            List<Tuple2<ImmutableBytesWritable,  KeyValue>> 
result =
+                                    new LinkedList<>();
+                            while (rowIterator.hasNext()) {
+                                Row row = rowIterator.next();
+                                List<Tuple2<ImmutableBytesWritable, KeyValue>> 
serList =
+                                        buildAndSer(serializer, 
row,buildersForGraphElement);
+                                result.addAll(serList);
+                            }
+                                serializer.close();
+                                return result.iterator();

Review Comment:
   align



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+import scala.Tuple2;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.io.IOException;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class SinkToHBase implements Serializable {
+
+    private LoadOptions loadOptions;
+    public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+    public SinkToHBase(LoadOptions loadOptions) {
+        this.loadOptions = loadOptions;
+    }
+
+    public Optional<Configuration> getHBaseConfiguration() {
+        Configuration baseConf = HBaseConfiguration.create();
+        baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+        baseConf.set("hbase.zookeeper.property.clientPort", 
this.loadOptions.hbaseZKPort);
+        baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+        return Optional.ofNullable(baseConf);
+    }
+
+    private Optional<Connection> getConnection() {
+        Optional<Configuration> baseConf = getHBaseConfiguration();
+        Connection conn = null;
+        try {
+            conn = ConnectionFactory.createConnection(baseConf.get());
+        } catch (IOException e) {
+            LOG.error("get hbase connection failed",e);
+        }
+        return Optional.ofNullable(conn);
+    }
+
+    public Tuple2<IntPartitioner, TableDescriptor>
+            getPartitionerByTableName (int numPartitions, String tableName) 
throws IOException {
+        Optional<Connection> optionalConnection = getConnection();
+        TableDescriptor descriptor = optionalConnection
+                .get()
+                .getTable(TableName.valueOf(tableName))
+                .getDescriptor();
+        LOG.debug("getPartitionerByTableName get TableDescriptor " +
+                descriptor.getTableName());
+        optionalConnection.get().close();
+        return new Tuple2<IntPartitioner,TableDescriptor>(
+                new IntPartitioner(numPartitions, tableName),descriptor);
+    }
+
+    public void loadHfiles (String path, String tableName) throws Exception {
+        Connection conn = getConnection().get();
+        Table table = conn.getTable(TableName.valueOf(tableName));
+        Configuration conf = conn.getConfiguration();
+        BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+        bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+        table.close();
+        conn.close();
+
+    }
+
+    public class IntPartitioner extends Partitioner {
+        private final int numPartitions;

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -301,8 +341,8 @@ public void validate(String name, String value) {
             File file = new File(value);
             if (!file.exists() || !file.isDirectory()) {
                 throw new ParameterException(String.format(
-                          "Ensure the directory exists and is indeed a " +
-                          "directory instead of a file: '%s'", value));
+                        "Ensure the directory exists and is indeed a " +

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.mapping;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({"edge_tablename", "vertex_tablename", 
"hbase_zookeeper_quorum",
+                       "hbase_zookeeper_property_clientPort", 
"zookeeper_znode_parent"})

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.mapping;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({"edge_tablename", "vertex_tablename", 
"hbase_zookeeper_quorum",
+                       "hbase_zookeeper_property_clientPort", 
"zookeeper_znode_parent"})
+public class BackendStoreInfo {
+    @JsonProperty("edge_tablename")

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,96 @@ public HugeGraphSparkLoader(String[] args) {
     public void load() {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if(!sinkType){
+            
this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+        }
+        SparkConf conf = new SparkConf()
+                .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+                .set("spark.kryo.registrationRequired", "true");
+        try {
+            conf.registerKryoClasses(
+                    new Class[]
+                    {
+                        
org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+                        org.apache.hadoop.hbase.KeyValue.class,
+                        org.apache.spark.sql.types.StructType.class,
+                        StructField[].class,
+                        StructField.class,
+                        org.apache.spark.sql.types.LongType$.class,
+                        org.apache.spark.sql.types.Metadata.class,
+                        org.apache.spark.sql.types.StringType$.class,
+                Class.forName(
+                    
"org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+                Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("scala.collection.immutable.Set$EmptySet$"),
+                Class.forName("org.apache.spark.sql.types.DoubleType$")
+                    });
+        } catch (ClassNotFoundException e) {
+            LOG.error("spark kryo serialized registration failed");
+        }
+        SparkSession session = SparkSession.builder()
+                                           .config(conf)
+                                           .getOrCreate();
+        SparkContext sc = session.sparkContext();
 
-        SparkSession session = SparkSession.builder().getOrCreate();
+        LongAccumulator totalInsertSuccess = 
sc.longAccumulator("totalInsertSuccess");
         for (InputStruct struct : structs) {
+            LOG.info("\n init {} distribute metrics---- \n",
+                    struct.input().asFileSource().path());
+            LoadDistributeMetrics loadDistributeMetrics = new 
LoadDistributeMetrics(struct);
+            loadDistributeMetrics.init(sc);
+            LOG.info("\n   load dat info: \t {} \n start load data ; \n",
+                    struct.input().asFileSource().path());
             Dataset<Row> ds = read(session, struct);
-            ds.foreachPartition((Iterator<Row> p) -> {
-                LoadContext context = initPartition(this.loadOptions, struct);
-                p.forEachRemaining((Row row) -> {
-                    loadRow(struct, row, p, context);
+            if (sinkType) {
+                LOG.info("\n ------ spark api start load data ------ \n");
+                ds.foreachPartition((Iterator<Row> p) -> {
+                    LoadContext context = initPartition(this.loadOptions, 
struct);
+                    p.forEachRemaining((Row row) -> {
+                        loadRow(struct, row, p, context);
+                    });
+                    context.close();
                 });
-                context.close();
-            });
+
+            } else {
+                LOG.info("\n        spark bulkload gen hfile start     \n");
+                // gen-hfile
+                HBaseDirectLoader directLoader = new 
HBaseDirectLoader(loadOptions,
+                        struct,loadDistributeMetrics);
+                directLoader.bulkload(ds);
+
+            }
+            collectLoadMetrics(loadDistributeMetrics,totalInsertSuccess);
+            LOG.info("    \n   load data info : \t" + 
struct.input().asFileSource().path() +
+                    "\n load data finish!!!; \n");
         }
+        Long totalInsertSuccessCnt = totalInsertSuccess.value();
+        LOG.info("\n ------------The data import task is 
complete-------------------\n" +
+                "\n  insertSuccess cnt:\t" + totalInsertSuccess + "     \n" +
+                "\n ---------------------------------------------\n"
+        );
+
+        sc.stop();
         session.close();
         session.stop();
     }
 
+    private void collectLoadMetrics (LoadDistributeMetrics loadMetrics,

Review Comment:
   remove the space before "(" in "collectLoadMetrics ("



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to