This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 7bf9c4465 spark etl support redis and jedis2.9.2 upgrade 3.9.0 (#4534)
7bf9c4465 is described below
commit 7bf9c4465e0bb38bce575033534f971b508fe0dc
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu May 18 14:13:47 2023 +0800
spark etl support redis and jedis2.9.2 upgrade 3.9.0 (#4534)
---
.../apache/linkis/server/ticket/RedisClient.scala | 4 +-
linkis-engineconn-plugins/spark/pom.xml | 11 ++
.../spark/datacalc/sink/RedisSinkConfig.java | 90 ++++++++++
.../spark/datacalc/source/RedisSourceConfig.java | 94 ++++++++++
.../spark/datacalc/util/PluginUtil.java | 2 +
.../spark/datacalc/sink/RedisSink.scala | 53 ++++++
.../spark/datacalc/source/RedisSource.scala | 50 ++++++
.../spark/datacalc/TestRedisCalc.scala | 200 +++++++++++++++++++++
pom.xml | 2 +-
tool/dependencies/known-dependencies.txt | 3 +-
10 files changed, 505 insertions(+), 4 deletions(-)
diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala
index 3ec7495f1..8f09139e0 100644
--- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala
+++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/ticket/RedisClient.scala
@@ -98,8 +98,8 @@ object RedisClient {
maxIdle: Int,
minIdle: Int,
maxWaitMillis: Long
- ): GenericObjectPoolConfig[Nothing] = {
- val poolConfig = new GenericObjectPoolConfig
+ ): GenericObjectPoolConfig[Jedis] = {
+ val poolConfig = new GenericObjectPoolConfig[Jedis]()
poolConfig.setMaxTotal(maxTotal)
poolConfig.setMaxIdle(maxIdle)
poolConfig.setMinIdle(minIdle)
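
For readers unfamiliar with the Jedis 3.x API: GenericObjectPoolConfig is generic over the pooled resource type, which is why the pool config above is now typed as GenericObjectPoolConfig[Jedis]. A minimal, illustrative Scala sketch of building such a pool follows; the connection values are placeholders, not taken from this commit.

    import org.apache.commons.pool2.impl.GenericObjectPoolConfig
    import redis.clients.jedis.{Jedis, JedisPool}

    // Typed pool config, mirroring the updated RedisClient code above.
    val poolConfig = new GenericObjectPoolConfig[Jedis]()
    poolConfig.setMaxTotal(100)
    poolConfig.setMaxIdle(10)
    poolConfig.setMinIdle(1)
    poolConfig.setMaxWaitMillis(10000L)

    // Standard Jedis constructor: (poolConfig, host, port, timeoutMs, password).
    // Host, port, timeout and password here are placeholder values.
    val pool = new JedisPool(poolConfig, "localhost", 6379, 2000, "password")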
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index 4897a0c1b..f28b036e2 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -449,6 +449,17 @@
<version>${spark.hadoop.version}</version>
<scope>${spark.hadoop.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>com.redislabs</groupId>
+ <artifactId>spark-redis_${scala.binary.version}</artifactId>
+ <version>2.6.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSinkConfig.java
new file mode 100644
index 000000000..cb87b9080
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSinkConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+public class RedisSinkConfig extends SinkConfig {
+
+ @NotBlank private String host;
+
+ @NotBlank private String port;
+
+ private String dbNum = "0";
+ private String auth = "password";
+
+ @NotBlank private String targetTable;
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+ message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+ private String saveMode = "overwrite";
+
+ public String getSaveMode() {
+ return saveMode;
+ }
+
+ public void setSaveMode(String saveMode) {
+ this.saveMode = saveMode;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getPort() {
+ return port;
+ }
+
+ public void setPort(String port) {
+ this.port = port;
+ }
+
+ public String getDbNum() {
+ return dbNum;
+ }
+
+ public void setDbNum(String dbNum) {
+ this.dbNum = dbNum;
+ }
+
+ public String getTargetTable() {
+ return targetTable;
+ }
+
+ public void setTargetTable(String targetTable) {
+ this.targetTable = targetTable;
+ }
+
+ public String getAuth() {
+ return auth;
+ }
+
+ public void setAuth(String auth) {
+ this.auth = auth;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSourceConfig.java
new file mode 100644
index 000000000..d13628c82
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSourceConfig.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+public class RedisSourceConfig extends SourceConfig {
+
+ @NotBlank private String host;
+
+ @NotBlank private String port;
+
+ @NotBlank private String serializer = "table";
+
+ private String keysPattern;
+
+ private String sourceTable;
+
+ private String dbNum = "0";
+ private String auth = "password";
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getPort() {
+ return port;
+ }
+
+ public void setPort(String port) {
+ this.port = port;
+ }
+
+ public String getKeysPattern() {
+ return keysPattern;
+ }
+
+ public void setKeysPattern(String keysPattern) {
+ this.keysPattern = keysPattern;
+ }
+
+ public String getDbNum() {
+ return dbNum;
+ }
+
+ public void setDbNum(String dbNum) {
+ this.dbNum = dbNum;
+ }
+
+ public String getAuth() {
+ return auth;
+ }
+
+ public void setAuth(String auth) {
+ this.auth = auth;
+ }
+
+ public String getSourceTable() {
+ return sourceTable;
+ }
+
+ public void setSourceTable(String sourceTable) {
+ this.sourceTable = sourceTable;
+ }
+
+ public String getSerializer() {
+ return serializer;
+ }
+
+ public void setSerializer(String serializer) {
+ this.serializer = serializer;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index 8cb7beb4f..9af366430 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -44,6 +44,7 @@ public class PluginUtil {
classMap.put("managed_jdbc", ManagedJdbcSource.class);
classMap.put("jdbc", JdbcSource.class);
classMap.put("file", FileSource.class);
+ classMap.put("redis", RedisSource.class);
return classMap;
}
@@ -59,6 +60,7 @@ public class PluginUtil {
classMap.put("jdbc", JdbcSink.class);
classMap.put("hive", HiveSink.class);
classMap.put("file", FileSink.class);
+ classMap.put("redis", RedisSink.class);
return classMap;
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSink.scala
new file mode 100644
index 000000000..be03cac74
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/RedisSink.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+class RedisSink extends DataCalcSink[RedisSinkConfig] with Logging {
+
+ def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+ var options = Map(
+ "host" -> config.getHost,
+ "port" -> config.getPort,
+ "dbNum" -> config.getDbNum,
+ "auth" -> config.getAuth,
+ "table" -> config.getTargetTable
+ )
+
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ options = config.getOptions.asScala.toMap ++ options
+ }
+
+ val writer = ds.write.format("org.apache.spark.sql.redis")
+ if (StringUtils.isNotBlank(config.getSaveMode)) {
+ writer.mode(config.getSaveMode)
+ }
+ logger.info(
+      s"Save data to redis host: ${config.getHost}, port: ${config.getPort}, dbNum: ${config.getDbNum}, table: ${config.getTargetTable}"
+ )
+ writer.options(options).save()
+ }
+
+}
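
The sink above simply delegates to the spark-redis DataFrame writer. A hedged, standalone sketch of the same write path follows; the connection values and sample rows are placeholders, and "redistest" matches the table name used in the tests later in this commit.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("redis-sink-demo").master("local[*]").getOrCreate()
    // Sample rows for illustration only.
    val ds = spark.createDataFrame(Seq(("tom", 12), ("jerry", 9))).toDF("name", "age")

    ds.write
      .format("org.apache.spark.sql.redis")
      .mode("overwrite")             // saveMode from RedisSinkConfig
      .option("host", "localhost")   // placeholder connection details
      .option("port", "6379")
      .option("dbNum", "0")
      .option("auth", "password")
      .option("table", "redistest")  // targetTable from RedisSinkConfig
      .save()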
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSource.scala
new file mode 100644
index 000000000..67b3f2e0a
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/RedisSource.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+class RedisSource extends DataCalcSource[RedisSourceConfig] with Logging {
+
+ override def getData(spark: SparkSession): Dataset[Row] = {
+ val reader = spark.read.format("org.apache.spark.sql.redis")
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ reader.options(config.getOptions)
+ }
+
+ logger.info(
+      s"Load data from redis host: ${config.getHost}, port: ${config.getPort}, dbNum: ${config.getDbNum}"
+ )
+
+ config.getSerializer match {
+ case "table" => reader.option("table", config.getSourceTable)
+ case "keysPattern" =>
+        reader.option("keys.pattern", config.getKeysPattern).option("infer.schema", "true")
+ }
+ reader
+ .option("host", config.getHost)
+ .option("port", config.getPort)
+ .option("dbNum", config.getDbNum)
+ .option("auth", config.getAuth)
+ .load()
+ }
+
+}
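
RedisSource is the mirror image of the sink: depending on the configured serializer it reads either a whole table or keys matching a pattern. A hedged sketch using the same spark-redis options the code above passes through; connection values are placeholders.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("redis-source-demo").master("local[*]").getOrCreate()

    // serializer = "table": read rows previously written under a table name.
    val byTable = spark.read
      .format("org.apache.spark.sql.redis")
      .option("host", "localhost")
      .option("port", "6379")
      .option("dbNum", "0")
      .option("auth", "password")
      .option("table", "redistest")
      .load()

    // serializer = "keysPattern": read keys matching a pattern and infer the schema.
    val byPattern = spark.read
      .format("org.apache.spark.sql.redis")
      .option("host", "localhost")
      .option("port", "6379")
      .option("keys.pattern", "redistest:*")
      .option("infer.schema", "true")
      .load()

    byTable.show()
    byPattern.show()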
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRedisCalc.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRedisCalc.scala
new file mode 100644
index 000000000..310e8f1ad
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRedisCalc.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test};
+
+class TestRedisCalc {
+
+ val filePath = this.getClass.getResource("/").getFile
+
+ @Test
+ def testRedisWrite: Unit = {
+ // skip os: windows
+ if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(redisWriteConfigJson.replace("{filePath}", filePath))
+ Assertions.assertTrue(data != null)
+
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
+ }
+ }
+
+ @Test
+ def testRedisReaderTable: Unit = {
+ // skip os: windows
+ if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(redisTableConfigJson.replace("{filePath}", filePath))
+ Assertions.assertTrue(data != null)
+
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
+ }
+ }
+
+ @Test
+ def testRedisReaderKeysPattern: Unit = {
+ // skip os: windows
+ if (!FsPath.WINDOWS) {
+      val data =
+        DataCalcGroupData.getData(redisKeysPatternConfigJson.replace("{filePath}", filePath))
+ Assertions.assertTrue(data != null)
+
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
+ }
+ }
+
+ val redisWriteConfigJson =
+ """
+ |{
+ | "sources": [
+ | {
+ | "name": "file",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "path": "file://{filePath}/etltest.dolphin",
+ | "serializer": "csv",
+ | "options": {
+ | "header":"true",
+ | "delimiter":";"
+ | },
+ | "columnNames": ["name", "age"]
+ | }
+ | }
+ | ],
+ | "transformations": [
+ | {
+ | "name": "sql",
+ | "type": "transformation",
+ | "config": {
+ | "resultTable": "T111",
+ | "sql": "select * from T1654611700631"
+ | }
+ | }
+ | ],
+ | "sinks": [
+ | {
+ | "name": "redis",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "saveMode": "overwrite",
+ | "host":"localhost",
+ | "port":"6379",
+ | "auth":"password",
+ | "targetTable": "redistest"
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+ val redisKeysPatternConfigJson =
+ """
+ |{
+ | "sources": [
+ | {
+ | "name": "redis",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "host":"localhost",
+ | "port":"6379",
+ | "auth":"password",
+ | "serializer":"keysPattern",
+ | "keysPattern": "redistest:*"
+ | }
+ | }
+ | ],
+ | "transformations": [
+ | {
+ | "name": "sql",
+ | "type": "transformation",
+ | "config": {
+ | "resultTable": "T111",
+ | "sql": "select * from T1654611700631"
+ | }
+ | }
+ | ],
+ | "sinks": [
+ | {
+ | "name": "file",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+    |        "path": "file://{filePath}/csv",
+ | "saveMode": "overwrite",
+ | "serializer": "csv"
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+ val redisTableConfigJson =
+ """
+ |{
+ | "sources": [
+ | {
+ | "name": "redis",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "host":"localhost",
+ | "port":"6379",
+ | "auth":"password",
+ | "serializer":"table",
+ | "sourceTable": "redistest"
+ | }
+ | }
+ | ],
+ | "transformations": [
+ | {
+ | "name": "sql",
+ | "type": "transformation",
+ | "config": {
+ | "resultTable": "T111",
+ | "sql": "select * from T1654611700631"
+ | }
+ | }
+ | ],
+ | "sinks": [
+ | {
+ | "name": "file",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "path": "file://{filePath}/csv",
+ | "saveMode": "overwrite",
+ | "serializer": "csv"
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+}
diff --git a/pom.xml b/pom.xml
index 7f65cc1b8..1a51fb4c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,7 +156,7 @@
<reflections.version>0.9.12</reflections.version>
<!-- dbs -->
- <jedis.version>2.9.2</jedis.version>
+ <jedis.version>3.9.0</jedis.version>
<mybatis-plus.version>3.4.3.4</mybatis-plus.version>
<mysql.connector.version>8.0.28</mysql.connector.version>
<druid.version>1.2.16</druid.version>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 642a4b88f..78da16478 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -267,7 +267,7 @@ jdbi3-core-3.4.0.jar
jdbi3-sqlobject-3.4.0.jar
jdo-api-3.0.1.jar
jdom2-2.0.6.jar
-jedis-2.9.2.jar
+jedis-3.9.0.jar
jersey-apache-client4-1.19.4.jar
jersey-client-1.19.4.jar
jersey-client-2.23.1.jar
@@ -495,6 +495,7 @@ snakeyaml-1.33.jar
snappy-java-1.1.4.jar
snappy-java-1.1.7.7.jar
snappy-java-1.1.8.2.jar
+spark-redis_2.12-2.6.0.jar
spring-aop-5.2.23.RELEASE.jar
spring-beans-5.2.23.RELEASE.jar
spring-boot-2.3.12.RELEASE.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]