This is an automated email from the ASF dual-hosted git repository.
jermy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
The following commit(s) were added to refs/heads/master by this push:
new 379b2e5 feat: support jdbc source for spark-loader (#306)
379b2e5 is described below
commit 379b2e5f35389b6d833206bffc7ea477ee8f19dd
Author: Simon Cheung <[email protected]>
AuthorDate: Tue Jul 19 11:26:48 2022 +0800
feat: support jdbc source for spark-loader (#306)
---
.../loader/spark/HugeGraphSparkLoader.java | 65 ++++++++++++++--------
1 file changed, 43 insertions(+), 22 deletions(-)
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 733006a..c8a5e0d 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -26,29 +26,31 @@ import com.baidu.hugegraph.loader.builder.VertexBuilder;
import com.baidu.hugegraph.loader.executor.LoadContext;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.mapping.EdgeMapping;
-import com.baidu.hugegraph.loader.mapping.VertexMapping;
import com.baidu.hugegraph.loader.mapping.ElementMapping;
import com.baidu.hugegraph.loader.mapping.InputStruct;
import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
import com.baidu.hugegraph.loader.source.InputSource;
+import com.baidu.hugegraph.loader.source.file.Compression;
import com.baidu.hugegraph.loader.source.file.FileFilter;
import com.baidu.hugegraph.loader.source.file.FileFormat;
import com.baidu.hugegraph.loader.source.file.FileSource;
import com.baidu.hugegraph.loader.source.file.SkippedLine;
-import com.baidu.hugegraph.loader.source.file.Compression;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
import com.baidu.hugegraph.loader.util.Printer;
import com.baidu.hugegraph.structure.GraphElement;
-import com.baidu.hugegraph.structure.graph.UpdateStrategy;
-import com.baidu.hugegraph.structure.graph.Vertex;
-import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
import com.baidu.hugegraph.util.Log;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import java.io.Serializable;
@@ -57,6 +59,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
+import scala.collection.JavaConverters;
public class HugeGraphSparkLoader implements Serializable {
@@ -130,8 +135,7 @@ public class HugeGraphSparkLoader implements Serializable {
List<GraphElement> graphElements = builderMap.getValue();
if (graphElements.size() > elementMapping.batchSize() ||
(!p.hasNext() && graphElements.size() > 0)) {
- flush(builderMap, context.client().graph(),
- this.loadOptions.checkVertex);
+ flush(builderMap, context.client().graph(),
+ this.loadOptions.checkVertex);
}
}
}
@@ -139,24 +143,22 @@ public class HugeGraphSparkLoader implements Serializable {
private Dataset<Row> read(SparkSession ss, InputStruct struct) {
InputSource input = struct.input();
String charset = input.charset();
- FileSource fileSource = input.asFileSource();
-
- String[] header = fileSource.header();
- String delimiter = fileSource.delimiter();
- String path = fileSource.path();
- FileFilter filter = fileSource.filter();
- FileFormat format = fileSource.format();
- String dateFormat = fileSource.dateFormat();
- String timeZone = fileSource.timeZone();
- SkippedLine skippedLine = fileSource.skippedLine();
- Compression compression = fileSource.compression();
- int batchSize = fileSource.batchSize();
-
DataFrameReader reader = ss.read();
Dataset<Row> ds;
switch (input.type()) {
case FILE:
case HDFS:
+ FileSource fileSource = input.asFileSource();
+ String[] header = fileSource.header();
+ String delimiter = fileSource.delimiter();
+ String path = fileSource.path();
+ FileFilter filter = fileSource.filter();
+ FileFormat format = fileSource.format();
+ String dateFormat = fileSource.dateFormat();
+ String timeZone = fileSource.timeZone();
+ SkippedLine skippedLine = fileSource.skippedLine();
+ Compression compression = fileSource.compression();
+ int batchSize = fileSource.batchSize();
switch (format) {
case TEXT:
ds = reader.text(path);
@@ -173,7 +175,16 @@ public class HugeGraphSparkLoader implements Serializable {
}
break;
case JDBC:
- // TODO: implement jdbc
+ JDBCSource jdbcSource = (JDBCSource) struct.input();
+ String url = jdbcSource.url() + "/" + jdbcSource.database();
+ String table = jdbcSource.table();
+ String username = jdbcSource.username();
+ String password = jdbcSource.password();
+ Properties properties = new Properties();
+ properties.put("user", username);
+ properties.put("password", password);
+ ds = reader.jdbc(url, table, properties);
+ break;
default:
throw new AssertionError(String.format(
"Unsupported input source '%s'", input.type()));
@@ -199,7 +210,17 @@ public class HugeGraphSparkLoader implements Serializable {
.split(fileSource.delimiter()));
break;
case JDBC:
- //TODO: implement jdbc
+ Object[] structFields =
+ JavaConverters.asJavaCollection(row.schema().toList())
+ .toArray();
+ int len = row.schema().length();
+ String[] headers = new String[len];
+ Object[] values = new Object[len];
+ for (int i = 0; i < len; i++) {
+ headers[i] = ((StructField) structFields[i]).name();
+ values[i] = row.get(i);
+ }
+ elements = builder.build(headers, values);
+ break;
default:
throw new AssertionError(String.format(
"Unsupported input source '%s'",