JackyYangPassion commented on code in PR #358:
URL:
https://github.com/apache/incubator-hugegraph-toolchain/pull/358#discussion_r1015398845
##########
hugegraph-loader/assembly/static/example/spark/struct.json:
##########
@@ -0,0 +1,58 @@
+{
+ "vertices": [
+ {
+ "label": "person",
+ "input": {
+ "type": "file",
+ "path":
"/Users/alanzhao/Desktop/huge-jacky/incubator-hugegraph-toolchain/hugegraph-loader/assembly/static/example/spark/vertex_person.json",
+ "format": "JSON",
+ "header": ["name", "age", "city"],
+ "charset": "UTF-8",
+ "skipped_line": {
+ "regex": "(^#|^//).*"
+ }
+ },
+ "id": "name",
+ "null_values": ["NULL", "null", ""]
+ },
+ {
+ "label": "software",
+ "input": {
+ "type": "file",
+ "path":
"/Users/alanzhao/Desktop/huge-jacky/incubator-hugegraph-toolchain/hugegraph-loader/assembly/static/example/spark/vertex_software.json",
+ "format": "JSON",
+ "header": ["id","name", "lang", "price","ISBN"],
+ "charset": "GBK"
+ },
+ "id": "name",
+ "ignored": ["ISBN"]
+ }
+ ],
+ "edges": [
+ {
+ "label": "knows",
+ "source": ["source_name"],
+ "target": ["target_name"],
+ "input": {
+ "type": "file",
+ "path":
"/Users/alanzhao/Desktop/huge-jacky/incubator-hugegraph-toolchain/hugegraph-loader/assembly/static/example/spark/edge_knows.json",
Review Comment:
ditto
##########
hugegraph-loader/assembly/static/example/spark/struct.json:
##########
@@ -0,0 +1,58 @@
+{
+ "vertices": [
+ {
+ "label": "person",
+ "input": {
+ "type": "file",
+ "path":
"/Users/alanzhao/Desktop/huge-jacky/incubator-hugegraph-toolchain/hugegraph-loader/assembly/static/example/spark/vertex_person.json",
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable,
KeyValue> {
+
+ private SinkToHBase sinkToHBase ;
+ private LoadDistributeMetrics loadDistributeMetrics;;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions, InputStruct struct,
LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics=loadDistributeMetrics;
+ this.sinkToHBase=new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName(){
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions(){
+ return struct.edges().size() > 0 ? loadOptions.edgePartitions
+ : loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue>
buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue>
tuple2KeyValueJavaPairRDD = ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable,
KeyValue>() {
+
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>>
call(Iterator<Row> rowIterator) throws Exception {
+
+
+ HBaseSerializer serializer = new
HBaseSerializer(HugeClientHolder.create(loadOptions),loadOptions.vertexPartitions,loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement =
getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result
= new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>>
serList = buildAndSer(serializer, row,buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue>
buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+
sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner= (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor= (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+
buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job,
tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
tableDescriptor.getTableName().getNameAsString());
+ String path= getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved to HFiles to: " + path);
+ flushPermission(conf,path);
+ return path;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return Constants.EMPTY_STR;
+
+ }
+
+ public String getHFilePath(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" +
"/" + timeStr+ "/";//HFile 存储路径
Review Comment:
rm ch comment
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -44,168 +45,200 @@ public class LoadOptions implements Serializable {
public static final String HTTP_SCHEMA = "http";
private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
@Parameter(names = {"-f", "--file"}, required = true, arity = 1,
- validateWith = {FileValidator.class},
- description = "The path of the data mapping description file")
+ validateWith = {FileValidator.class},
+ description = "The path of the data mapping description file")
public String file;
@Parameter(names = {"-s", "--schema"}, arity = 1,
- validateWith = {FileValidator.class},
- description = "The schema file path which to create manually")
+ validateWith = {FileValidator.class},
+ description = "The schema file path which to create manually")
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -44,168 +45,200 @@ public class LoadOptions implements Serializable {
public static final String HTTP_SCHEMA = "http";
private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
@Parameter(names = {"-f", "--file"}, required = true, arity = 1,
- validateWith = {FileValidator.class},
- description = "The path of the data mapping description file")
+ validateWith = {FileValidator.class},
+ description = "The path of the data mapping description file")
public String file;
@Parameter(names = {"-s", "--schema"}, arity = 1,
- validateWith = {FileValidator.class},
- description = "The schema file path which to create manually")
+ validateWith = {FileValidator.class},
+ description = "The schema file path which to create manually")
public String schema;
@Parameter(names = {"-g", "--graph"}, required = true, arity = 1,
- description = "The namespace of the graph to load into")
+ description = "The namespace of the graph to load into")
public String graph;
@Parameter(names = {"-h", "--host"}, arity = 1,
- validateWith = {UrlValidator.class},
- description = "The host/IP of HugeGraphServer")
+ validateWith = {UrlValidator.class},
+ description = "The host/IP of HugeGraphServer")
public String host = "localhost";
@Parameter(names = {"-p", "--port"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The port of HugeGraphServer")
+ validateWith = {PositiveValidator.class},
+ description = "The port of HugeGraphServer")
public int port = 8080;
@Parameter(names = {"--username"}, arity = 1,
- description = "The username of graph for authentication")
+ description = "The username of graph for authentication")
public String username = null;
@Parameter(names = {"--protocol"}, arity = 1,
- validateWith = {ProtocolValidator.class},
- description = "The protocol of HugeGraphServer, " +
- "allowed values are: http or https")
+ validateWith = {ProtocolValidator.class},
+ description = "The protocol of HugeGraphServer, " +
+ "allowed values are: http or https")
public String protocol = "http";
@Parameter(names = {"--trust-store-file"}, arity = 1,
- description = "The path of client truststore file used " +
- "when https protocol is enabled")
+ description = "The path of client truststore file used " +
+ "when https protocol is enabled")
public String trustStoreFile = null;
@Parameter(names = {"--trust-store-password"}, arity = 1,
- description = "The password of client truststore file used " +
- "when https protocol is enabled")
+ description = "The password of client truststore file used " +
+ "when https protocol is enabled")
public String trustStoreToken = null;
@Parameter(names = {"--token"}, arity = 1,
- description = "The token of graph for authentication")
+ description = "The token of graph for authentication")
public String token = null;
@Parameter(names = {"--clear-all-data"}, arity = 1,
- description = "Whether to clear all old data before loading")
+ description = "Whether to clear all old data before loading")
public boolean clearAllData = false;
@Parameter(names = {"--clear-timeout"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The timeout waiting for clearing all data ")
+ validateWith = {PositiveValidator.class},
+ description = "The timeout waiting for clearing all data ")
public int clearTimeout = 240;
@Parameter(names = {"--incremental-mode"}, arity = 1,
- description = "Load data from the breakpoint of last time")
+ description = "Load data from the breakpoint of last time")
public boolean incrementalMode = false;
@Parameter(names = {"--failure-mode"}, arity = 1,
- description = "Load data from the failure records, in this " +
- "mode, only full load is supported, any read " +
- "or parsing errors will cause load task stop")
+ description = "Load data from the failure records, in this " +
+ "mode, only full load is supported, any read " +
+ "or parsing errors will cause load task stop")
public boolean failureMode = false;
@Parameter(names = {"--batch-insert-threads"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The number of threads to execute batch insert")
+ validateWith = {PositiveValidator.class},
+ description = "The number of threads to execute batch insert")
public int batchInsertThreads = CPUS;
@Parameter(names = {"--single-insert-threads"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The number of threads to execute single insert")
+ validateWith = {PositiveValidator.class},
+ description = "The number of threads to execute single insert")
public int singleInsertThreads = 8;
@Parameter(names = {"--max-conn"}, arity = 1,
- description = "Max number of HTTP connections to server")
+ description = "Max number of HTTP connections to server")
public int maxConnections = CPUS * 4;
@Parameter(names = {"--max-conn-per-route"}, arity = 1,
- description = "Max number of HTTP connections to each route")
+ description = "Max number of HTTP connections to each route")
public int maxConnectionsPerRoute = CPUS * 2;
@Parameter(names = {"--batch-size"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The number of lines in each submit")
+ validateWith = {PositiveValidator.class},
+ description = "The number of lines in each submit")
public int batchSize = 500;
@Parameter(names = {"--cdc-flush-interval"}, arity = 1,
- description = "The flush interval for flink cdc")
+ description = "The flush interval for flink cdc")
public int flushIntervalMs = 30000;
@Parameter(names = {"--cdc-sink-parallelism"}, arity = 1,
- description = "The sink parallelism for flink cdc")
+ description = "The sink parallelism for flink cdc")
public int sinkParallelism = 1;
@Parameter(names = {"--shutdown-timeout"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The timeout of awaitTermination in seconds")
+ validateWith = {PositiveValidator.class},
+ description = "The timeout of awaitTermination in seconds")
public int shutdownTimeout = 10;
@Parameter(names = {"--check-vertex"}, arity = 1,
- description = "Check vertices exists while inserting edges")
+ description = "Check vertices exists while inserting edges")
public boolean checkVertex = false;
@Parameter(names = {"--max-read-errors"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The maximum number of lines that read error " +
- "before exiting")
+ validateWith = {PositiveValidator.class},
+ description = "The maximum number of lines that read error " +
+ "before exiting")
public int maxReadErrors = 1;
@Parameter(names = {"--max-parse-errors"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The maximum number of lines that parse error " +
- "before exiting")
+ validateWith = {PositiveValidator.class},
+ description = "The maximum number of lines that parse error " +
+ "before exiting")
public int maxParseErrors = 1;
@Parameter(names = {"--max-insert-errors"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The maximum number of lines that insert error " +
- "before exiting")
+ validateWith = {PositiveValidator.class},
+ description = "The maximum number of lines that insert error " +
+ "before exiting")
public int maxInsertErrors = 500;
@Parameter(names = {"--timeout"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The timeout of HugeClient request")
+ validateWith = {PositiveValidator.class},
+ description = "The timeout of HugeClient request")
public int timeout = 60;
@Parameter(names = {"--retry-times"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "Setting the max retry times when loading
timeout")
+ validateWith = {PositiveValidator.class},
+ description = "Setting the max retry times when loading timeout")
public int retryTimes = 3;
@Parameter(names = {"--retry-interval"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "Setting the interval time before retrying")
+ validateWith = {PositiveValidator.class},
+ description = "Setting the interval time before retrying")
public int retryInterval = 10;
@Parameter(names = {"--max-read-lines"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The maximum number of read lines, when reached "
+
- "this number, the load task will stop")
+ validateWith = {PositiveValidator.class},
+ description = "The maximum number of read lines, when reached " +
+ "this number, the load task will stop")
public long maxReadLines = -1L;
@Parameter(names = {"--dry-run"}, arity = 1,
- description = "Dry run means that only parse but doesn't load")
+ description = "Dry run means that only parse but doesn't load")
public boolean dryRun = false;
@Parameter(names = {"--print-progress"}, arity = 1,
- description = "Whether to print real-time load progress")
+ description = "Whether to print real-time load progress")
public boolean printProgress = true;
@Parameter(names = {"--test-mode"}, arity = 1,
- description = "Whether the hugegraph-loader work in test mode")
+ description = "Whether the hugegraph-loader work in test mode")
public boolean testMode = false;
@Parameter(names = {"--help"}, help = true,
- description = "Print usage of HugeGraphLoader")
+ description = "Print usage of HugeGraphLoader")
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -44,168 +45,200 @@ public class LoadOptions implements Serializable {
public static final String HTTP_SCHEMA = "http";
private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
@Parameter(names = {"-f", "--file"}, required = true, arity = 1,
- validateWith = {FileValidator.class},
- description = "The path of the data mapping description file")
+ validateWith = {FileValidator.class},
+ description = "The path of the data mapping description file")
public String file;
@Parameter(names = {"-s", "--schema"}, arity = 1,
- validateWith = {FileValidator.class},
- description = "The schema file path which to create manually")
+ validateWith = {FileValidator.class},
+ description = "The schema file path which to create manually")
public String schema;
@Parameter(names = {"-g", "--graph"}, required = true, arity = 1,
- description = "The namespace of the graph to load into")
+ description = "The namespace of the graph to load into")
public String graph;
@Parameter(names = {"-h", "--host"}, arity = 1,
- validateWith = {UrlValidator.class},
- description = "The host/IP of HugeGraphServer")
+ validateWith = {UrlValidator.class},
+ description = "The host/IP of HugeGraphServer")
public String host = "localhost";
@Parameter(names = {"-p", "--port"}, arity = 1,
- validateWith = {PositiveValidator.class},
- description = "The port of HugeGraphServer")
+ validateWith = {PositiveValidator.class},
+ description = "The port of HugeGraphServer")
public int port = 8080;
@Parameter(names = {"--username"}, arity = 1,
- description = "The username of graph for authentication")
+ description = "The username of graph for authentication")
public String username = null;
@Parameter(names = {"--protocol"}, arity = 1,
- validateWith = {ProtocolValidator.class},
- description = "The protocol of HugeGraphServer, " +
- "allowed values are: http or https")
+ validateWith = {ProtocolValidator.class},
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -149,17 +227,18 @@ private Dataset<Row> read(SparkSession ss, InputStruct
struct) {
switch (input.type()) {
case FILE:
case HDFS:
- FileSource fileSource = input.asFileSource();
- String[] header = fileSource.header();
- String delimiter = fileSource.delimiter();
- String path = fileSource.path();
- FileFilter filter = fileSource.filter();
- FileFormat format = fileSource.format();
- String dateFormat = fileSource.dateFormat();
- String timeZone = fileSource.timeZone();
- SkippedLine skippedLine = fileSource.skippedLine();
- Compression compression = fileSource.compression();
- int batchSize = fileSource.batchSize();
+ FileSource fileSource = input.asFileSource();
Review Comment:
prefer to keep the origin alignment
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -172,7 +251,7 @@ private Dataset<Row> read(SparkSession ss, InputStruct
struct) {
break;
default:
throw new IllegalStateException(
- "Unexpected format value: " + format);
+ "Unexpected format value: " + format);
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -249,15 +328,15 @@ private void flush(Map.Entry<ElementBuilder,
List<GraphElement>> builderMap,
BatchVertexRequest.Builder req =
new BatchVertexRequest.Builder();
req.vertices((List<Vertex>) (Object) graphElements)
- .updatingStrategies(updateStrategyMap)
- .createIfNotExist(true);
+ .updatingStrategies(updateStrategyMap)
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -216,11 +249,12 @@ public String workModeString() {
}
}
+
public static LoadOptions parseOptions(String[] args) {
LoadOptions options = new LoadOptions();
JCommander commander = JCommander.newBuilder()
- .addObject(options)
- .build();
+ .addObject(options)
+ .build();
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -44,168 +45,200 @@ public class LoadOptions implements Serializable {
public static final String HTTP_SCHEMA = "http";
private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
@Parameter(names = {"-f", "--file"}, required = true, arity = 1,
- validateWith = {FileValidator.class},
- description = "The path of the data mapping description file")
+ validateWith = {FileValidator.class},
Review Comment:
prefer to keep the origin alignment
##########
hugegraph-loader/assembly/static/example/spark/struct.json:
##########
@@ -0,0 +1,58 @@
+{
+ "vertices": [
+ {
+ "label": "person",
+ "input": {
+ "type": "file",
+ "path":
"/Users/alanzhao/Desktop/huge-jacky/incubator-hugegraph-toolchain/hugegraph-loader/assembly/static/example/spark/vertex_person.json",
+ "format": "JSON",
+ "header": ["name", "age", "city"],
+ "charset": "UTF-8",
+ "skipped_line": {
+ "regex": "(^#|^//).*"
+ }
+ },
+ "id": "name",
+ "null_values": ["NULL", "null", ""]
+ },
+ {
+ "label": "software",
+ "input": {
+ "type": "file",
+ "path":
"/Users/alanzhao/Desktop/huge-jacky/incubator-hugegraph-toolchain/hugegraph-loader/assembly/static/example/spark/vertex_software.json",
Review Comment:
write relative path
for example : example/spark/vertex_software.json
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+
+public class SinkToHBase implements Serializable {
+ private LoadOptions loadOptions;
+ public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+ public SinkToHBase(LoadOptions loadOptions) {
+ this.loadOptions=loadOptions;
+ }
+
+
+ public Optional<Configuration> getHBaseConfiguration(){
+ Configuration baseConf = HBaseConfiguration.create();
+ baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+ baseConf.set("hbase.zookeeper.property.clientPort",
this.loadOptions.hbaseZKPort);
+ baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+ return Optional.ofNullable(baseConf);
+ }
+
+ private Optional<Connection> getConnection(){
+ Optional<Configuration> baseConf = getHBaseConfiguration();
+ Connection conn=null;
+ try {
+ conn = ConnectionFactory.createConnection(baseConf.get());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return Optional.ofNullable(conn);
+ }
+ public Tuple2<IntPartitioner, TableDescriptor> getPartitionerByTableName
(int numPartitions, String tableName) throws IOException {
+ Optional<Connection> optionalConnection = getConnection();
+ TableDescriptor descriptor = optionalConnection
+ .get()
+ .getTable(TableName.valueOf(tableName))
+ .getDescriptor();
+ LOG.debug("getPartitionerByTableName get TableDescriptor
》》》"+descriptor.getTableName());
+ optionalConnection.get().close();
+ return new Tuple2<IntPartitioner,TableDescriptor>(new
IntPartitioner(numPartitions, tableName),descriptor);
+ }
+
+ public void loadHfiles (String path, String tableName) throws Exception {
+ Connection conn = getConnection().get();
+ Table table = conn.getTable(TableName.valueOf(tableName));
+ Configuration conf = conn.getConfiguration();
+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+ bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+ table.close();
+ conn.close();
+
+ }
+
+
+
+ public class IntPartitioner extends Partitioner {
+ private final int numPartitions;
+ public Map<List<String>, Integer> rangeMap = new HashMap<>();
+ private String tableName;
+
+ public IntPartitioner(int numPartitions, String tableName) throws
IOException {
+ this.numPartitions = numPartitions;
+ this.rangeMap = getRangeMap(tableName);
+ this.tableName = tableName;
+ }
+
+
+ private Map<List<String>, Integer> getRangeMap(String tableName)
throws IOException {
+ Connection conn = getConnection().get();
+
+ HRegionLocator locator = (HRegionLocator)
conn.getRegionLocator(TableName.valueOf(tableName));
+
+ Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
+
+ Map<List<String>, Integer> rangeMap = new HashMap<>();
+ for (int i = 0; i < startEndKeys.getFirst().length; i++) {
+ String startKey = Bytes.toString(startEndKeys.getFirst()[i]);
+ String endKey = Bytes.toString(startEndKeys.getSecond()[i]);
+
+ rangeMap.put(new ArrayList<>(Arrays.asList(startKey, endKey)),
i);
+ }
+ conn.close();
+ return rangeMap;
+ }
+
+ @Override
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ if (key instanceof ImmutableBytesWritable) {
+
+ try {
+ ImmutableBytesWritable immutableBytesWritableKey =
(ImmutableBytesWritable) key;
+
+ if (rangeMap == null || rangeMap.isEmpty()) {
+ rangeMap = getRangeMap(this.tableName);
+ }
+
+ String keyString =
Bytes.toString(immutableBytesWritableKey.get());
+ for (List<String> range : rangeMap.keySet()) {
+ if (keyString.compareToIgnoreCase(range.get(0)) >= 0
+ &&
((keyString.compareToIgnoreCase(range.get(1)) < 0)
+ || range.get(1).equals(""))) {
+ return rangeMap.get(range);
+ }
+ }
+ LOG.error("Didn't find proper key in rangeMap, so
returning 0 ...");
+ return 0;
+ } catch (Exception e) {
+ LOG.error("When trying to get partitionID, "
+ + "encountered exception: " + e + "\t key = " +
key);
+ return 0;
+ }
+ } else {
+ LOG.error("key is NOT ImmutableBytesWritable type ...");
+ return 0;
+ }
+ }
+ }
+
+}
+
Review Comment:
unused blank line
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -44,168 +45,200 @@ public class LoadOptions implements Serializable {
public static final String HTTP_SCHEMA = "http";
private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
Review Comment:
unused blank line
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -263,17 +297,27 @@ public static LoadOptions parseOptions(String[] args) {
return options;
}
+ public void copyBackendStoreInfo(BackendStoreInfo backendStoreInfo){
+ E.checkArgument(null != backendStoreInfo,"the backendStoreInfo is null
");
+ this.edgeTablename=backendStoreInfo.getEdgeTablename();
+ this.vertexTablename=backendStoreInfo.getVertexTablename();
+ this.hbaseZKParent=backendStoreInfo.getHbaseZKParent();
+ this.hbaseZKPort=backendStoreInfo.getHbaseZKPort();
+ this.hbaseZKQuorum=backendStoreInfo.getHbaseZKQuorum();
+
+ }
+
public static class UrlValidator implements IParameterValidator {
@Override
public void validate(String name, String value) {
String regex = "^((http)(s?)://)?" +
- "(([0-9]{1,3}\\.){3}[0-9]{1,3}" + // IP URL
- "|" + // Or domain name
- "([0-9a-z_!~*'()-]+\\.)*[0-9a-z_!~*'()-]+)$";
+ "(([0-9]{1,3}\\.){3}[0-9]{1,3}" + // IP URL
+ "|" + // Or domain name
+ "([0-9a-z_!~*'()-]+\\.)*[0-9a-z_!~*'()-]+)$";
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -327,8 +371,8 @@ public void validate(String name, String value) {
int retry = Integer.parseInt(value);
if (retry <= 0) {
throw new ParameterException(String.format(
- "Parameter '%s' should be positive, but got '%s'",
- name, value));
+ "Parameter '%s' should be positive, but got '%s'",
+ name, value));
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+
Review Comment:
unused blank line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]