TyrantLucifer commented on code in PR #3631: URL: https://github.com/apache/incubator-seatunnel/pull/3631#discussion_r1039109371
########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.client; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +@AllArgsConstructor +@Getter +@Setter Review Comment: Use @Data ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; + +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class DorisSinkManager { + + private final SinkConfig sinkConfig; + private final List<byte[]> batchList; + + private DorisStreamLoadVisitor dorisStreamLoadVisitor; + private ScheduledExecutorService scheduler; + private ScheduledFuture<?> scheduledFuture; + private volatile boolean initialize; + private volatile Exception flushException; + private int batchRowCount = 0; + private long batchBytesSize = 0; + + private Integer batchIntervalMs; Review Comment: ```suggestion private final Integer batchIntervalMs; ``` ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.util.DelimiterParserUtil; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Slf4j +public class DorisStreamLoadVisitor { + private final HttpHelper httpHelper = new HttpHelper(); + private static final int MAX_SLEEP_TIME = 5; + + private final SinkConfig sinkConfig; + private long pos; + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_SUCCESS = "Success"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + private static final String 
LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + + private List<String> fieldNames; + + public DorisStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) { + this.sinkConfig = sinkConfig; + this.fieldNames = fieldNames; + } + + public Boolean doStreamLoad(DorisFlushTuple flushData) throws IOException { + String host = getAvailableHost(); + if (null == host) { + throw new DorisConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "None of the host in `load_url` could be connected."); + } + String loadUrl = String.format("%s/api/%s/%s/_stream_load", host, sinkConfig.getDatabase(), sinkConfig.getTable()); + if (log.isDebugEnabled()) { + log.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + } + Map<String, Object> loadResult = httpHelper.doHttpPut(loadUrl, joinRows(flushData.getRows(), flushData.getBytes().intValue()), getStreamLoadHttpHeader(flushData.getLabel())); + final String keyStatus = "Status"; + if (null == loadResult || !loadResult.containsKey(keyStatus)) { + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Unable to flush data to Doris: unknown result status. " + loadResult); + } + if (log.isDebugEnabled()) { + log.debug(new StringBuilder("StreamLoad response:\n").append(JsonUtils.toJsonString(loadResult)).toString()); Review Comment: String.format ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.util.DelimiterParserUtil; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Slf4j +public class DorisStreamLoadVisitor { + private final HttpHelper httpHelper = new HttpHelper(); + private static final int MAX_SLEEP_TIME = 5; + + private final SinkConfig sinkConfig; + private long pos; + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_SUCCESS = "Success"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private 
static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + + private List<String> fieldNames; + + public DorisStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) { + this.sinkConfig = sinkConfig; + this.fieldNames = fieldNames; + } + + public Boolean doStreamLoad(DorisFlushTuple flushData) throws IOException { + String host = getAvailableHost(); + if (null == host) { + throw new DorisConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "None of the host in `load_url` could be connected."); + } + String loadUrl = String.format("%s/api/%s/%s/_stream_load", host, sinkConfig.getDatabase(), sinkConfig.getTable()); + if (log.isDebugEnabled()) { + log.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + } + Map<String, Object> loadResult = httpHelper.doHttpPut(loadUrl, joinRows(flushData.getRows(), flushData.getBytes().intValue()), getStreamLoadHttpHeader(flushData.getLabel())); + final String keyStatus = "Status"; + if (null == loadResult || !loadResult.containsKey(keyStatus)) { + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Unable to flush data to Doris: unknown result status. " + loadResult); + } + if (log.isDebugEnabled()) { + log.debug(new StringBuilder("StreamLoad response:\n").append(JsonUtils.toJsonString(loadResult)).toString()); + } + if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { + StringBuilder errorBuilder = new StringBuilder("Failed to flush data to Doris.\n"); + if (loadResult.containsKey("Message")) { + errorBuilder.append(loadResult.get("Message")); + errorBuilder.append('\n'); + } + if (loadResult.containsKey("ErrorURL")) { + try { + errorBuilder.append(httpHelper.doHttpGet(loadResult.get("ErrorURL").toString())); + errorBuilder.append('\n'); + } catch (IOException e) { + log.warn("Get Error URL failed. 
{} ", loadResult.get("ErrorURL"), e); + } + } else { + errorBuilder.append(JsonUtils.toJsonString(loadResult)); + errorBuilder.append('\n'); + } + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, errorBuilder.toString()); + } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { + log.debug(new StringBuilder("StreamLoad response:\n").append(JsonUtils.toJsonString(loadResult)).toString()); Review Comment: String.format ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.utils.JsonUtils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class HttpHelper { + private static final int DEFAULT_CONNECT_TIMEOUT = 1000000; + + public HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != code) { + log.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + public String doHttpGet(String getUrl) throws IOException { + log.info("Executing GET from {}.", getUrl); + try (CloseableHttpClient httpclient = buildHttpClient()) { + HttpGet httpGet = new HttpGet(getUrl); + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return EntityUtils.toString(respEntity); + } + } + } + + public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header) throws IOException { + log.info("Executing GET from {}.", getUrl); 
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(getUrl); + if (null != header) { + for (Map.Entry<String, String> entry : header.entrySet()) { + httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); + } + } + } + + @SuppressWarnings("unchecked") + public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header) throws IOException { + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + try (CloseableHttpClient httpclient = httpClientBuilder.build()) { + HttpPut httpPut = new HttpPut(url); + if (null != header) { + for (Map.Entry<String, String> entry : header.entrySet()) { + httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + httpPut.setEntity(new ByteArrayEntity(data)); + httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { + int code = resp.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != code) { + String errorText; + try { + HttpEntity respEntity = resp.getEntity(); + errorText = EntityUtils.toString(respEntity); + } catch (Exception err) { + errorText = "find errorText failed: " + err.getMessage(); + } + log.warn("Request failed with code:{}, err:{}", code, errorText); + Map<String, Object> errorMap = new HashMap<>(); + errorMap.put("Status", "Fail"); + errorMap.put("Message", errorText); + return errorMap; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + 
log.warn("Request failed with empty response."); + return null; + } + return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); + } + } + } + + private CloseableHttpClient buildHttpClient() { + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + return httpClientBuilder.build(); + } + + public boolean tryHttpConnection(String host) { + try { + URL url = new URL(host); + HttpURLConnection co = (HttpURLConnection) url.openConnection(); + co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); + co.connect(); + co.disconnect(); + return true; + } catch (Exception e1) { + log.warn("Failed to connect to address:{}", host, e1); Review Comment: The same as above. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; + +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class DorisSinkManager { + + private final SinkConfig sinkConfig; + private final List<byte[]> batchList; + + private DorisStreamLoadVisitor dorisStreamLoadVisitor; + private ScheduledExecutorService scheduler; + private ScheduledFuture<?> scheduledFuture; + private volatile boolean initialize; + private volatile Exception flushException; + private int batchRowCount = 0; + private long batchBytesSize = 0; + + private Integer batchIntervalMs; + + public DorisSinkManager(SinkConfig sinkConfig, List<String> fileNames) { + this.sinkConfig = sinkConfig; + this.batchList = new ArrayList<>(); + this.batchIntervalMs = sinkConfig.getBatchIntervalMs(); + dorisStreamLoadVisitor = new DorisStreamLoadVisitor(sinkConfig, fileNames); + } + + private void tryInit() throws IOException { + if (initialize) { + return; + } + initialize = true; + + if (batchIntervalMs != null) { Review Comment: What is the purpose of this check? I think batchIntervalMs is always non-null. If this parameter is null, the program should throw an exception that tells the user to configure it.
########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; + +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class DorisSinkManager { + + private final SinkConfig sinkConfig; + private final List<byte[]> batchList; + + private DorisStreamLoadVisitor dorisStreamLoadVisitor; Review Comment: 
```suggestion private final DorisStreamLoadVisitor dorisStreamLoadVisitor; ``` ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.utils.JsonUtils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class HttpHelper { + private static final int DEFAULT_CONNECT_TIMEOUT = 1000000; + + public HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != code) { + log.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + public String doHttpGet(String getUrl) throws IOException { + log.info("Executing GET from {}.", getUrl); + try (CloseableHttpClient httpclient = buildHttpClient()) { + HttpGet httpGet = new HttpGet(getUrl); + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return EntityUtils.toString(respEntity); + } + } + } + + public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header) throws IOException { + log.info("Executing GET from {}.", getUrl); 
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(getUrl); + if (null != header) { + for (Map.Entry<String, String> entry : header.entrySet()) { + httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; Review Comment: Should this throw an exception here? I think that if the request fails, the whole business process should be stopped. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; + +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class DorisSinkManager { + + private final SinkConfig sinkConfig; + private final List<byte[]> batchList; + + private DorisStreamLoadVisitor dorisStreamLoadVisitor; + private ScheduledExecutorService scheduler; + private ScheduledFuture<?> scheduledFuture; + private volatile boolean initialize; + private volatile Exception flushException; + private int batchRowCount = 0; + private long batchBytesSize = 0; + + private Integer batchIntervalMs; + + public DorisSinkManager(SinkConfig sinkConfig, List<String> fileNames) { + this.sinkConfig = sinkConfig; + this.batchList = new ArrayList<>(); + this.batchIntervalMs = sinkConfig.getBatchIntervalMs(); + dorisStreamLoadVisitor = new DorisStreamLoadVisitor(sinkConfig, fileNames); + } + + private void tryInit() throws IOException { + if (initialize) { + return; + } + initialize = true; + + if (batchIntervalMs != null) { + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Doris-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch (IOException e) { + flushException = e; + } + }, + 
batchIntervalMs, + batchIntervalMs, + TimeUnit.MILLISECONDS); + } + } + + public synchronized void write(String record) throws IOException { + tryInit(); + checkFlushException(); + byte[] bts = record.getBytes(StandardCharsets.UTF_8); + batchList.add(bts); + batchRowCount++; + batchBytesSize += bts.length; + if (batchRowCount >= sinkConfig.getBatchMaxSize() || batchBytesSize >= sinkConfig.getBatchMaxBytes()) { + flush(); + } + } + + public synchronized void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + + flush(); + } + + public synchronized void flush() throws IOException { + checkFlushException(); + if (batchList.isEmpty()) { + return; + } + String label = createBatchLabel(); + DorisFlushTuple tuple = new DorisFlushTuple(label, batchBytesSize, new ArrayList<>(batchList)); + for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) { + try { + Boolean successFlag = dorisStreamLoadVisitor.doStreamLoad(tuple); + if (successFlag) { + break; + } + } catch (Exception e) { + log.warn("Writing records to Doris failed, retry times = {}", i, e); + if (i >= sinkConfig.getMaxRetries()) { + throw new DorisConnectorException(DorisConnectorErrorCode.WRITE_RECORDS_FAILED, "The number of retries was exceeded,writing records to Doris failed.", e); + } + + if (e instanceof DorisConnectorException && ((DorisConnectorException) e).needReCreateLabel()) { + String newLabel = createBatchLabel(); + log.warn(String.format("Batch label changed from [%s] to [%s]", tuple.getLabel(), newLabel)); + tuple.setLabel(newLabel); + } + + try { + long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i, + sinkConfig.getMaxRetryBackoffMs()); + Thread.sleep(backoff); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, + "Unable to flush, interrupted while doing another attempt.", e); + } + } + } + batchList.clear(); + 
batchRowCount = 0; + batchBytesSize = 0; + } + + private void checkFlushException() { + if (flushException != null) { + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, flushException); + } + } + + public String createBatchLabel() { + StringBuilder sb = new StringBuilder(); Review Comment: Use `String.format` instead of it. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; + +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class DorisSinkManager { + + private final SinkConfig sinkConfig; + private final List<byte[]> batchList; + + private DorisStreamLoadVisitor dorisStreamLoadVisitor; + private ScheduledExecutorService scheduler; + private ScheduledFuture<?> scheduledFuture; + private volatile boolean initialize; + private volatile Exception flushException; + private int batchRowCount = 0; + private long batchBytesSize = 0; + + private Integer batchIntervalMs; + + public DorisSinkManager(SinkConfig sinkConfig, List<String> fileNames) { + this.sinkConfig = sinkConfig; + this.batchList = new ArrayList<>(); + this.batchIntervalMs = sinkConfig.getBatchIntervalMs(); + dorisStreamLoadVisitor = new DorisStreamLoadVisitor(sinkConfig, fileNames); + } + + private void tryInit() throws IOException { + if (initialize) { + return; + } + initialize = true; + + if (batchIntervalMs != null) { + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Doris-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch (IOException e) { + flushException = e; + } + }, + 
batchIntervalMs, + batchIntervalMs, + TimeUnit.MILLISECONDS); + } + } + + public synchronized void write(String record) throws IOException { + tryInit(); + checkFlushException(); + byte[] bts = record.getBytes(StandardCharsets.UTF_8); + batchList.add(bts); + batchRowCount++; + batchBytesSize += bts.length; + if (batchRowCount >= sinkConfig.getBatchMaxSize() || batchBytesSize >= sinkConfig.getBatchMaxBytes()) { + flush(); + } + } + + public synchronized void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + + flush(); + } + + public synchronized void flush() throws IOException { + checkFlushException(); + if (batchList.isEmpty()) { + return; + } + String label = createBatchLabel(); + DorisFlushTuple tuple = new DorisFlushTuple(label, batchBytesSize, new ArrayList<>(batchList)); Review Comment: new ArrayList<>(batchList) what's the meaning of it? It can cause memory waste. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.doris.config.SinkConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.util.DelimiterParserUtil; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Slf4j +public class DorisStreamLoadVisitor { + private final HttpHelper httpHelper = new HttpHelper(); + private static final int MAX_SLEEP_TIME = 5; + + private final SinkConfig sinkConfig; + private long pos; + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_SUCCESS = "Success"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + + private List<String> fieldNames; + + public DorisStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) { + this.sinkConfig = sinkConfig; + this.fieldNames = fieldNames; + } + + public Boolean doStreamLoad(DorisFlushTuple flushData) throws IOException { + String host = getAvailableHost(); + if (null == host) { + throw new DorisConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "None of the host in `load_url` could be connected."); + } + String loadUrl = String.format("%s/api/%s/%s/_stream_load", host, 
sinkConfig.getDatabase(), sinkConfig.getTable()); + if (log.isDebugEnabled()) { + log.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + } + Map<String, Object> loadResult = httpHelper.doHttpPut(loadUrl, joinRows(flushData.getRows(), flushData.getBytes().intValue()), getStreamLoadHttpHeader(flushData.getLabel())); + final String keyStatus = "Status"; + if (null == loadResult || !loadResult.containsKey(keyStatus)) { + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Unable to flush data to Doris: unknown result status. " + loadResult); + } + if (log.isDebugEnabled()) { + log.debug(new StringBuilder("StreamLoad response:\n").append(JsonUtils.toJsonString(loadResult)).toString()); + } + if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { + StringBuilder errorBuilder = new StringBuilder("Failed to flush data to Doris.\n"); + if (loadResult.containsKey("Message")) { + errorBuilder.append(loadResult.get("Message")); + errorBuilder.append('\n'); + } + if (loadResult.containsKey("ErrorURL")) { + try { + errorBuilder.append(httpHelper.doHttpGet(loadResult.get("ErrorURL").toString())); + errorBuilder.append('\n'); + } catch (IOException e) { + log.warn("Get Error URL failed. 
{} ", loadResult.get("ErrorURL"), e); + } + } else { + errorBuilder.append(JsonUtils.toJsonString(loadResult)); + errorBuilder.append('\n'); + } + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, errorBuilder.toString()); + } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { + log.debug(new StringBuilder("StreamLoad response:\n").append(JsonUtils.toJsonString(loadResult)).toString()); + // has to block-checking the state to get the final result + checkLabelState(host, flushData.getLabel()); + } + return RESULT_SUCCESS.equals(loadResult.get(keyStatus)); + } + + private String getAvailableHost() { + List<String> hostList = sinkConfig.getNodeUrls(); + long tmp = pos + hostList.size(); + for (; pos < tmp; pos++) { + String host = String.format("http://%s", hostList.get((int) (pos % hostList.size()))); + if (httpHelper.tryHttpConnection(host)) { + return host; + } + } + return null; + } + + private byte[] joinRows(List<byte[]> rows, int totalBytes) { + if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { + Map<String, Object> props = sinkConfig.getStreamLoadProps(); + byte[] lineDelimiter = DelimiterParserUtil.parse((String) props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); + for (byte[] row : rows) { + bos.put(row); + bos.put(lineDelimiter); + } + return bos.array(); + } + + if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) { + ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 
2 : rows.size() + 1)); + bos.put("[".getBytes(StandardCharsets.UTF_8)); + byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); + boolean isFirstElement = true; + for (byte[] row : rows) { + if (!isFirstElement) { + bos.put(jsonDelimiter); + } + bos.put(row); + isFirstElement = false; + } + bos.put("]".getBytes(StandardCharsets.UTF_8)); + return bos.array(); + } + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Failed to join rows data, unsupported `format` from stream load properties:"); + } + + @SuppressWarnings("unchecked") + private void checkLabelState(String host, String label) throws IOException { + int idx = 0; + while (true) { + try { + TimeUnit.SECONDS.sleep(Math.min(++idx, MAX_SLEEP_TIME)); + } catch (InterruptedException ex) { + break; + } + try { + String queryLoadStateUrl = String.format("%s/api/%s/get_load_state?label=%s", host, sinkConfig.getDatabase(), label); + Map<String, Object> result = httpHelper.doHttpGet(queryLoadStateUrl, getLoadStateHttpHeader(label)); + if (result == null) { + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to Doris, Error " + + "could not get the final state of label[%s].\n", label), null); + } + String labelState = (String) result.get("state"); + if (null == labelState) { + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to Doris, Error " + + "could not get the final state of label[%s]. 
response[%s]\n", label, JsonUtils.toJsonString(result)), null); + } + log.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); + switch (labelState) { + case LAEBL_STATE_VISIBLE: + case LAEBL_STATE_COMMITTED: + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to Doris, Error " + + "label[%s] state[%s]\n", label, labelState), true); + case RESULT_LABEL_UNKNOWN: + default: + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to Doris, Error " + + "label[%s] state[%s]\n", label, labelState)); + } + } catch (IOException e) { + throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, e); + } + } + } + + private String getBasicAuthHeader(String username, String password) { + String auth = username + ":" + password; + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); + return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); Review Comment: Use `String.format` instead. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Setter +@Getter +@ToString Review Comment: Use @Data ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Setter +@Getter +@ToString +public class SinkConfig { + + private static final int DEFAULT_BATCH_MAX_SIZE = 1024; + private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024; + + private static final String LOAD_FORMAT = "format"; + private static final StreamLoadFormat DEFAULT_LOAD_FORMAT = StreamLoadFormat.CSV; + private static final String COLUMN_SEPARATOR = "column_separator"; + + public static final Option<List<String>> NODE_URLS = Options.key("nodeUrls") + .listType() + .noDefaultValue() + .withDescription("Doris cluster address, the format is [\"fe_ip:fe_http_port\", ...]"); + + public static final Option<String> USERNAME = Options.key("username") + .stringType() + .noDefaultValue() + .withDescription("Doris user username"); + + public static final Option<String> PASSWORD = Options.key("password") + .stringType() + .noDefaultValue() + .withDescription("Doris user password"); + + public static final Option<String> LABEL_PREFIX = Options.key("labelPrefix") + .stringType() + .noDefaultValue() + .withDescription("The prefix of Doris stream load label"); + + public static final Option<String> DATABASE = Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("The name of Doris database"); + + public static final Option<String> TABLE = Options.key("table") + .stringType() + .noDefaultValue() + .withDescription("The name of Doris table"); + + public static final Option<String> DORIS_SINK_CONFIG_PREFIX = Options.key("sink.properties.") + .stringType() + .noDefaultValue() + 
.withDescription("The parameter of the stream load data_desc. " + + "The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name "); + + public static final Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows") + .intType() + .defaultValue(DEFAULT_BATCH_MAX_SIZE) + .withDescription("For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the Doris"); + + public static final Option<Long> BATCH_MAX_BYTES = Options.key("batch_max_bytes") + .longType() + .defaultValue(DEFAULT_BATCH_BYTES) + .withDescription("For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the Doris"); + + public static final Option<Integer> BATCH_INTERVAL_MS = Options.key("batch_interval_ms") + .intType() + .noDefaultValue() + .withDescription("For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the Doris"); + + public static final Option<Integer> MAX_RETRIES = Options.key("max_retries") + .intType() + .noDefaultValue() + .withDescription("The number of retries to flush failed"); + + public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS = Options.key("retry_backoff_multiplier_ms") + .intType() + .noDefaultValue() + .withDescription("Using as a multiplier for generating the next delay for backoff"); + + public static final Option<Integer> MAX_RETRY_BACKOFF_MS = Options.key("max_retry_backoff_ms") + .intType() + .noDefaultValue() + .withDescription("The amount of time to wait before attempting to retry a request to Doris"); + + public enum StreamLoadFormat { + CSV, JSON; + public static StreamLoadFormat parse(String format) { + if 
(StreamLoadFormat.JSON.name().equals(format)) { + return JSON; + } + return CSV; + } + } + + private List<String> nodeUrls; + private String username; + private String password; + private String database; + private String table; + private String labelPrefix; + private String columnSeparator; + private StreamLoadFormat loadFormat = DEFAULT_LOAD_FORMAT; + + private int batchMaxSize = DEFAULT_BATCH_MAX_SIZE; + private long batchMaxBytes = DEFAULT_BATCH_BYTES; + + private Integer batchIntervalMs; + private int maxRetries; + private int retryBackoffMultiplierMs; + private int maxRetryBackoffMs; + + private final Map<String, Object> streamLoadProps = new HashMap<>(); Review Comment: ```suggestion private final Map<String, String> streamLoadProps = new HashMap<>(); ``` ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.client; + +import org.apache.seatunnel.common.utils.JsonUtils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class HttpHelper { + private static final int DEFAULT_CONNECT_TIMEOUT = 1000000; + + public HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != code) { + log.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + public String doHttpGet(String getUrl) throws IOException { + log.info("Executing GET from {}.", getUrl); + try (CloseableHttpClient httpclient = buildHttpClient()) { + HttpGet httpGet = new HttpGet(getUrl); + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return EntityUtils.toString(respEntity); + } + } + } + + public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header) throws IOException { + log.info("Executing GET from {}.", getUrl); 
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(getUrl); + if (null != header) { + for (Map.Entry<String, String> entry : header.entrySet()) { + httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); + } + } + } + + @SuppressWarnings("unchecked") + public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header) throws IOException { + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + try (CloseableHttpClient httpclient = httpClientBuilder.build()) { + HttpPut httpPut = new HttpPut(url); + if (null != header) { + for (Map.Entry<String, String> entry : header.entrySet()) { + httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + httpPut.setEntity(new ByteArrayEntity(data)); + httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { + int code = resp.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != code) { + String errorText; + try { + HttpEntity respEntity = resp.getEntity(); + errorText = EntityUtils.toString(respEntity); + } catch (Exception err) { + errorText = "find errorText failed: " + err.getMessage(); + } + log.warn("Request failed with code:{}, err:{}", code, errorText); + Map<String, Object> errorMap = new HashMap<>(); + errorMap.put("Status", "Fail"); + errorMap.put("Message", errorText); + return errorMap; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + 
log.warn("Request failed with empty response."); + return null; Review Comment: The same as above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
