This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 7bf9c4465 spark etl support redis and jedis2.9.2 upgrade 3.9.0 (#4534)
7bf9c4465 is described below

commit 7bf9c4465e0bb38bce575033534f971b508fe0dc
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu May 18 14:13:47 2023 +0800

    spark etl support redis and jedis2.9.2 upgrade 3.9.0 (#4534)
---
 .../apache/linkis/server/ticket/RedisClient.scala  |   4 +-
 linkis-engineconn-plugins/spark/pom.xml            |  11 ++
 .../spark/datacalc/sink/RedisSinkConfig.java       |  90 ++++++++++
 .../spark/datacalc/source/RedisSourceConfig.java   |  94 ++++++++++
 .../spark/datacalc/util/PluginUtil.java            |   2 +
 .../spark/datacalc/sink/RedisSink.scala            |  53 ++++++
 .../spark/datacalc/source/RedisSource.scala        |  50 ++++++
 .../spark/datacalc/TestRedisCalc.scala             | 200 +++++++++++++++++++++
 pom.xml                                            |   2 +-
 tool/dependencies/known-dependencies.txt           |   3 +-
 10 files changed, 505 insertions(+), 4 deletions(-)

diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala
index 3ec7495f1..8f09139e0 100644
--- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala
+++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala
@@ -98,8 +98,8 @@ object RedisClient {
       maxIdle: Int,
       minIdle: Int,
       maxWaitMillis: Long
-  ): GenericObjectPoolConfig[Nothing] = {
-    val poolConfig = new GenericObjectPoolConfig
+  ): GenericObjectPoolConfig[Jedis] = {
+    val poolConfig = new GenericObjectPoolConfig[Jedis]()
     poolConfig.setMaxTotal(maxTotal)
     poolConfig.setMaxIdle(maxIdle)
     poolConfig.setMinIdle(minIdle)
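
For context on the hunk above: jedis 3.x parameterizes its connection pools, so the pool config must carry the pooled element type. A minimal sketch of building a pool against the upgraded client, assuming jedis 3.9.0 and commons-pool2 on the classpath, with a local Redis at localhost:6379 as a placeholder:

    import org.apache.commons.pool2.impl.GenericObjectPoolConfig
    import redis.clients.jedis.{Jedis, JedisPool}

    // With jedis 3.x, JedisPool is a Pool[Jedis], so the config is typed;
    // the raw GenericObjectPoolConfig that jedis 2.9.2 accepted no longer
    // matches the JedisPool constructor signature.
    val poolConfig = new GenericObjectPoolConfig[Jedis]()
    poolConfig.setMaxTotal(8)
    poolConfig.setMaxIdle(8)
    poolConfig.setMinIdle(0)
    poolConfig.setMaxWaitMillis(2000L)

    val pool = new JedisPool(poolConfig, "localhost", 6379)
    val jedis = pool.getResource
    try jedis.ping() finally jedis.close()
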
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index 4897a0c1b..f28b036e2 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -449,6 +449,17 @@
       <version>${spark.hadoop.version}</version>
       <scope>${spark.hadoop.scope}</scope>
     </dependency>
+    <dependency>
+      <groupId>com.redislabs</groupId>
+      <artifactId>spark-redis_${scala.binary.version}</artifactId>
+      <version>2.6.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>redis.clients</groupId>
+          <artifactId>jedis</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSinkConfig.java
new file mode 100644
index 000000000..cb87b9080
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSinkConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+public class RedisSinkConfig extends SinkConfig {
+
+  @NotBlank private String host;
+
+  @NotBlank private String port;
+
+  private String dbNum = "0";
+  private String auth = "password";
+
+  @NotBlank private String targetTable;
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public String getPort() {
+    return port;
+  }
+
+  public void setPort(String port) {
+    this.port = port;
+  }
+
+  public String getDbNum() {
+    return dbNum;
+  }
+
+  public void setDbNum(String dbNum) {
+    this.dbNum = dbNum;
+  }
+
+  public String getTargetTable() {
+    return targetTable;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+
+  public String getAuth() {
+    return auth;
+  }
+
+  public void setAuth(String auth) {
+    this.auth = auth;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSourceConfig.java
new file mode 100644
index 000000000..d13628c82
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSourceConfig.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+public class RedisSourceConfig extends SourceConfig {
+
+  @NotBlank private String host;
+
+  @NotBlank private String port;
+
+  @NotBlank private String serializer = "table";
+
+  private String keysPattern;
+
+  private String sourceTable;
+
+  private String dbNum = "0";
+  private String auth = "password";
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public String getPort() {
+    return port;
+  }
+
+  public void setPort(String port) {
+    this.port = port;
+  }
+
+  public String getKeysPattern() {
+    return keysPattern;
+  }
+
+  public void setKeysPattern(String keysPattern) {
+    this.keysPattern = keysPattern;
+  }
+
+  public String getDbNum() {
+    return dbNum;
+  }
+
+  public void setDbNum(String dbNum) {
+    this.dbNum = dbNum;
+  }
+
+  public String getAuth() {
+    return auth;
+  }
+
+  public void setAuth(String auth) {
+    this.auth = auth;
+  }
+
+  public String getSourceTable() {
+    return sourceTable;
+  }
+
+  public void setSourceTable(String sourceTable) {
+    this.sourceTable = sourceTable;
+  }
+
+  public String getSerializer() {
+    return serializer;
+  }
+
+  public void setSerializer(String serializer) {
+    this.serializer = serializer;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index 8cb7beb4f..9af366430 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -44,6 +44,7 @@ public class PluginUtil {
     classMap.put("managed_jdbc", ManagedJdbcSource.class);
     classMap.put("jdbc", JdbcSource.class);
     classMap.put("file", FileSource.class);
+    classMap.put("redis", RedisSource.class);
     return classMap;
   }
 
@@ -59,6 +60,7 @@ public class PluginUtil {
     classMap.put("jdbc", JdbcSink.class);
     classMap.put("hive", HiveSink.class);
     classMap.put("file", FileSink.class);
+    classMap.put("redis", RedisSink.class);
     return classMap;
   }
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSink.scala
new file mode 100644
index 000000000..be03cac74
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSink.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+class RedisSink extends DataCalcSink[RedisSinkConfig] with Logging {
+
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    var options = Map(
+      "host" -> config.getHost,
+      "port" -> config.getPort,
+      "dbNum" -> config.getDbNum,
+      "auth" -> config.getAuth,
+      "table" -> config.getTargetTable
+    )
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      options = config.getOptions.asScala.toMap ++ options
+    }
+
+    val writer = ds.write.format("org.apache.spark.sql.redis")
+    if (StringUtils.isNotBlank(config.getSaveMode)) {
+      writer.mode(config.getSaveMode)
+    }
+    logger.info(
+      s"Save data to redis host: ${config.getHost}, port: ${config.getPort}, dbNum: ${config.getDbNum}, table: ${config.getTargetTable}"
+    )
+    writer.options(options).save()
+  }
+
+}
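
For a concrete picture of what RedisSink delegates to, here is a minimal spark-redis write sketch, assuming a local Redis at localhost:6379 with password auth; the table name "redistest" and the sample rows are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("redis-sink-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("alice", "30"), ("bob", "25")).toDF("name", "age")

    // spark-redis persists each row as a Redis hash keyed "<table>:<id>"
    df.write
      .format("org.apache.spark.sql.redis")
      .option("host", "localhost")
      .option("port", "6379")
      .option("auth", "password")
      .option("dbNum", "0")
      .option("table", "redistest")
      .mode("overwrite")
      .save()
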
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSource.scala
new file mode 100644
index 000000000..67b3f2e0a
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSource.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+class RedisSource extends DataCalcSource[RedisSourceConfig] with Logging {
+
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("org.apache.spark.sql.redis")
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+
+    logger.info(
+      s"Load data from redis host: ${config.getHost}, port: ${config.getPort}, dbNum: ${config.getDbNum}"
+    )
+
+    config.getSerializer match {
+      case "table" => reader.option("table", config.getSourceTable)
+      case "keysPattern" =>
+        reader.option("keys.pattern", config.getKeysPattern).option("infer.schema", "true")
+    }
+    reader
+      .option("host", config.getHost)
+      .option("port", config.getPort)
+      .option("dbNum", config.getDbNum)
+      .option("auth", config.getAuth)
+      .load()
+  }
+
+}
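
And the matching read path, a sketch under the same assumptions as the write example above: "table" mode relies on the schema spark-redis stored at write time, while the "keysPattern" branch scans matching keys and infers a schema instead.

    // Read back the "redistest" table written by the sink sketch
    val readBack = spark.read
      .format("org.apache.spark.sql.redis")
      .option("host", "localhost")
      .option("port", "6379")
      .option("auth", "password")
      .option("dbNum", "0")
      .option("table", "redistest")
      .load()
    readBack.show()
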
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRedisCalc.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRedisCalc.scala
new file mode 100644
index 000000000..310e8f1ad
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRedisCalc.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestRedisCalc {
+
+  val filePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testRedisWrite: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(redisWriteConfigJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  @Test
+  def testRedisReaderTable: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(redisTableConfigJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  @Test
+  def testRedisReaderKeysPattern: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data =
+        DataCalcGroupData.getData(redisKeysPatternConfigJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  val redisWriteConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "path": "file://{filePath}/etltest.dolphin",
+      |                "serializer": "csv",
+      |                "options": {
+      |                "header":"true",
+      |                "delimiter":";"
+      |                },
+      |                "columnNames": ["name", "age"]
+      |            }
+      |        }
+      |    ],
+      |    "transformations": [
+      |        {
+      |            "name": "sql",
+      |            "type": "transformation",
+      |            "config": {
+      |                "resultTable": "T111",
+      |                "sql": "select * from T1654611700631"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "redis",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "saveMode": "overwrite",
+      |                "host":"localhost",
+      |                "port":"6379",
+      |                "auth":"password",
+      |                "targetTable": "redistest"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+  val redisKeysPatternConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "redis",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "host":"localhost",
+      |                "port":"6379",
+      |                "auth":"password",
+      |                "serializer":"keysPattern",
+      |                "keysPattern": "redistest:*"
+      |            }
+      |        }
+      |    ],
+      |    "transformations": [
+      |        {
+      |            "name": "sql",
+      |            "type": "transformation",
+      |            "config": {
+      |                "resultTable": "T111",
+      |                "sql": "select * from T1654611700631"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "file",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "file://{filePath}/csv",
+      |                "saveMode": "overwrite",
+      |                "serializer": "csv"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+  val redisTableConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "redis",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "host":"localhost",
+      |                "port":"6379",
+      |                "auth":"password",
+      |                "serializer":"table",
+      |                "sourceTable": "redistest"
+      |            }
+      |        }
+      |    ],
+      |    "transformations": [
+      |        {
+      |            "name": "sql",
+      |            "type": "transformation",
+      |            "config": {
+      |                "resultTable": "T111",
+      |                "sql": "select * from T1654611700631"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "file",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "file://{filePath}/csv",
+      |                "saveMode": "overwrite",
+      |                "serializer": "csv"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/pom.xml b/pom.xml
index 7f65cc1b8..1a51fb4c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,7 +156,7 @@
     <reflections.version>0.9.12</reflections.version>
 
     <!-- dbs -->
-    <jedis.version>2.9.2</jedis.version>
+    <jedis.version>3.9.0</jedis.version>
     <mybatis-plus.version>3.4.3.4</mybatis-plus.version>
     <mysql.connector.version>8.0.28</mysql.connector.version>
     <druid.version>1.2.16</druid.version>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 642a4b88f..78da16478 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -267,7 +267,7 @@ jdbi3-core-3.4.0.jar
 jdbi3-sqlobject-3.4.0.jar
 jdo-api-3.0.1.jar
 jdom2-2.0.6.jar
-jedis-2.9.2.jar
+jedis-3.9.0.jar
 jersey-apache-client4-1.19.4.jar
 jersey-client-1.19.4.jar
 jersey-client-2.23.1.jar
@@ -495,6 +495,7 @@ snakeyaml-1.33.jar
 snappy-java-1.1.4.jar
 snappy-java-1.1.7.7.jar
 snappy-java-1.1.8.2.jar
+spark-redis_2.12-2.6.0.jar
 spring-aop-5.2.23.RELEASE.jar
 spring-beans-5.2.23.RELEASE.jar
 spring-boot-2.3.12.RELEASE.jar


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
