This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e4e8f7fbf [Improve][Connector-V2] Format SeaTunnelRow use 
seatunnel-format-json (#2435)
e4e8f7fbf is described below

commit e4e8f7fbff51a1159ca5bd37be0666961af61d68
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Aug 17 09:38:47 2022 +0800

    [Improve][Connector-V2] Format SeaTunnelRow use seatunnel-format-json 
(#2435)
---
 .../connector-http/connector-http-base/pom.xml            |  6 ++++++
 .../connectors/seatunnel/http/sink/HttpSinkWriter.java    | 15 ++++++---------
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml 
b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index 521cb2bfa..2a062f596 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -36,6 +36,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index de02bcd9d..e68a1c471 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -17,19 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.sink;
 
+import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Objects;
 
 public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -37,21 +37,18 @@ public class HttpSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
     protected final HttpClientProvider httpClient = 
HttpClientProvider.getInstance();
     protected final SeaTunnelRowType seaTunnelRowType;
     protected final HttpParameter httpParameter;
+    protected final SerializationSchema serializationSchema;
 
     public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter 
httpParameter) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.httpParameter = httpParameter;
+        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        HashMap<Object, Object> objectMap = new HashMap<>();
-        int totalFields = seaTunnelRowType.getTotalFields();
-        for (int i = 0; i < totalFields; i++) {
-            objectMap.put(seaTunnelRowType.getFieldName(i), 
element.getField(i));
-        }
-        String body = objectMapper.writeValueAsString(objectMap);
+        byte[] serialize = serializationSchema.serialize(element);
+        String body = new String(serialize);
         try {
             // only support post web hook
             HttpResponse response = httpClient.doPost(httpParameter.getUrl(), 
httpParameter.getHeaders(), body);

Reply via email to