This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3e86102f47 [Feature][Connector-V2]Support Doris Fe Node HA (#8311)
3e86102f47 is described below

commit 3e86102f47a2da80b3d1161d56293bc0ec6c70c6
Author: xiaochen <[email protected]>
AuthorDate: Wed Dec 18 19:13:07 2024 +0800

    [Feature][Connector-V2]Support Doris Fe Node HA (#8311)
---
 .../connectors/doris/rest/RestService.java         | 48 +++++++++++--------
 .../doris/sink/writer/DorisSinkWriter.java         | 42 ++++++++++------
 .../doris/rest/models/RestServiceTest.java         | 56 ----------------------
 3 files changed, 55 insertions(+), 91 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index 77c23f7341..cb2f33df93 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -240,22 +240,7 @@ public class RestService implements Serializable {
     }
 
     @VisibleForTesting
-    public static String randomEndpoint(String feNodes, Logger logger)
-            throws DorisConnectorException {
-        logger.trace("Parse fenodes '{}'.", feNodes);
-        if (StringUtils.isEmpty(feNodes)) {
-            String errMsg =
-                    String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
-            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        List<String> nodes = Arrays.asList(feNodes.split(","));
-        Collections.shuffle(nodes);
-        return nodes.get(0).trim();
-    }
-
-    @VisibleForTesting
-    static String getUriStr(
-            DorisSourceConfig dorisSourceConfig, DorisSourceTable dorisSourceTable, Logger logger)
+    static String getUriStr(String node, DorisSourceTable dorisSourceTable, Logger logger)
             throws DorisConnectorException {
         String tableIdentifier =
                 dorisSourceTable.getTablePath().getDatabaseName()
@@ -263,7 +248,7 @@ public class RestService implements Serializable {
                         + dorisSourceTable.getTablePath().getTableName();
         String[] identifier = parseIdentifier(tableIdentifier, logger);
         return "http://";
-                + randomEndpoint(dorisSourceConfig.getFrontends(), logger)
+                + node.trim()
                 + API_PREFIX
                 + "/"
                 + identifier[0]
@@ -298,16 +283,37 @@ public class RestService implements Serializable {
         }
         logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
 
-        HttpPost httpPost =
-                new HttpPost(getUriStr(dorisSourceConfig, dorisSourceTable, logger) + QUERY_PLAN);
         String entity = "{\"sql\": \"" + sql + "\"}";
         logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
         StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
         stringEntity.setContentEncoding("UTF-8");
         stringEntity.setContentType("application/json");
-        httpPost.setEntity(stringEntity);
 
-        String resStr = send(dorisSourceConfig, httpPost, logger);
+        List<String> feNodes = Arrays.asList(dorisSourceConfig.getFrontends().split(","));
+        Collections.shuffle(feNodes);
+        int feNodesNum = feNodes.size();
+        String resStr = null;
+
+        for (int i = 0; i < feNodesNum; i++) {
+            try {
+                HttpPost httpPost =
+                        new HttpPost(
+                                getUriStr(feNodes.get(i), dorisSourceTable, logger) + QUERY_PLAN);
+                httpPost.setEntity(stringEntity);
+                resStr = send(dorisSourceConfig, httpPost, logger);
+                break;
+            } catch (Exception e) {
+                if (i == feNodesNum - 1) {
+                    throw new DorisConnectorException(
+                            DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
+                }
+                log.error(
+                        "Find partition error for feNode: {} with exception: {}",
+                        feNodes.get(i),
+                        e.getMessage());
+            }
+        }
+
         logger.debug("Find partition response is '{}'.", resStr);
         QueryPlan queryPlan = getQueryPlan(resStr, logger);
         Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index fa0d671e82..2d40ea7bdd 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.RestService;
 import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
 import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
 import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
@@ -98,21 +97,36 @@ public class DorisSinkWriter
     }
 
     private void initializeLoad() {
-        String backend = RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log);
-        try {
-            this.dorisStreamLoad =
-                    new DorisStreamLoad(
-                            backend,
-                            catalogTable.getTablePath(),
-                            dorisSinkConfig,
-                            labelGenerator,
-                            new HttpUtil().getHttpClient());
-            if (dorisSinkConfig.getEnable2PC()) {
-                dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+
+        List<String> feNodes = Arrays.asList(dorisSinkConfig.getFrontends().split(","));
+        Collections.shuffle(feNodes);
+        int feNodesNum = feNodes.size();
+
+        for (int i = 0; i < feNodesNum; i++) {
+            try {
+                this.dorisStreamLoad =
+                        new DorisStreamLoad(
+                                feNodes.get(i),
+                                catalogTable.getTablePath(),
+                                dorisSinkConfig,
+                                labelGenerator,
+                                new HttpUtil().getHttpClient());
+                if (dorisSinkConfig.getEnable2PC()) {
+                    dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+                }
+                break;
+            } catch (Exception e) {
+                if (i == feNodesNum - 1) {
+                    throw new DorisConnectorException(
+                            DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+                }
+                log.error(
+                        "stream load error for feNode: {} with exception: {}",
+                        feNodes.get(i),
+                        e.getMessage());
             }
-        } catch (Exception e) {
-            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
         }
+
         startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
         // when uploading data in streaming mode, we need to regularly detect whether there are
         // exceptions.
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java
deleted file mode 100644
index aa917d5766..0000000000
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import org.apache.seatunnel.connectors.doris.rest.RestService;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class RestServiceTest {
-
-    @Test
-    void testRandomEndpoint() {
-
-        List<String> list =
-                Arrays.asList(
-                        "fe_host1:fe_http_port1",
-                        "fe_host2:fe_http_port2",
-                        "fe_host3:fe_http_port3",
-                        "fe_host4:fe_http_port4",
-                        "fe_host5:fe_http_port5");
-
-        boolean hasDifferentAddress = false;
-        for (int i = 0; i < 5; i++) {
-            Set<String> addresses =
-                    list.stream()
-                            .map(address -> RestService.randomEndpoint(String.join(",", list), log))
-                            .collect(Collectors.toSet());
-            hasDifferentAddress = addresses.size() > 1;
-        }
-        Assertions.assertTrue(hasDifferentAddress);
-    }
-}

Reply via email to