This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 98231308b Spark etl supports doris (#5058)
98231308b is described below

commit 98231308bdaca462bf8e3232b6f0f3e6e4742320
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Dec 29 10:53:45 2023 +0800

    Spark etl supports doris (#5058)
    
    * Spark etl supports doris
    
    * Modify known-dependencies.txt
    
    * Optimized code
---
 linkis-engineconn-plugins/spark/scala-2.12/pom.xml |  10 ++
 .../spark/datacalc/TestDorisCala.scala             | 126 +++++++++++++++++++++
 .../spark/datacalc/sink/DorisSinkConfig.java       |  91 +++++++++++++++
 .../spark/datacalc/source/DorisSourceConfig.java   |  74 ++++++++++++
 .../spark/datacalc/util/PluginUtil.java            |   2 +
 .../spark/datacalc/sink/DorisSink.scala            |  57 ++++++++++
 .../spark/datacalc/source/DorisSource.scala        |  49 ++++++++
 tool/dependencies/known-dependencies.txt           |   1 +
 8 files changed, 410 insertions(+)

diff --git a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
index 1ba1a8ad5..840b02dc8 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
+++ b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
@@ -31,6 +31,10 @@
     <delta.version>2.0.2</delta.version>
     <hudi.version>0.13.0</hudi.version>
     <solr.version>8.11.0</solr.version>
+
+    <!-- doris-spark -->
+    <spark.doris.version>3.2</spark.doris.version>
+    <spark.doris.connector.version>1.2.0</spark.doris.connector.version>
   </properties>
 
   <dependencies>
@@ -185,6 +189,12 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.doris</groupId>
+      
<artifactId>spark-doris-connector-${spark.doris.version}_${scala.binary.version}</artifactId>
+      <version>${spark.doris.connector.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.lucidworks.spark</groupId>
       <artifactId>spark-solr</artifactId>
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
new file mode 100644
index 000000000..4ddba1d2f
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+/**
+ * Checks that DataCalc job definitions using the `doris` source and sink can be parsed and
+ * resolved into plugins. The jobs are only parsed and resolved here, never executed.
+ */
+class TestDorisCala {
+
+  val filePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testDorisWrite: Unit = assertPluginsResolve(dorisWriteConfigJson)
+
+  @Test
+  def testDorisReader: Unit = assertPluginsResolve(dorisReaderConfigJson)
+
+  /**
+   * Replaces `{filePath}` in `configJson`, parses the job and asserts that the source, transform
+   * and sink plugin collections are all non-null. Does nothing on Windows.
+   */
+  private def assertPluginsResolve(configJson: String): Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(configJson.replace("{filePath}", filePath))
+      Assertions.assertNotNull(data)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertNotNull(sources)
+      Assertions.assertNotNull(transforms)
+      Assertions.assertNotNull(sinks)
+    }
+  }
+
+  val dorisWriteConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "path": "file://{filePath}/etltest.dolphin",
+      |                "serializer": "csv",
+      |                "options": {
+      |                "header":"true",
+      |                "delimiter":";"
+      |                },
+      |                "columnNames": ["name", "age"]
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "doris",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "url": "localhost:8030",
+      |                "user": "root",
+      |                "password": "",
+      |                "targetDatabase": "test",
+      |                "targetTable": "test"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+  val dorisReaderConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "doris",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "url": "localhost:8030",
+      |                "user": "root",
+      |                "password": "",
+      |                "sourceDatabase": "test",
+      |                "sourceTable": "test"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "file",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "file://{filePath}/json",
+      |                "saveMode": "overwrite",
+      |                "serializer": "json"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
new file mode 100644
index 000000000..3c8227fae
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+public class DorisSinkConfig extends SinkConfig {
+
+  @NotBlank private String url;
+
+  @NotBlank private String user;
+
+  private String password;
+
+  @NotBlank private String targetDatabase;
+
+  @NotBlank private String targetTable;
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 
'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getTargetDatabase() {
+    return targetDatabase;
+  }
+
+  public void setTargetDatabase(String targetDatabase) {
+    this.targetDatabase = targetDatabase;
+  }
+
+  public String getTargetTable() {
+    return targetTable;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
new file mode 100644
index 000000000..95a11d89d
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Settings of the {@code doris} DataCalc source: the Doris frontend to connect to, the
+ * credentials, and the {@code sourceDatabase}.{@code sourceTable} to read from.
+ */
+public class DorisSourceConfig extends SourceConfig {
+
+  /** Doris frontend address, e.g. {@code localhost:8030}. Required. */
+  @NotBlank private String url;
+
+  /** Doris user name. Required. */
+  @NotBlank private String user;
+
+  /** Doris password. Optional: it carries no validation constraint and may be null. */
+  private String password;
+
+  /** Database that contains the source table. Required. */
+  @NotBlank private String sourceDatabase;
+
+  /** Table to read. Required. */
+  @NotBlank private String sourceTable;
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return this.url;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getUser() {
+    return this.user;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public void setSourceDatabase(String sourceDatabase) {
+    this.sourceDatabase = sourceDatabase;
+  }
+
+  public String getSourceDatabase() {
+    return this.sourceDatabase;
+  }
+
+  public void setSourceTable(String sourceTable) {
+    this.sourceTable = sourceTable;
+  }
+
+  public String getSourceTable() {
+    return this.sourceTable;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index e27d110c3..2d29c1b55 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -52,6 +52,7 @@ public class PluginUtil {
     classMap.put("solr", SolrSource.class);
     classMap.put("kafka", KafkaSource.class);
     classMap.put("starrocks", StarrocksSource.class);
+    classMap.put("doris", DorisSource.class);
     return classMap;
   }
 
@@ -75,6 +76,7 @@ public class PluginUtil {
     classMap.put("solr", SolrSink.class);
     classMap.put("kafka", KafkaSink.class);
     classMap.put("starrocks", StarrocksSink.class);
+    classMap.put("doris", DorisSink.class);
     return classMap;
   }
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
new file mode 100644
index 000000000..9d5301ced
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+/**
+ * DataCalc sink that writes a Dataset into a Doris table through the Spark "doris" data source.
+ *
+ * Extra `options` from [[DorisSinkConfig]] are forwarded to the writer, but the connection
+ * settings built in [[output]] (FE nodes, user, password, table identifier) take precedence.
+ */
+class DorisSink extends DataCalcSink[DorisSinkConfig] with Logging {
+
+  /**
+   * Writes `ds` into `targetDatabase`.`targetTable`, applying the configured save mode if set.
+   *
+   * @param spark active Spark session (not used by this sink)
+   * @param ds    rows to write
+   */
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val connectionOptions = Map(
+      "doris.fenodes" -> config.getUrl,
+      "user" -> config.getUser,
+      // A missing password is sent as "" instead of null: a null option value is not safe to hand
+      // to the connector (NOTE(review): it appears to copy options into a SparkConf, which rejects
+      // nulls - confirm against the pinned connector version).
+      "password" -> Option(config.getPassword).getOrElse(""),
+      "doris.table.identifier" -> s"${config.getTargetDatabase}.${config.getTargetTable}"
+    )
+
+    // User options go first so that the connection settings above win on duplicate keys.
+    val userOptions =
+      Option(config.getOptions).map(_.asScala.toMap).getOrElse(Map.empty[String, String])
+    val options = userOptions ++ connectionOptions
+
+    val writer = ds.write.format("doris")
+    if (StringUtils.isNotBlank(config.getSaveMode)) {
+      writer.mode(config.getSaveMode)
+    }
+
+    logger.info(
+      s"Save data from doris url: ${config.getUrl}, targetDatabase: ${config.getTargetDatabase}, targetTable: ${config.getTargetTable}"
+    )
+    writer.options(options).save()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
new file mode 100644
index 000000000..a4819f218
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+/**
+ * DataCalc source that loads a Doris table through the Spark "doris" data source.
+ *
+ * Extra `options` from [[DorisSourceConfig]] are passed to the reader first, so the connection
+ * settings set in [[getData]] (table identifier, FE nodes, user, password) take precedence.
+ */
+class DorisSource extends DataCalcSource[DorisSourceConfig] with Logging {
+
+  /**
+   * Reads `sourceDatabase`.`sourceTable` into a DataFrame.
+   *
+   * @param spark session used to create the reader
+   * @return the rows of the Doris table
+   */
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("doris")
+
+    // Applied before the connection settings below, which therefore win on duplicate keys.
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+
+    logger.info(
+      s"Load data from Doris url: ${config.getUrl}, sourceDatabase: ${config.getSourceDatabase}, sourceTable: ${config.getSourceTable}"
+    )
+
+    reader
+      .option("doris.table.identifier", s"${config.getSourceDatabase}.${config.getSourceTable}")
+      .option("doris.fenodes", config.getUrl)
+      .option("user", config.getUser)
+      // A missing password is sent as "" instead of null; a null option value is not safe to pass
+      // to the connector (NOTE(review): confirm against the pinned connector version).
+      .option("password", Option(config.getPassword).getOrElse(""))
+      .load()
+  }
+
+}
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 73f1abede..b826a6749 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -578,6 +578,7 @@ snakeyaml-1.33.jar
 snappy-java-1.1.4.jar
 snappy-java-1.1.7.7.jar
 snappy-java-1.1.8.2.jar
+spark-doris-connector-3.2_2.12-1.2.0.jar
 spark-redis_2.12-2.6.0.jar
 spring-aop-5.2.23.RELEASE.jar
 spring-beans-5.2.23.RELEASE.jar


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to