This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8ba89c44 [fix]Fix the error of missing content returned by schema change response (#433) 8ba89c44 is described below commit 8ba89c4488bb7b639566b519da58c698f1e13b2f Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Jul 17 17:22:44 2024 +0800 [fix]Fix the error of missing content returned by schema change response (#433) --- .../doris/flink/sink/schema/SchemaChangeManager.java | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 27f2aece..c946bee7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.sink.schema; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.StringUtils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Base64; @@ -198,12 +199,12 @@ public class SchemaChangeManager implements Serializable { httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpGet.setEntity( new StringEntity(objectMapper.writeValueAsString(params), charsetEncoding)); - String responseEntity = ""; - Map<String, Object> responseMap = handleResponse(httpGet, responseEntity); - return handleSchemaChange(responseMap, responseEntity); + String responseEntity = handleResponse(httpGet); + return handleSchemaChange(responseEntity); } - private boolean handleSchemaChange(Map<String, Object> responseMap, String responseEntity) { + private boolean handleSchemaChange(String responseEntity) throws JsonProcessingException { + Map<String, Object> responseMap = objectMapper.readValue(responseEntity, Map.class); String code = responseMap.getOrDefault("code", "-1").toString(); if (code.equals("0")) { return true; @@ -221,9 +222,8 @@ public class SchemaChangeManager implements Serializable { } LOG.info("Execute SQL: {}", ddl); HttpPost httpPost = buildHttpPost(ddl, database); - String responseEntity = ""; - Map<String, Object> responseMap = handleResponse(httpPost, responseEntity); - return handleSchemaChange(responseMap, responseEntity); + String responseEntity = handleResponse(httpPost); + return handleSchemaChange(responseEntity); } public HttpPost buildHttpPost(String ddl, String database) @@ -245,15 +245,13 @@ public class SchemaChangeManager implements Serializable { return httpPost; } - private Map<String, Object> handleResponse(HttpUriRequest request, String responseEntity) { + private String handleResponse(HttpUriRequest request) { try (CloseableHttpClient httpclient = HttpClients.createDefault()) { CloseableHttpResponse response = httpclient.execute(request); final int statusCode = response.getStatusLine().getStatusCode(); final String reasonPhrase = response.getStatusLine().getReasonPhrase(); if (statusCode == 200 && response.getEntity() != null) { - responseEntity = EntityUtils.toString(response.getEntity()); - Map<String, Object> responseMap = objectMapper.readValue(responseEntity, Map.class); - return responseMap; + return EntityUtils.toString(response.getEntity()); } else { throw new DorisSchemaChangeException( "Failed to schemaChange, status: " --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org