This is an automated email from the ASF dual-hosted git repository.

jermy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


The following commit(s) were added to refs/heads/master by this push:
     new 379b2e5  feat: support jdbc source for spark-loader (#306)
379b2e5 is described below

commit 379b2e5f35389b6d833206bffc7ea477ee8f19dd
Author: Simon Cheung <[email protected]>
AuthorDate: Tue Jul 19 11:26:48 2022 +0800

    feat: support jdbc source for spark-loader (#306)
---
 .../loader/spark/HugeGraphSparkLoader.java         | 65 ++++++++++++++--------
 1 file changed, 43 insertions(+), 22 deletions(-)

diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 733006a..c8a5e0d 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -26,29 +26,31 @@ import com.baidu.hugegraph.loader.builder.VertexBuilder;
 import com.baidu.hugegraph.loader.executor.LoadContext;
 import com.baidu.hugegraph.loader.executor.LoadOptions;
 import com.baidu.hugegraph.loader.mapping.EdgeMapping;
-import com.baidu.hugegraph.loader.mapping.VertexMapping;
 import com.baidu.hugegraph.loader.mapping.ElementMapping;
 import com.baidu.hugegraph.loader.mapping.InputStruct;
 import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
 import com.baidu.hugegraph.loader.source.InputSource;
+import com.baidu.hugegraph.loader.source.file.Compression;
 import com.baidu.hugegraph.loader.source.file.FileFilter;
 import com.baidu.hugegraph.loader.source.file.FileFormat;
 import com.baidu.hugegraph.loader.source.file.FileSource;
 import com.baidu.hugegraph.loader.source.file.SkippedLine;
-import com.baidu.hugegraph.loader.source.file.Compression;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
 import com.baidu.hugegraph.loader.util.Printer;
 import com.baidu.hugegraph.structure.GraphElement;
-import com.baidu.hugegraph.structure.graph.UpdateStrategy;
-import com.baidu.hugegraph.structure.graph.Vertex;
-import com.baidu.hugegraph.structure.graph.Edge;
 import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
 import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
 import com.baidu.hugegraph.util.Log;
 
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
 import org.slf4j.Logger;
 
 import java.io.Serializable;
@@ -57,6 +59,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
+import scala.collection.JavaConverters;
 
 public class HugeGraphSparkLoader implements Serializable {
 
@@ -130,8 +135,7 @@ public class HugeGraphSparkLoader implements Serializable {
             List<GraphElement> graphElements = builderMap.getValue();
             if (graphElements.size() > elementMapping.batchSize() ||
                 (!p.hasNext() && graphElements.size() > 0)) {
-                flush(builderMap, context.client().graph(),
-                     this.loadOptions.checkVertex);
+                flush(builderMap, context.client().graph(), this.loadOptions.checkVertex);
             }
         }
     }
@@ -139,24 +143,22 @@ public class HugeGraphSparkLoader implements Serializable {
     private Dataset<Row> read(SparkSession ss, InputStruct struct) {
         InputSource input = struct.input();
         String charset = input.charset();
-        FileSource fileSource = input.asFileSource();
-
-        String[] header = fileSource.header();
-        String delimiter = fileSource.delimiter();
-        String path = fileSource.path();
-        FileFilter filter = fileSource.filter();
-        FileFormat format = fileSource.format();
-        String dateFormat = fileSource.dateFormat();
-        String timeZone = fileSource.timeZone();
-        SkippedLine skippedLine = fileSource.skippedLine();
-        Compression compression = fileSource.compression();
-        int batchSize = fileSource.batchSize();
-
         DataFrameReader reader = ss.read();
         Dataset<Row> ds;
         switch (input.type()) {
             case FILE:
             case HDFS:
+                FileSource fileSource = input.asFileSource();
+                String[] header = fileSource.header();
+                String delimiter = fileSource.delimiter();
+                String path = fileSource.path();
+                FileFilter filter = fileSource.filter();
+                FileFormat format = fileSource.format();
+                String dateFormat = fileSource.dateFormat();
+                String timeZone = fileSource.timeZone();
+                SkippedLine skippedLine = fileSource.skippedLine();
+                Compression compression = fileSource.compression();
+                int batchSize = fileSource.batchSize();
                 switch (format) {
                     case TEXT:
                         ds = reader.text(path);
@@ -173,7 +175,16 @@ public class HugeGraphSparkLoader implements Serializable {
                 }
                 break;
             case JDBC:
-                // TODO: implement jdbc
+                JDBCSource jdbcSource = (JDBCSource) struct.input();
+                String url = jdbcSource.url() + "/" + jdbcSource.database();
+                String table = jdbcSource.table();
+                String username = jdbcSource.username();
+                String password = jdbcSource.password();
+                Properties properties = new Properties();
+                properties.put("user", username);
+                properties.put("password", password);
+                ds = reader.jdbc(url, table, properties);
+                break;
             default:
                 throw new AssertionError(String.format(
                           "Unsupported input source '%s'", input.type()));
@@ -199,7 +210,17 @@ public class HugeGraphSparkLoader implements Serializable {
                                             .split(fileSource.delimiter()));
                 break;
             case JDBC:
-                //TODO: implement jdbc
+                Object[] structFields = JavaConverters.asJavaCollection(row.schema().toList())
+                                                      .toArray();
+                int len = row.schema().length();
+                String[] headers = new String[len];
+                Object[] values = new Object[len];
+                for (int i = 0; i < len; i++) {
+                    headers[i] = ((StructField) structFields[i]).name();
+                    values[i] = row.get(i);
+                }
+                elements = builder.build(headers, values);
+                break;
             default:
                 throw new AssertionError(String.format(
                           "Unsupported input source '%s'",

Reply via email to