This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e6c51a95c7 [Improve][Connector-V2]Support multi-table sink feature for httpsink (#6316)
e6c51a95c7 is described below
commit e6c51a95c77542b051af4ac2caaf8cff67213666
Author: lizhenglei <[email protected]>
AuthorDate: Sun Feb 18 11:15:24 2024 +0800
[Improve][Connector-V2]Support multi-table sink feature for httpsink (#6316)
---
.../connectors/seatunnel/http/sink/HttpSink.java | 4 +-
.../seatunnel/http/sink/HttpSinkWriter.java | 4 +-
.../seatunnel/e2e/connector/http/HttpIT.java | 16 ++++
.../src/test/resources/fake_to_multitable.conf | 88 ++++++++++++++++++++++
.../src/test/resources/mockserver-config.json | 24 ++++++
5 files changed, 134 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 1cf22b0164..da1cb0a8da 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,7 +37,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
-public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
protected final HttpParameter httpParameter = new HttpParameter();
protected SeaTunnelRowType seaTunnelRowType;
protected Config pluginConfig;
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index dc1790733d..0333b8f37a 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.sink;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
@@ -32,7 +33,8 @@ import java.io.IOException;
import java.util.Objects;
@Slf4j
-public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
protected final HttpClientProvider httpClient;
protected final SeaTunnelRowType seaTunnelRowType;
protected final HttpParameter httpParameter;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 405bc5157f..9dc38cbd1c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.e2e.connector.http;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -44,6 +46,8 @@ public class HttpIT extends TestSuiteBase implements TestResource {
private static final String TMP_DIR = "/tmp";
+ private static final String successCount = "Total Write Count : 2";
+
private static final String IMAGE = "mockserver/mockserver:5.14.0";
private GenericContainer<?> mockserverContainer;
@@ -162,6 +166,18 @@ public class HttpIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(0, execResult18.getExitCode());
}
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK/FLINK do not support multiple table read")
+ @TestTemplate
+ public void testMultiTableHttp(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/fake_to_multitable.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertTrue(execResult.getStdout().contains(successCount));
+ }
+
public String getMockServerConfig() {
return "/mockserver-config.json";
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf
new file mode 100644
index 0000000000..7ed2ea8a59
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "http_sink_1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "http_sink_2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+
+
+sink {
+ Http {
+ url = "http://mockserver:1080/example/httpMultiTableContentSink"
+ headers {
+ token = "9e32e859ef044462a257e1fc76730066"
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 4ce23c4acb..42d000f713 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4720,5 +4720,29 @@
"Content-Type": "application/json"
}
}
+ },
+ {
+ "httpRequest": {
+ "path": "/example/httpMultiTableContentSink",
+ "method": "POST",
+ "headers": {
+ "token": ["9e32e859ef044462a257e1fc76730066"]
+ }
+ },
+ "httpResponse": {
+ "body": [
+ {
+ "name": "httpMultiTableContentSink",
+ "age": 18
+ },
+ {
+ "name": "pizz2",
+ "age": 19
+ }
+ ],
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
}
]