javeme commented on code in PR #358:
URL:
https://github.com/apache/incubator-hugegraph-toolchain/pull/358#discussion_r1016586983
##########
hugegraph-loader/assembly/static/example/spark/struct.json:
##########
@@ -0,0 +1,58 @@
+{
+ "vertices": [
+ {
+ "label": "person",
+ "input": {
+ "type": "file",
+ "path": "example/spark/vertex_person.json",
+ "format": "JSON",
+ "header": ["name", "age", "city"],
+ "charset": "UTF-8",
+ "skipped_line": {
+ "regex": "(^#|^//).*"
+ }
+ },
+ "id": "name",
+ "null_values": ["NULL", "null", ""]
+ },
+ {
+ "label": "software",
+ "input": {
+ "type": "file",
+ "path": "example/spark/vertex_software.json",
+ "format": "JSON",
+ "header": ["id","name", "lang", "price","ISBN"],
+ "charset": "GBK"
+ },
+ "id": "name",
+ "ignored": ["ISBN"]
+ }
+ ],
+ "edges": [
+ {
+ "label": "knows",
+ "source": ["source_name"],
+ "target": ["target_name"],
+ "input": {
+ "type": "file",
+ "path": "example/spark/edge_knows.json",
+ "format": "JSON",
+ "date_format": "yyyyMMdd",
+ "header": ["source_name","target_name", "date", "weight"]
+ },
+ "field_mapping": {
+ "source_name": "name",
+ "target_name": "name"
+ }
+ }
+ ],
+ "backendStoreInfo":
+ {
+ "edge_tablename": "hugegraph:g_oe",
+ "vertex_tablename": "hugegraph:g_v",
+ "hbase_zookeeper_quorum":"127.0.0.1",
+ "hbase_zookeeper_property_clientPort":"2181",
+ "zookeeper_znode_parent":"/hbase"
+
Review Comment:
remove the blank line
##########
hugegraph-loader/assembly/static/example/spark/struct.json:
##########
@@ -0,0 +1,58 @@
+{
+ "vertices": [
+ {
+ "label": "person",
+ "input": {
+ "type": "file",
+ "path": "example/spark/vertex_person.json",
+ "format": "JSON",
+ "header": ["name", "age", "city"],
+ "charset": "UTF-8",
+ "skipped_line": {
+ "regex": "(^#|^//).*"
+ }
+ },
+ "id": "name",
+ "null_values": ["NULL", "null", ""]
+ },
+ {
+ "label": "software",
+ "input": {
+ "type": "file",
+ "path": "example/spark/vertex_software.json",
+ "format": "JSON",
+ "header": ["id","name", "lang", "price","ISBN"],
+ "charset": "GBK"
+ },
+ "id": "name",
+ "ignored": ["ISBN"]
+ }
+ ],
+ "edges": [
+ {
+ "label": "knows",
+ "source": ["source_name"],
+ "target": ["target_name"],
+ "input": {
+ "type": "file",
+ "path": "example/spark/edge_knows.json",
+ "format": "JSON",
+ "date_format": "yyyyMMdd",
+ "header": ["source_name","target_name", "date", "weight"]
+ },
+ "field_mapping": {
+ "source_name": "name",
+ "target_name": "name"
+ }
+ }
+ ],
+ "backendStoreInfo":
+ {
+ "edge_tablename": "hugegraph:g_oe",
+ "vertex_tablename": "hugegraph:g_v",
+ "hbase_zookeeper_quorum":"127.0.0.1",
+ "hbase_zookeeper_property_clientPort":"2181",
+ "zookeeper_znode_parent":"/hbase"
Review Comment:
add a space after ":"
##########
hugegraph-loader/assembly/static/example/spark/vertex_software.json:
##########
@@ -0,0 +1,3 @@
+{ "name": "lop", "lang": "java","price": "328","ISBN":
"ISBN978-7-107-18618-5"}
+{ "name": "ripple", "lang": "java","price": "199","ISBN":
"ISBN978-7-100-13678-5"}
+
Review Comment:
remove the blank line
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java:
##########
@@ -109,6 +110,53 @@ public List<Edge> build(String[] names, Object[] values) {
}
return edges;
}
+
+ @Override
+ public List<Edge> build(Row row) {
+ String[] names = row.schema().fieldNames();
+ Object[] values = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ values[i] = row.get(i);
+ }
+ if (this.vertexIdsIndex == null ||
+ !Arrays.equals(this.lastNames, names)) {
+ this.vertexIdsIndex = this.extractVertexIdsIndex(names);
+ }
+
+ this.lastNames = names;
+ EdgeKVPairs kvPairs = this.newEdgeKVPairs();
+ kvPairs.source.extractFromEdge(names, values,
+ this.vertexIdsIndex.sourceIndexes);
+ kvPairs.target.extractFromEdge(names, values,
+ this.vertexIdsIndex.targetIndexes);
+ kvPairs.extractProperties(names, values);
+
+ List<Vertex> sources = kvPairs.source.buildVertices(false);
+ List<Vertex> targets = kvPairs.target.buildVertices(false);
+ if (sources.isEmpty() || targets.isEmpty()) {
+ return ImmutableList.of();
+ }
+ E.checkArgument(sources.size() == 1 || targets.size() == 1 ||
+ sources.size() == targets.size(),
+ "The elements number of source and target must be: " +
+ "1 to n, n to 1, n to n");
+ int size = Math.max(sources.size(), targets.size());
+ List<Edge> edges = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Vertex source = i < sources.size() ?
+ sources.get(i) : sources.get(0);
Review Comment:
align with "i <"
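i.e. make the wrapped branch start in the same column as "i <", roughly like this (illustration only, not a tested patch):

```java
Vertex source = i < sources.size() ?
                sources.get(i) : sources.get(0);
```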
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class DirectLoader<T,R> implements Serializable {
+ LoadOptions loadOptions;
Review Comment:
add a blank line after the class definition
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName() {
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions () {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+ Iterator<Row> rowIterator) throws Exception {
+
+ HBaseSerializer serializer = new HBaseSerializer(
+ HugeClientHolder.create(loadOptions),
+ loadOptions.vertexPartitions,loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result =
+ new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+ buildAndSer(serializer, row,buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+ sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+ buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+ tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved to HFiles to: " + path);
Review Comment:
improve message and use format with "{}":
`LOG.info("Saved HFiles to: '{}'", path);`
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName() {
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions () {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
Review Comment:
improve alignment
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName() {
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions () {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+ Iterator<Row> rowIterator) throws Exception {
+
+ HBaseSerializer serializer = new HBaseSerializer(
+ HugeClientHolder.create(loadOptions),
+ loadOptions.vertexPartitions,loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result =
+ new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+ buildAndSer(serializer, row,buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+ sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+ buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+ tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved to HFiles to: " + path);
+ flushPermission(conf,path);
+ return path;
+ } catch (IOException e) {
+ e.printStackTrace();
Review Comment:
please rethrow it or LOG.error(xx)
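e.g. `LOG.error("Failed to generate HFiles", e);` (message text is only an example), or wrap the IOException in a runtime exception and rethrow it instead of swallowing it.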
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName() {
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions () {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+ Iterator<Row> rowIterator) throws Exception {
+
+ HBaseSerializer serializer = new HBaseSerializer(
+ HugeClientHolder.create(loadOptions),
+ loadOptions.vertexPartitions,loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result =
+ new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+ buildAndSer(serializer, row,buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+ sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+ buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+ tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved to HFiles to: " + path);
+ flushPermission(conf,path);
+ return path;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return Constants.EMPTY_STR;
+
+ }
+
+ public String getHFilePath (Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + timeStr + "/";
+ Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n delete hfile path \n");
+ fs.delete(hfileGenPath,true);
+ }
+ return pathStr;
+ }
+
+ @Override
+ public void loadFiles(String path) {
+ try {
+ sinkToHBase.loadHfiles(path, getTableName());// BulkLoad HFile to HBase
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void flushPermission (Configuration conf, String path) {
+ FsShell shell = new FsShell(conf);
+ try {
+ LOG.info("shell start execute");
+ shell.run(new String[]{"-chmod", "-R", "777", path});
+ shell.close();
+ } catch (Exception e) {
+ LOG.error("Couldnt change the file permissions " + e +
+ " Please run command:" +
+ "hbase
org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
+ " '" + "test" + "'\n" + " to load generated HFiles into
HBase table");
+ }
+ }
+
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> buildAndSer (HBaseSerializer serializer,
+ Row row,
+ List<ElementBuilder> builders) {
+ List<GraphElement> elementsElement;
+
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result = new LinkedList<>();
+
+ for (ElementBuilder builder : builders) {
+ ElementMapping elementMapping = builder.mapping();
+ if (elementMapping.skip()) {
+ continue;
+ }
+ if ("".equals(row.mkString())) {
+ break;
+ }
+ switch (struct.input().type()) {
+ case FILE:
+ case HDFS:
+
+ elementsElement = builder.build(row);
+
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+import scala.Tuple2;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.io.IOException;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class SinkToHBase implements Serializable {
+
+ private LoadOptions loadOptions;
+ public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+ public SinkToHBase(LoadOptions loadOptions) {
+ this.loadOptions = loadOptions;
+ }
+
+ public Optional<Configuration> getHBaseConfiguration() {
+ Configuration baseConf = HBaseConfiguration.create();
+ baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+ baseConf.set("hbase.zookeeper.property.clientPort",
this.loadOptions.hbaseZKPort);
+ baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+ return Optional.ofNullable(baseConf);
+ }
+
+ private Optional<Connection> getConnection() {
+ Optional<Configuration> baseConf = getHBaseConfiguration();
+ Connection conn = null;
+ try {
+ conn = ConnectionFactory.createConnection(baseConf.get());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return Optional.ofNullable(conn);
+ }
+
+ public Tuple2<IntPartitioner, TableDescriptor>
+ getPartitionerByTableName (int numPartitions, String tableName) throws IOException {
+ Optional<Connection> optionalConnection = getConnection();
+ TableDescriptor descriptor = optionalConnection
+ .get()
+ .getTable(TableName.valueOf(tableName))
+ .getDescriptor();
+ LOG.debug("getPartitionerByTableName get TableDescriptor " +
+ descriptor.getTableName());
+ optionalConnection.get().close();
+ return new Tuple2<IntPartitioner,TableDescriptor>(
+ new IntPartitioner(numPartitions, tableName),descriptor);
+ }
+
+ public void loadHfiles (String path, String tableName) throws Exception {
+ Connection conn = getConnection().get();
+ Table table = conn.getTable(TableName.valueOf(tableName));
+ Configuration conf = conn.getConfiguration();
+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+ bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+ table.close();
+ conn.close();
+
+ }
+
+ public class IntPartitioner extends Partitioner {
+ private final int numPartitions;
+ public Map<List<String>, Integer> rangeMap = new HashMap<>();
+ private String tableName;
+
+ public IntPartitioner(int numPartitions, String tableName) throws IOException {
+ this.numPartitions = numPartitions;
+ this.rangeMap = getRangeMap(tableName);
+ this.tableName = tableName;
+ }
+
+ private Map<List<String>, Integer> getRangeMap(String tableName) throws IOException {
+ Connection conn = getConnection().get();
+ HRegionLocator locator =
+ (HRegionLocator) conn.getRegionLocator(TableName.valueOf(tableName));
+ Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
+ Map<List<String>, Integer> rangeMap = new HashMap<>();
+ for (int i = 0; i < startEndKeys.getFirst().length; i++) {
+ String startKey = Bytes.toString(startEndKeys.getFirst()[i]);
+ String endKey = Bytes.toString(startEndKeys.getSecond()[i]);
+ rangeMap.put(new ArrayList<>(Arrays.asList(startKey, endKey)), i);
+ }
+ conn.close();
+ return rangeMap;
+ }
+
+ @Override
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ if (key instanceof ImmutableBytesWritable) {
+
+ try {
+ ImmutableBytesWritable immutableBytesWritableKey = (ImmutableBytesWritable) key;
+
+ if (rangeMap == null || rangeMap.isEmpty()) {
+ rangeMap = getRangeMap(this.tableName);
+ }
+
+ String keyString = Bytes.toString(immutableBytesWritableKey.get());
+ for (List<String> range : rangeMap.keySet()) {
+ if (keyString.compareToIgnoreCase(range.get(0)) >= 0 &&
+ ((keyString.compareToIgnoreCase(range.get(1)) < 0) ||
Review Comment:
align with keyString
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName() {
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions () {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+ Iterator<Row> rowIterator) throws Exception {
+
+ HBaseSerializer serializer = new HBaseSerializer(
+ HugeClientHolder.create(loadOptions),
+ loadOptions.vertexPartitions,loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result =
+ new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+ buildAndSer(serializer, row,buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+ sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+ buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+ tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved to HFiles to: " + path);
+ flushPermission(conf,path);
+ return path;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return Constants.EMPTY_STR;
+
+ }
+
+ public String getHFilePath (Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + timeStr + "/";
+ Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n delete hfile path \n");
+ fs.delete(hfileGenPath,true);
+ }
+ return pathStr;
+ }
+
+ @Override
+ public void loadFiles(String path) {
+ try {
+ sinkToHBase.loadHfiles(path, getTableName());// BulkLoad HFile to HBase
+ } catch (Exception e) {
+ e.printStackTrace();
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java:
##########
@@ -94,7 +108,7 @@ public void check() throws IllegalArgumentException {
Set<String> uniqueIds = this.structs.stream().map(InputStruct::id)
.collect(Collectors.toSet());
E.checkArgument(this.structs.size() == uniqueIds.size(),
- "The structs cannot contain the same id mapping");
+ "The structs cannot contain the same id mapping");
Review Comment:
keep the origin style
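i.e. keep the message aligned under the first argument as it was before, e.g.:

```java
E.checkArgument(this.structs.size() == uniqueIds.size(),
                "The structs cannot contain the same id mapping");
```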
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java:
##########
@@ -143,7 +157,7 @@ public List<InputStruct> structsForFailure(LoadOptions options) {
private Map<String, FailureFile> groupFailureFiles(File pathDir) {
File[] subFiles = pathDir.listFiles();
E.checkArgument(subFiles != null && subFiles.length >= 1,
- "Every input struct should have a failure data file, "
+
+ "Every input struct should have a failure data file, " +
Review Comment:
keep the origin style
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,98 @@ public HugeGraphSparkLoader(String[] args) {
public void load() {
LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
List<InputStruct> structs = mapping.structs();
+ boolean sinkType = this.loadOptions.sinkType;
+ if(!sinkType){
+ this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+ }
+ SparkConf conf = new SparkConf()
+ .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+ .set("spark.kryo.registrationRequired", "true");
+ try {
+ conf.registerKryoClasses (
Review Comment:
remove the extra space in "registerKryoClasses ("
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,98 @@ public HugeGraphSparkLoader(String[] args) {
public void load() {
LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
List<InputStruct> structs = mapping.structs();
+ boolean sinkType = this.loadOptions.sinkType;
+ if(!sinkType){
+ this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+ }
+ SparkConf conf = new SparkConf()
+ .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+ .set("spark.kryo.registrationRequired", "true");
+ try {
+ conf.registerKryoClasses (
+ new Class[]
+ {
+ org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+ org.apache.hadoop.hbase.KeyValue.class,
+ org.apache.spark.sql.types.StructType.class,
+ StructField[].class,
+ StructField.class,
+ org.apache.spark.sql.types.LongType$.class,
+ org.apache.spark.sql.types.Metadata.class,
+ org.apache.spark.sql.types.StringType$.class,
+ Class.forName(
+ "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+ Class.forName("scala.reflect.ClassTag$$anon$1"),
+ Class.forName("scala.collection.immutable.Set$EmptySet$"),
+ Class.forName("org.apache.spark.sql.types.DoubleType$")
+ });
+ } catch (ClassNotFoundException e) {
+ LOG.error("spark kryo serialized registration failed");
+ }
+ SparkSession session = SparkSession.builder()
+ .config(conf)
+ .getOrCreate();
+ SparkContext sc = session.sparkContext();
- SparkSession session = SparkSession.builder().getOrCreate();
+ LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
for (InputStruct struct : structs) {
+ LOG.info("\n init" + struct.input().asFileSource().path() +
+ " distribute metrics---- \n");
+ LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
+ loadDistributeMetrics.init(sc);
+ LOG.info("\n load dat info: \t" +
struct.input().asFileSource().path() +
+ "\n start load data ; \n");
Dataset<Row> ds = read(session, struct);
- ds.foreachPartition((Iterator<Row> p) -> {
- LoadContext context = initPartition(this.loadOptions, struct);
- p.forEachRemaining((Row row) -> {
- loadRow(struct, row, p, context);
+ if (sinkType) {
+ LOG.info("\n ------ spark api start load data ------ \n");
+ ds.foreachPartition((Iterator<Row> p) -> {
+ LoadContext context = initPartition(this.loadOptions, struct);
+ p.forEachRemaining((Row row) -> {
+ loadRow(struct, row, p, context);
+ });
+ context.close();
});
- context.close();
- });
+
+ } else {
+ LOG.info("\n spark bulkload gen hfile start \n");
+ // gen-hfile
+ HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions,
+ struct,loadDistributeMetrics);
+ directLoader.bulkload(ds);
+
+ }
+ collectLoadMetrics(loadDistributeMetrics,totalInsertSuccess);
+ LOG.info(" \n load data info : \t" +
struct.input().asFileSource().path() +
+ "\n load data finish!!!; \n");
}
+ Long totalInsertSuccessCnt = totalInsertSuccess.value();
+ LOG.info("\n ------------The data import task is
complete-------------------\n" +
+ "\n insertSuccess cnt:\t" + totalInsertSuccess + " \n" +
+ "\n ---------------------------------------------\n"
+ );
+
+ sc.stop();
session.close();
session.stop();
+
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,98 @@ public HugeGraphSparkLoader(String[] args) {
public void load() {
LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
List<InputStruct> structs = mapping.structs();
+ boolean sinkType = this.loadOptions.sinkType;
+ if(!sinkType){
+ this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+ }
+ SparkConf conf = new SparkConf()
+ .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")// kryo序列化
+ .set("spark.kryo.registrationRequired", "true");
+ try {
+ conf.registerKryoClasses (
+ new Class[]
+ {
+ org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+ org.apache.hadoop.hbase.KeyValue.class,
+ org.apache.spark.sql.types.StructType.class,
+ StructField[].class,
+ StructField.class,
+ org.apache.spark.sql.types.LongType$.class,
+ org.apache.spark.sql.types.Metadata.class,
+ org.apache.spark.sql.types.StringType$.class,
+ Class.forName(
+ "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+ Class.forName("scala.reflect.ClassTag$$anon$1"),
+ Class.forName("scala.collection.immutable.Set$EmptySet$"),
+ Class.forName("org.apache.spark.sql.types.DoubleType$")
+ });
+ } catch (ClassNotFoundException e) {
+ LOG.error("spark kryo serialized registration failed");
+ }
+ SparkSession session = SparkSession.builder()
+ .config(conf)
+ .getOrCreate();
+ SparkContext sc = session.sparkContext();
- SparkSession session = SparkSession.builder().getOrCreate();
+ LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
for (InputStruct struct : structs) {
+ LOG.info("\n init" + struct.input().asFileSource().path() +
Review Comment:
use log format please
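e.g. a parameterized message like `LOG.info("Init {} distribute metrics", struct.input().asFileSource().path());` (wording is only a suggestion).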
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName() {
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions () {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+ Iterator<Row> rowIterator) throws Exception {
+
+ HBaseSerializer serializer = new HBaseSerializer(
+ HugeClientHolder.create(loadOptions),
+ loadOptions.vertexPartitions,loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result =
+ new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+ buildAndSer(serializer, row,buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+ sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+ buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+ tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved to HFiles to: " + path);
+ flushPermission(conf,path);
+ return path;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return Constants.EMPTY_STR;
+
+ }
+
+ public String getHFilePath (Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + timeStr + "/";
+ Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n delete hfile path \n");
Review Comment:
improve it
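e.g. something like `LOG.info("Delete existing HFile path: '{}'", pathStr);` (wording is only a suggestion), instead of the "\n"-wrapped message.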
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java:
##########
@@ -68,7 +69,7 @@ public EdgeBuilder(LoadContext context, InputStruct struct,
public EdgeMapping mapping() {
return this.mapping;
}
-
+
Review Comment:
seems still not updated
##########
hugegraph-loader/assembly/static/example/spark/vertex_person.json:
##########
@@ -0,0 +1,7 @@
+{"name": "marko", "age": "29", "city": "Beijing"}
+{"name": "vadas", "age": "27", "city": "Hongkong"}
+{"name": "josh", "age": "32", "city": "Beijing"}
+{"name": "peter", "age": "35", "city": "Shanghai"}
+{"name": "li,nary", "age": "26", "city": "Wu,han"}
+{"name": "tom", "age": "null", "city": "NULL"}
+
Review Comment:
remove the blank line
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class DirectLoader<T,R> implements Serializable {
+ LoadOptions loadOptions;
+ InputStruct struct;
+
+ public DirectLoader(LoadOptions loadOptions,
+ InputStruct struct) {
+ this.loadOptions = loadOptions;
+ this.struct = struct;
+ }
+
+ public final void bulkload(Dataset<Row> ds) {
Review Comment:
trim the spaces in `"public final void"`
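
 For reference, the signature with single spaces, plus a plausible template-method body (the real body is not quoted in this hunk, so the three steps below are assumptions based on the abstract methods used by HBaseDirectLoader):

     public final void bulkload(Dataset<Row> ds) {
         // 1. serialize rows into (key, value) pairs
         JavaPairRDD<T, R> rdd = buildVertexAndEdge(ds);
         // 2. write the pairs out as files (e.g. HFiles)
         String path = generateFiles(rdd);
         // 3. load the generated files into the backend store
         loadFiles(path);
     }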
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java:
##########
@@ -122,7 +136,7 @@ public List<InputStruct> structsForFailure(LoadOptions options) {
String json;
try {
json = FileUtils.readFileToString(failureFile.headerFile,
- charset);
+ charset);
Review Comment:
keep the origin style
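
 Presumably this means keeping the continuation argument aligned under the first parameter, e.g.:

     json = FileUtils.readFileToString(failureFile.headerFile,
                                       charset);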
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+import scala.Tuple2;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.io.IOException;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class SinkToHBase implements Serializable {
+
+ private LoadOptions loadOptions;
+ public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+ public SinkToHBase(LoadOptions loadOptions) {
+ this.loadOptions = loadOptions;
+ }
+
+ public Optional<Configuration> getHBaseConfiguration() {
+ Configuration baseConf = HBaseConfiguration.create();
+ baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+ baseConf.set("hbase.zookeeper.property.clientPort",
this.loadOptions.hbaseZKPort);
+ baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+ return Optional.ofNullable(baseConf);
+ }
+
+ private Optional<Connection> getConnection() {
+ Optional<Configuration> baseConf = getHBaseConfiguration();
+ Connection conn = null;
+ try {
+ conn = ConnectionFactory.createConnection(baseConf.get());
+ } catch (IOException e) {
+ e.printStackTrace();
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions,struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+
+ }
+
+ public String getTableName() {
+
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions () {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("buildAndSer start execute >>>>");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+ Iterator<Row> rowIterator) throws Exception {
+
+ HBaseSerializer serializer = new HBaseSerializer(
+ HugeClientHolder.create(loadOptions),
+ loadOptions.vertexPartitions,loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result =
+ new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+ buildAndSer(serializer, row,buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+ LOG.info("bulkload start execute>>>");
+ try {
+ Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+ sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+ buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+ tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved to HFiles to: " + path);
+ flushPermission(conf,path);
+ return path;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return Constants.EMPTY_STR;
+
+ }
+
+ public String getHFilePath (Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + timeStr + "/";
+ Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n delete hfile path \n");
+ fs.delete(hfileGenPath,true);
+ }
+ return pathStr;
+ }
+
+ @Override
+ public void loadFiles(String path) {
+ try {
+ sinkToHBase.loadHfiles(path, getTableName()); // BulkLoad HFile to HBase
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void flushPermission (Configuration conf, String path) {
+ FsShell shell = new FsShell(conf);
+ try {
+ LOG.info("shell start execute");
+ shell.run(new String[]{"-chmod", "-R", "777", path});
+ shell.close();
+ } catch (Exception e) {
+ LOG.error("Couldnt change the file permissions " + e +
+ " Please run command:" +
+ "hbase
org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
+ " '" + "test" + "'\n" + " to load generated HFiles into
HBase table");
+ }
+ }
+
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> buildAndSer (HBaseSerializer serializer,
+ Row row,
+ List<ElementBuilder> builders) {
+ List<GraphElement> elementsElement;
+
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result = new LinkedList<>();
+
+ for (ElementBuilder builder : builders) {
+ ElementMapping elementMapping = builder.mapping();
+ if (elementMapping.skip()) {
+ continue;
+ }
+ if ("".equals(row.mkString())) {
+ break;
+ }
+ switch (struct.input().type()) {
+ case FILE:
+ case HDFS:
+
Review Comment:
remove the blank line
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+import scala.Tuple2;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.io.IOException;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class SinkToHBase implements Serializable {
+
+ private LoadOptions loadOptions;
+ public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+ public SinkToHBase(LoadOptions loadOptions) {
+ this.loadOptions = loadOptions;
+ }
+
+ public Optional<Configuration> getHBaseConfiguration() {
+ Configuration baseConf = HBaseConfiguration.create();
+ baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+ baseConf.set("hbase.zookeeper.property.clientPort",
this.loadOptions.hbaseZKPort);
+ baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+ return Optional.ofNullable(baseConf);
+ }
+
+ private Optional<Connection> getConnection() {
+ Optional<Configuration> baseConf = getHBaseConfiguration();
+ Connection conn = null;
+ try {
+ conn = ConnectionFactory.createConnection(baseConf.get());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return Optional.ofNullable(conn);
+ }
+
+ public Tuple2<IntPartitioner, TableDescriptor>
+ getPartitionerByTableName (int numPartitions, String tableName) throws IOException {
+ Optional<Connection> optionalConnection = getConnection();
+ TableDescriptor descriptor = optionalConnection
+ .get()
+ .getTable(TableName.valueOf(tableName))
+ .getDescriptor();
+ LOG.debug("getPartitionerByTableName get TableDescriptor " +
+ descriptor.getTableName());
+ optionalConnection.get().close();
+ return new Tuple2<IntPartitioner,TableDescriptor>(
+ new IntPartitioner(numPartitions, tableName),descriptor);
+ }
+
+ public void loadHfiles (String path, String tableName) throws Exception {
+ Connection conn = getConnection().get();
+ Table table = conn.getTable(TableName.valueOf(tableName));
+ Configuration conf = conn.getConfiguration();
+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+ bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+ table.close();
+ conn.close();
+
+ }
+
+ public class IntPartitioner extends Partitioner {
+ private final int numPartitions;
+ public Map<List<String>, Integer> rangeMap = new HashMap<>();
+ private String tableName;
+
+ public IntPartitioner(int numPartitions, String tableName) throws IOException {
+ this.numPartitions = numPartitions;
+ this.rangeMap = getRangeMap(tableName);
+ this.tableName = tableName;
+ }
+
+ private Map<List<String>, Integer> getRangeMap(String tableName) throws IOException {
+ Connection conn = getConnection().get();
+ HRegionLocator locator =
+ (HRegionLocator) conn.getRegionLocator(TableName.valueOf(tableName));
+ Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
+ Map<List<String>, Integer> rangeMap = new HashMap<>();
+ for (int i = 0; i < startEndKeys.getFirst().length; i++) {
+ String startKey = Bytes.toString(startEndKeys.getFirst()[i]);
+ String endKey = Bytes.toString(startEndKeys.getSecond()[i]);
+ rangeMap.put(new ArrayList<>(Arrays.asList(startKey, endKey)), i);
+ }
+ conn.close();
+ return rangeMap;
+ }
+
+ @Override
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ if (key instanceof ImmutableBytesWritable) {
+
+ try {
+ ImmutableBytesWritable immutableBytesWritableKey = (ImmutableBytesWritable) key;
+
+ if (rangeMap == null || rangeMap.isEmpty()) {
+ rangeMap = getRangeMap(this.tableName);
+ }
+
+ String keyString = Bytes.toString(immutableBytesWritableKey.get());
+ for (List<String> range : rangeMap.keySet()) {
+ if (keyString.compareToIgnoreCase(range.get(0)) >= 0 &&
+ ((keyString.compareToIgnoreCase(range.get(1)) < 0) ||
+ range.get(1).equals(""))) {
+ return rangeMap.get(range);
+ }
+ }
+ LOG.error("Didn't find proper key in rangeMap, so returning 0 ...");
+ return 0;
+ } catch (Exception e) {
+ LOG.error("When trying to get partitionID, encountered exception: " + e +
+ "\t key = " + key);
+ return 0;
+ }
+ } else {
+ LOG.error("key is NOT ImmutableBytesWritable type ...");
+ return 0;
+ }
+ }
+ }
+
+}
Review Comment:
expect a blank line
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java:
##########
@@ -109,6 +110,53 @@ public List<Edge> build(String[] names, Object[] values) {
}
return edges;
}
+
+ @Override
+ public List<Edge> build(Row row) {
+ String[] names = row.schema().fieldNames();
+ Object[] values = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ values[i] = row.get(i);
+ }
+ if (this.vertexIdsIndex == null ||
+ !Arrays.equals(this.lastNames, names)) {
+ this.vertexIdsIndex = this.extractVertexIdsIndex(names);
+ }
+
+ this.lastNames = names;
+ EdgeKVPairs kvPairs = this.newEdgeKVPairs();
+ kvPairs.source.extractFromEdge(names, values,
+ this.vertexIdsIndex.sourceIndexes);
+ kvPairs.target.extractFromEdge(names, values,
+ this.vertexIdsIndex.targetIndexes);
+ kvPairs.extractProperties(names, values);
+
+ List<Vertex> sources = kvPairs.source.buildVertices(false);
+ List<Vertex> targets = kvPairs.target.buildVertices(false);
+ if (sources.isEmpty() || targets.isEmpty()) {
+ return ImmutableList.of();
+ }
+ E.checkArgument(sources.size() == 1 || targets.size() == 1 ||
+ sources.size() == targets.size(),
+ "The elements number of source and target must be: " +
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+ public LoadDistributeMetrics(InputStruct struct) {
+ this.struct = struct;
+ this.vertexDisMetrics = new HashMap<>();
+ this.edgeDisMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ this.vertexDisMetrics.put(mapping.label(), new Metrics());
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ this.edgeDisMetrics.put(mapping.label(), new Metrics());
+ }
+ }
+
+ public void init(SparkContext sc) {
+ for (VertexMapping mapping : this.struct.vertices()) {
+ Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java:
##########
@@ -157,7 +171,7 @@ private Map<String, FailureFile> groupFailureFiles(File pathDir) {
failureFile.dataFile = subFile;
} else {
E.checkArgument(Constants.HEADER_SUFFIX.equals(suffix),
- "The failure data file must end with %s or %s",
+ "The failure data file must end with %s or %s",
Review Comment:
keep the origin style
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+ public LoadDistributeMetrics(InputStruct struct) {
+ this.struct = struct;
+ this.vertexDisMetrics = new HashMap<>();
+ this.edgeDisMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ this.vertexDisMetrics.put(mapping.label(), new Metrics());
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ this.edgeDisMetrics.put(mapping.label(), new Metrics());
+ }
+ }
+
+ public void init(SparkContext sc) {
+ for (VertexMapping mapping : this.struct.vertices()) {
+ Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ for (EdgeMapping mapping : this.struct.edges()) {
+ Metrics metrics = this.edgeDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ }
+
+ public void increaseDisVertexParseSuccess(ElementMapping mapping) {
+
+ this.disMetrics(mapping).parseSuccess.add(1);
+ }
+
+ public void pluseDisVertexParseSuccess(ElementMapping mapping, Long count)
{
+ this.disMetrics(mapping).parseSuccess.add(count);
+ }
+
+ public void increaseDisVertexInsertSuccess(ElementMapping mapping) {
+ this.disMetrics(mapping).insertSuccess.add(1);
+ }
+
+ public void plusDisVertexInsertSuccess(ElementMapping mapping, Long count)
{
+ this.disMetrics(mapping).insertSuccess.add(count);
+ }
+
+ public void increaseDisEdgeParseSuccess(ElementMapping mapping) {
+
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+ public LoadDistributeMetrics(InputStruct struct) {
+ this.struct = struct;
+ this.vertexDisMetrics = new HashMap<>();
+ this.edgeDisMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ this.vertexDisMetrics.put(mapping.label(), new Metrics());
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ this.edgeDisMetrics.put(mapping.label(), new Metrics());
+ }
+ }
+
+ public void init(SparkContext sc) {
+ for (VertexMapping mapping : this.struct.vertices()) {
+ Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ for (EdgeMapping mapping : this.struct.edges()) {
+ Metrics metrics = this.edgeDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ }
+
+ public void increaseDisVertexParseSuccess(ElementMapping mapping) {
+
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -89,29 +94,98 @@ public HugeGraphSparkLoader(String[] args) {
public void load() {
LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
List<InputStruct> structs = mapping.structs();
+ boolean sinkType = this.loadOptions.sinkType;
+ if(!sinkType){
+ this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+ }
+ SparkConf conf = new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo serialization
+ .set("spark.kryo.registrationRequired", "true");
+ try {
+ conf.registerKryoClasses (
+ new Class[]
+ {
+ org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+ org.apache.hadoop.hbase.KeyValue.class,
+ org.apache.spark.sql.types.StructType.class,
+ StructField[].class,
+ StructField.class,
+ org.apache.spark.sql.types.LongType$.class,
+ org.apache.spark.sql.types.Metadata.class,
+ org.apache.spark.sql.types.StringType$.class,
+ Class.forName(
+ "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+ Class.forName("scala.reflect.ClassTag$$anon$1"),
+ Class.forName("scala.collection.immutable.Set$EmptySet$"),
+ Class.forName("org.apache.spark.sql.types.DoubleType$")
+ });
+ } catch (ClassNotFoundException e) {
+ LOG.error("spark kryo serialized registration failed");
+ }
+ SparkSession session = SparkSession.builder()
+ .config(conf)
+ .getOrCreate();
+ SparkContext sc = session.sparkContext();
- SparkSession session = SparkSession.builder().getOrCreate();
+ LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
for (InputStruct struct : structs) {
+ LOG.info("\n init" + struct.input().asFileSource().path() +
+ " distribute metrics---- \n");
+ LoadDistributeMetrics loadDistributeMetrics = new
LoadDistributeMetrics(struct);
+ loadDistributeMetrics.init(sc);
+ LOG.info("\n load dat info: \t" +
struct.input().asFileSource().path() +
+ "\n start load data ; \n");
Dataset<Row> ds = read(session, struct);
- ds.foreachPartition((Iterator<Row> p) -> {
- LoadContext context = initPartition(this.loadOptions, struct);
- p.forEachRemaining((Row row) -> {
- loadRow(struct, row, p, context);
+ if (sinkType) {
+ LOG.info("\n ------ spark api start load data ------ \n");
+ ds.foreachPartition((Iterator<Row> p) -> {
+ LoadContext context = initPartition(this.loadOptions, struct);
+ p.forEachRemaining((Row row) -> {
+ loadRow(struct, row, p, context);
+ });
+ context.close();
});
- context.close();
- });
+
+ } else {
+ LOG.info("\n spark bulkload gen hfile start \n");
+ // gen-hfile
+ HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions,
+ struct,loadDistributeMetrics);
+ directLoader.bulkload(ds);
+
+ }
+ collectLoadMetrics(loadDistributeMetrics,totalInsertSuccess);
+ LOG.info(" \n load data info : \t" +
struct.input().asFileSource().path() +
+ "\n load data finish!!!; \n");
}
+ Long totalInsertSuccessCnt = totalInsertSuccess.value();
+ LOG.info("\n ------------The data import task is
complete-------------------\n" +
+ "\n insertSuccess cnt:\t" + totalInsertSuccess + " \n" +
+ "\n ---------------------------------------------\n"
+ );
+
+ sc.stop();
session.close();
session.stop();
+
+ }
+
+ private void collectLoadMetrics (LoadDistributeMetrics loadMetrics,
Review Comment:
ditto
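
 For context, a hypothetical sketch of what this helper might look like once the extra space before the parameter list is removed (the real parameter list and body are truncated in this hunk, so the second argument and the two add() calls are assumptions, not quoted code):

     private void collectLoadMetrics(LoadDistributeMetrics loadMetrics,
                                     LongAccumulator totalInsertSuccess) {
         // Fold the per-struct accumulators into the job-level counter
         totalInsertSuccess.add(loadMetrics.readVertexInsertSuccess());
         totalInsertSuccess.add(loadMetrics.readEdgeInsertSuccess());
     }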
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+ public LoadDistributeMetrics(InputStruct struct) {
+ this.struct = struct;
+ this.vertexDisMetrics = new HashMap<>();
+ this.edgeDisMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ this.vertexDisMetrics.put(mapping.label(), new Metrics());
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ this.edgeDisMetrics.put(mapping.label(), new Metrics());
+ }
+ }
+
+ public void init(SparkContext sc) {
+ for (VertexMapping mapping : this.struct.vertices()) {
+ Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ for (EdgeMapping mapping : this.struct.edges()) {
+ Metrics metrics = this.edgeDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+ public LoadDistributeMetrics(InputStruct struct) {
+ this.struct = struct;
+ this.vertexDisMetrics = new HashMap<>();
+ this.edgeDisMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ this.vertexDisMetrics.put(mapping.label(), new Metrics());
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ this.edgeDisMetrics.put(mapping.label(), new Metrics());
+ }
+ }
+
+ public void init(SparkContext sc) {
+ for (VertexMapping mapping : this.struct.vertices()) {
+ Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ for (EdgeMapping mapping : this.struct.edges()) {
+ Metrics metrics = this.edgeDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ }
+
+ public void increaseDisVertexParseSuccess(ElementMapping mapping) {
+
+ this.disMetrics(mapping).parseSuccess.add(1);
+ }
+
+ public void pluseDisVertexParseSuccess(ElementMapping mapping, Long count)
{
+ this.disMetrics(mapping).parseSuccess.add(count);
+ }
+
+ public void increaseDisVertexInsertSuccess(ElementMapping mapping) {
+ this.disMetrics(mapping).insertSuccess.add(1);
+ }
+
+ public void plusDisVertexInsertSuccess(ElementMapping mapping, Long count)
{
+ this.disMetrics(mapping).insertSuccess.add(count);
+ }
+
+ public void increaseDisEdgeParseSuccess(ElementMapping mapping) {
+
+ this.disMetrics(mapping).parseSuccess.add(1);
+ }
+
+ public void pluseDisEdgeParseSuccess(ElementMapping mapping, Long count) {
+ this.disMetrics(mapping).parseSuccess.add(count);
+ }
+
+ public void increaseDisEdgeInsertSuccess(ElementMapping mapping) {
+
Review Comment:
ditto
##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+ public LoadDistributeMetrics(InputStruct struct) {
+ this.struct = struct;
+ this.vertexDisMetrics = new HashMap<>();
+ this.edgeDisMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ this.vertexDisMetrics.put(mapping.label(), new Metrics());
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ this.edgeDisMetrics.put(mapping.label(), new Metrics());
+ }
+ }
+
+ public void init(SparkContext sc) {
+ for (VertexMapping mapping : this.struct.vertices()) {
+ Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ for (EdgeMapping mapping : this.struct.edges()) {
+ Metrics metrics = this.edgeDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR +
Constants.LOAD_DATA_PARSE_SUFFIX);
+
+ }
+ }
+
+ public void increaseDisVertexParseSuccess(ElementMapping mapping) {
+
+ this.disMetrics(mapping).parseSuccess.add(1);
+ }
+
+ public void pluseDisVertexParseSuccess(ElementMapping mapping, Long count)
{
+ this.disMetrics(mapping).parseSuccess.add(count);
+ }
+
+ public void increaseDisVertexInsertSuccess(ElementMapping mapping) {
+ this.disMetrics(mapping).insertSuccess.add(1);
+ }
+
+ public void plusDisVertexInsertSuccess(ElementMapping mapping, Long count)
{
+ this.disMetrics(mapping).insertSuccess.add(count);
+ }
+
+ public void increaseDisEdgeParseSuccess(ElementMapping mapping) {
+
+ this.disMetrics(mapping).parseSuccess.add(1);
+ }
+
+ public void pluseDisEdgeParseSuccess(ElementMapping mapping, Long count) {
+ this.disMetrics(mapping).parseSuccess.add(count);
+ }
+
+ public void increaseDisEdgeInsertSuccess(ElementMapping mapping) {
+
+ this.disMetrics(mapping).insertSuccess.add(1);
+ }
+
+ public void plusDisEdgeInsertSuccess(ElementMapping mapping, Long count) {
+ this.disMetrics(mapping).insertSuccess.add(count);
+ }
+
+ public Long readVertexInsertSuccess() {
+ Long totalCnt = 0L;
+ Collection<Metrics> values = vertexDisMetrics.values();
+ for (Metrics metrics : values) {
+ totalCnt += metrics.insertSuccess();
+ }
+ return totalCnt;
+ }
+
+ public Long readEdgeInsertSuccess() {
+ Long totalCnt = 0L;
+ Collection<Metrics> values = edgeDisMetrics.values();
+ for (Metrics metrics : values) {
+ totalCnt += metrics.insertSuccess();
+ }
+ return totalCnt;
+ }
+
+ private Metrics disMetrics(ElementMapping mapping) {
+ if (mapping.type().isVertex()) {
+ return this.vertexDisMetrics.get(mapping.label());
+ } else {
+ return this.edgeDisMetrics.get(mapping.label());
+ }
+ }
+
+ public static class Metrics implements Serializable {
+
+ private LongAccumulator parseSuccess;
+ private LongAccumulator parseFailure;
+ private LongAccumulator insertSuccess;
+ private LongAccumulator insertFailure;
+
+ public Metrics() {
+
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]