This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch sink
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git

commit feeb55c13d815a7920a81ef2420ad9ed50bc25c4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Apr 7 10:20:00 2025 +0800

    refactor
---
 .../sink/event/PipeRawTabletInsertionEvent.java    |   8 ++
 .../sink/event/PipeTsFileInsertionEvent.java       |   4 +
 .../builtin/sink/protocol/IoTDBConnector.java      |  30 ++---
 .../sink/protocol/IoTDBDataNodeSyncConnector.java  | 104 -----------------
 .../protocol/IoTDBDataRegionSyncConnector.java     | 124 ++++++++++-----------
 .../sink/protocol/IoTDBSslSyncConnector.java       |   5 +-
 .../sink/protocol/PipeReceiverStatusHandler.java   |  55 +--------
 7 files changed, 93 insertions(+), 237 deletions(-)

diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java
index c6a9e27..3ea3452 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java
@@ -45,6 +45,10 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
     throw new UnsupportedOperationException();
   }
 
+  public Tablet getTablet() {
+    return tablet;
+  }
+
   public boolean isAligned() {
     return isAligned;
   }
@@ -53,6 +57,10 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
     return false;
   }
 
+  public String getDeviceId() {
+    return null;
+  }
+
   public String getTableModelDatabaseName() {
     return null;
   }
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java
index a134208..2fc5e9a 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java
@@ -39,6 +39,10 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent implements TsFi
     throw new PipeException("");
   }
 
+  public File getTsFile() {
+    return tsFile;
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws 
PipeException {
     return toTabletInsertionEvents(Long.MAX_VALUE);
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java
index 141bd64..7092314 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeCo
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.rpc.UrlUtils;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import org.slf4j.Logger;
@@ -368,7 +369,7 @@ public abstract class IoTDBConnector implements PipeConnector {
 
       if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
         givenNodeUrls.addAll(
-            NodeUrlUtils.parseTEndPointUrls(
+            parseTEndPointUrls(
                 Arrays.asList(
                     parameters
                         .getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY)
@@ -378,7 +379,7 @@ public abstract class IoTDBConnector implements PipeConnector {
 
       if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
         givenNodeUrls.addAll(
-            NodeUrlUtils.parseTEndPointUrls(
+            parseTEndPointUrls(
                 Arrays.asList(
                     parameters
                         .getStringByKeys(SINK_IOTDB_NODE_URLS_KEY)
@@ -394,8 +395,19 @@ public abstract class IoTDBConnector implements PipeConnector {
 
     return givenNodeUrls;
   }
+  
+  private static List<TEndPoint> parseTEndPointUrls(List<String> endPointUrls) 
{
+    if (endPointUrls == null) {
+      throw new NumberFormatException("endPointUrls is null");
+    }
+    List<TEndPoint> result = new ArrayList<>();
+    for (String url : endPointUrls) {
+      result.add(UrlUtils.parseTEndPointIpv4AndIpv6Url(url));
+    }
+    return result;
+  }
 
-  private void checkNodeUrls(final Set<TEndPoint> nodeUrls) throws 
PipeParameterNotValidException {
+  private static void checkNodeUrls(final Set<TEndPoint> nodeUrls) throws 
PipeParameterNotValidException {
     for (final TEndPoint nodeUrl : nodeUrls) {
       if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) {
         LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "host cannot be empty");
@@ -413,16 +425,4 @@ public abstract class IoTDBConnector implements PipeConnector {
         ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
         : req;
   }
-
-  public boolean isRpcCompressionEnabled() {
-    return isRpcCompressionEnabled;
-  }
-
-  public List<PipeCompressor> getCompressors() {
-    return compressors;
-  }
-
-  public PipeReceiverStatusHandler statusHandler() {
-    return receiverStatusHandler;
-  }
 }
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataNodeSyncConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataNodeSyncConnector.java
deleted file mode 100644
index d7c0a77..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataNodeSyncConnector.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.plugin.builtin.sink.protocol;
-
-import org.apache.iotdb.collector.config.PipeOptions;
-import 
org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBDataNodeSyncClientManager;
-import 
org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBSyncClientManager;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector 
{
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class);
-
-  protected IoTDBDataNodeSyncClientManager clientManager;
-
-  @Override
-  public void validate(final PipeParameterValidator validator) throws 
Exception {
-    super.validate(validator);
-
-    final Set<TEndPoint> givenNodeUrls = 
parseNodeUrls(validator.getParameters());
-
-    validator.validate(
-        empty -> {
-          try {
-            // Ensure the sink doesn't point to the thrift receiver on 
DataNode itself
-            return !NodeUrlUtils.containsLocalAddress(
-                givenNodeUrls.stream()
-                    .filter(tEndPoint -> tEndPoint.getPort() == 
PipeOptions.RPC_PORT.value())
-                    .map(TEndPoint::getIp)
-                    .collect(Collectors.toList()));
-          } catch (final UnknownHostException e) {
-            LOGGER.warn("Unknown host when checking pipe sink IP.", e);
-            return false;
-          }
-        },
-        String.format(
-            "One of the endpoints %s of the receivers is pointing back to the 
thrift receiver %s on sender itself, "
-                + "or unknown host when checking pipe sink IP.",
-            givenNodeUrls,
-            new TEndPoint(PipeOptions.RPC_ADDRESS.value(), 
PipeOptions.RPC_PORT.value())));
-  }
-
-  @Override
-  protected IoTDBSyncClientManager constructClient(
-      final List<TEndPoint> nodeUrls,
-      final boolean useSSL,
-      final String trustStorePath,
-      final String trustStorePwd,
-      /* The following parameters are used locally. */
-      final boolean useLeaderCache,
-      final String loadBalanceStrategy,
-      /* The following parameters are used to handshake with the receiver. */
-      final String username,
-      final String password,
-      final boolean shouldReceiverConvertOnTypeMismatch,
-      final String loadTsFileStrategy,
-      final boolean validateTsFile,
-      final boolean shouldMarkAsPipeRequest) {
-    clientManager =
-        new IoTDBDataNodeSyncClientManager(
-            nodeUrls,
-            useSSL,
-            Objects.nonNull(trustStorePath)
-                ? /*IoTDBConfig.addDataHomeDir(trustStorePath)*/ ""
-                : null,
-            trustStorePwd,
-            useLeaderCache,
-            loadBalanceStrategy,
-            username,
-            password,
-            shouldReceiverConvertOnTypeMismatch,
-            loadTsFileStrategy,
-            validateTsFile,
-            shouldMarkAsPipeRequest);
-    return clientManager;
-  }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java
index 9380757..04b9844 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.protocol;
 
+import 
org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBDataNodeSyncClientManager;
 import org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBSyncClient;
+import 
org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBSyncClientManager;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.event.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.event.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch.PipeTabletEventBatch;
@@ -59,12 +61,49 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
+public class IoTDBDataRegionSyncConnector extends IoTDBSslSyncConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);
 
   private PipeTransferBatchReqBuilder tabletBatchBuilder;
 
+  protected IoTDBDataNodeSyncClientManager clientManager;
+
+  @Override
+  protected IoTDBSyncClientManager constructClient(
+      final List<TEndPoint> nodeUrls,
+      final boolean useSSL,
+      final String trustStorePath,
+      final String trustStorePwd,
+      /* The following parameters are used locally. */
+      final boolean useLeaderCache,
+      final String loadBalanceStrategy,
+      /* The following parameters are used to handshake with the receiver. */
+      final String username,
+      final String password,
+      final boolean shouldReceiverConvertOnTypeMismatch,
+      final String loadTsFileStrategy,
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
+    clientManager =
+        new IoTDBDataNodeSyncClientManager(
+            nodeUrls,
+            useSSL,
+            Objects.nonNull(trustStorePath)
+                ? /*IoTDBConfig.addDataHomeDir(trustStorePath)*/ ""
+                : null,
+            trustStorePwd,
+            useLeaderCache,
+            loadBalanceStrategy,
+            username,
+            password,
+            shouldReceiverConvertOnTypeMismatch,
+            loadTsFileStrategy,
+            validateTsFile,
+            shouldMarkAsPipeRequest);
+    return clientManager;
+  }
+
   @Override
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
@@ -91,59 +130,25 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
 
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    // PipeProcessor can change the type of TabletInsertionEvent
-    if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
-      LOGGER.warn(
-          "IoTDBThriftSyncConnector only support " + 
"PipeRawTabletInsertionEvent. " + "Ignore {}.",
-          tabletInsertionEvent);
-      return;
-    }
-
-    try {
-      if (isTabletBatchModeEnabled) {
-        final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
-            tabletBatchBuilder.onEvent(tabletInsertionEvent);
-        if (Objects.nonNull(endPointAndBatch)) {
-          doTransferWrapper(endPointAndBatch);
-        }
-      } else {
-        doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+    if (isTabletBatchModeEnabled) {
+      final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
+          tabletBatchBuilder.onEvent(tabletInsertionEvent);
+      if (Objects.nonNull(endPointAndBatch)) {
+        doTransferWrapper(endPointAndBatch);
       }
-    } catch (final Exception e) {
-      throw new PipeConnectionException(
-          String.format(
-              "Failed to transfer tablet insertion event %s, because %s.",
-              ((PipeRawTabletInsertionEvent) 
tabletInsertionEvent).coreReportMessage(),
-              e.getMessage()),
-          e);
+    } else {
+      doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
     }
   }
 
   @Override
   public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    // PipeProcessor can change the type of tsFileInsertionEvent
-    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
-      LOGGER.warn(
-          "IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. 
Ignore {}.",
-          tsFileInsertionEvent);
-      return;
+    // In order to commit in order
+    if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+      doTransferWrapper();
     }
 
-    try {
-      // In order to commit in order
-      if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
-        doTransferWrapper();
-      }
-
-      doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent);
-    } catch (final Exception e) {
-      throw new PipeConnectionException(
-          String.format(
-              "Failed to transfer tsfile insertion event %s, because %s.",
-              ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).coreReportMessage(),
-              e.getMessage()),
-          e);
-    }
+    doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent);
   }
 
   @Override
@@ -217,13 +222,9 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
     final Map<Pair<String, Long>, Double> pipe2WeightMap = 
batchToTransfer.deepCopyPipe2WeightMap();
 
     for (final Pair<String, File> dbTsFile : dbTsFilePairs) {
-      doTransfer(pipe2WeightMap, dbTsFile.right, null, dbTsFile.left);
+      doTransfer(dbTsFile.right, null, dbTsFile.left);
       try {
-        RetryUtils.retryOnException(
-            () -> {
-              FileUtils.delete(dbTsFile.right);
-              return null;
-            });
+        FileUtils.delete(dbTsFile.right);
       } catch (final NoSuchFileException e) {
         LOGGER.info("The file {} is not found, may already be deleted.", 
dbTsFile);
       } catch (final Exception e) {
@@ -248,7 +249,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
       final TPipeTransferReq req =
           compressIfNeeded(
               PipeTransferTabletRawReqV2.toTPipeTransferReq(
-                  pipeRawTabletInsertionEvent.convertToTablet(),
+                  pipeRawTabletInsertionEvent.getTablet(),
                   pipeRawTabletInsertionEvent.isAligned(),
                   pipeRawTabletInsertionEvent.isTableModelEvent()
                       ? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
@@ -271,7 +272,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
           status,
           String.format(
               "Transfer PipeRawTabletInsertionEvent %s error, result status 
%s",
-              pipeRawTabletInsertionEvent.coreReportMessage(), status),
+              pipeRawTabletInsertionEvent, status),
           pipeRawTabletInsertionEvent.toString());
     }
     if (status.isSetRedirectNode()) {
@@ -283,19 +284,14 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
   private void doTransferWrapper(final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, IOException {
     doTransfer(
-        Collections.singletonMap(
-            new Pair<>(
-                pipeTsFileInsertionEvent.getPipeName(), 
pipeTsFileInsertionEvent.getCreationTime()),
-            1.0),
         pipeTsFileInsertionEvent.getTsFile(),
-        pipeTsFileInsertionEvent.isWithMod() ? 
pipeTsFileInsertionEvent.getModFile() : null,
+        null,
         pipeTsFileInsertionEvent.isTableModelEvent()
             ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
             : null);
   }
 
   private void doTransfer(
-      final Map<Pair<String, Long>, Double> pipeName2WeightMap,
       final File tsFile,
       final File modFile,
       final String dataBaseName)
@@ -306,8 +302,8 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
 
     // 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
     if (Objects.nonNull(modFile) && 
clientManager.supportModsIfIsDataNodeReceiver()) {
-      transferFilePieces(pipeName2WeightMap, modFile, clientAndStatus, true);
-      transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, true);
+      transferFilePieces(modFile, clientAndStatus, true);
+      transferFilePieces(tsFile, clientAndStatus, true);
 
       // 2. Transfer file seal signal with mod, which means the file is 
transferred completely
       try {
@@ -329,7 +325,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
             e);
       }
     } else {
-      transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, false);
+      transferFilePieces(tsFile, clientAndStatus, false);
 
       // 2. Transfer file seal signal without mod, which means the file is 
transferred completely
       try {
@@ -362,7 +358,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
   }
 
   @Override
-  public void close() {
+  public void close() throws Exception {
     if (tabletBatchBuilder != null) {
       tabletBatchBuilder.close();
     }
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java
index b1920d6..c49cc2c 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java
@@ -163,7 +163,6 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
   }
 
   protected void transferFilePieces(
-      final Map<Pair<String, Long>, Double> pipe2WeightMap,
       final File file,
       final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
       final boolean isMultiFile)
@@ -237,11 +236,9 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
       final String fileName, final long position, final byte[] payLoad) throws 
IOException;
 
   @Override
-  public void close() {
+  public void close() throws Exception {
     if (clientManager != null) {
       clientManager.close();
     }
-
-    super.close();
   }
 }
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java
index 477dc64..12f8e09 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java
@@ -103,7 +103,7 @@ public class PipeReceiverStatusHandler {
       case 1808: // PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION
         {
           LOGGER.info("Temporary unavailable exception: will retry forever. 
status: {}", status);
-          throw new PipeRuntimeConnectorCriticalException(exceptionMessage);
+          throw new PipeException(exceptionMessage);
         }
 
       case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
@@ -141,11 +141,11 @@ public class PipeReceiverStatusHandler {
                       + " seconds",
               status);
           exceptionEventHasBeenRetried.set(true);
-          throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+          throw new PipeException(
               exceptionMessage,
               (int)
                   Math.max(
-                      PipeSubtask.MAX_RETRY_TIMES,
+                      5,
                       Math.min(CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
         }
 
@@ -176,11 +176,11 @@ public class PipeReceiverStatusHandler {
                       + " seconds",
               status);
           exceptionEventHasBeenRetried.set(true);
-          throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+          throw new PipeException(
               exceptionMessage,
               (int)
                   Math.max(
-                      PipeSubtask.MAX_RETRY_TIMES,
+                      5,
                       Math.min(
                           CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
         }
@@ -200,49 +200,4 @@ public class PipeReceiverStatusHandler {
     exceptionEventHasBeenRetried.set(false);
     exceptionRecordedMessage.set("");
   }
-
-  /////////////////////////////// Prior status specifier 
///////////////////////////////
-
-  private static final List<Integer> STATUS_PRIORITY =
-      Collections.unmodifiableList(
-          Arrays.asList(
-              TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-              
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(),
-              TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode(),
-              
TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode(),
-              
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()));
-
-  /**
-   * This method is used to get the highest priority {@link TSStatus} from a 
list of {@link
-   * TSStatus}. The priority of each status is determined by its {@link 
TSStatusCode}, and the
-   * priority sequence is defined in the {@link #STATUS_PRIORITY} list.
-   *
-   * <p>Specifically, it iterates through the input {@link TSStatus} list. For 
each {@link
-   * TSStatus}, if its {@link TSStatusCode} is not in the {@link 
#STATUS_PRIORITY} list, it directly
-   * returns this {@link TSStatus}. Otherwise, it compares the current {@link 
TSStatus} with the
-   * highest priority {@link TSStatus} found so far (initially set to the 
{@link
-   * TSStatusCode#SUCCESS_STATUS}). If the current {@link TSStatus} has a 
higher priority, it
-   * updates the highest priority {@link TSStatus} to the current {@link 
TSStatus}.
-   *
-   * <p>Finally, the method returns the highest priority {@link TSStatus}.
-   *
-   * @param givenStatusList a list of {@link TSStatus} from which the highest 
priority {@link
-   *     TSStatus} is to be found
-   * @return the highest priority {@link TSStatus} from the input list
-   */
-  public static TSStatus getPriorStatus(final List<TSStatus> givenStatusList) {
-    final TSStatus resultStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    for (final TSStatus givenStatus : givenStatusList) {
-      if (!STATUS_PRIORITY.contains(givenStatus.getCode())) {
-        return givenStatus;
-      }
-
-      if (STATUS_PRIORITY.indexOf(givenStatus.getCode())
-          > STATUS_PRIORITY.indexOf(resultStatus.getCode())) {
-        resultStatus.setCode(givenStatus.getCode());
-      }
-    }
-    resultStatus.setSubStatus(givenStatusList);
-    return resultStatus;
-  }
 }

Reply via email to