This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ba89c44 [fix]Fix the error of missing content returned by schema change response (#433)
8ba89c44 is described below

commit 8ba89c4488bb7b639566b519da58c698f1e13b2f
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Jul 17 17:22:44 2024 +0800

    [fix]Fix the error of missing content returned by schema change response (#433)
---
 .../doris/flink/sink/schema/SchemaChangeManager.java | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index 27f2aece..c946bee7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.sink.schema;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.StringUtils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.codec.binary.Base64;
@@ -198,12 +199,12 @@ public class SchemaChangeManager implements Serializable {
         httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpGet.setEntity(
                 new StringEntity(objectMapper.writeValueAsString(params), 
charsetEncoding));
-        String responseEntity = "";
-        Map<String, Object> responseMap = handleResponse(httpGet, 
responseEntity);
-        return handleSchemaChange(responseMap, responseEntity);
+        String responseEntity = handleResponse(httpGet);
+        return handleSchemaChange(responseEntity);
     }
 
-    private boolean handleSchemaChange(Map<String, Object> responseMap, String 
responseEntity) {
+    private boolean handleSchemaChange(String responseEntity) throws 
JsonProcessingException {
+        Map<String, Object> responseMap = 
objectMapper.readValue(responseEntity, Map.class);
         String code = responseMap.getOrDefault("code", "-1").toString();
         if (code.equals("0")) {
             return true;
@@ -221,9 +222,8 @@ public class SchemaChangeManager implements Serializable {
         }
         LOG.info("Execute SQL: {}", ddl);
         HttpPost httpPost = buildHttpPost(ddl, database);
-        String responseEntity = "";
-        Map<String, Object> responseMap = handleResponse(httpPost, 
responseEntity);
-        return handleSchemaChange(responseMap, responseEntity);
+        String responseEntity = handleResponse(httpPost);
+        return handleSchemaChange(responseEntity);
     }
 
     public HttpPost buildHttpPost(String ddl, String database)
@@ -245,15 +245,13 @@ public class SchemaChangeManager implements Serializable {
         return httpPost;
     }
 
-    private Map<String, Object> handleResponse(HttpUriRequest request, String 
responseEntity) {
+    private String handleResponse(HttpUriRequest request) {
         try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
             CloseableHttpResponse response = httpclient.execute(request);
             final int statusCode = response.getStatusLine().getStatusCode();
             final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
             if (statusCode == 200 && response.getEntity() != null) {
-                responseEntity = EntityUtils.toString(response.getEntity());
-                Map<String, Object> responseMap = 
objectMapper.readValue(responseEntity, Map.class);
-                return responseMap;
+                return EntityUtils.toString(response.getEntity());
             } else {
                 throw new DorisSchemaChangeException(
                         "Failed to schemaChange, status: "


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to