This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch sink in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
commit feeb55c13d815a7920a81ef2420ad9ed50bc25c4 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Apr 7 10:20:00 2025 +0800 refactor --- .../sink/event/PipeRawTabletInsertionEvent.java | 8 ++ .../sink/event/PipeTsFileInsertionEvent.java | 4 + .../builtin/sink/protocol/IoTDBConnector.java | 30 ++--- .../sink/protocol/IoTDBDataNodeSyncConnector.java | 104 ----------------- .../protocol/IoTDBDataRegionSyncConnector.java | 124 ++++++++++----------- .../sink/protocol/IoTDBSslSyncConnector.java | 5 +- .../sink/protocol/PipeReceiverStatusHandler.java | 55 +-------- 7 files changed, 93 insertions(+), 237 deletions(-) diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java index c6a9e27..3ea3452 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeRawTabletInsertionEvent.java @@ -45,6 +45,10 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent throw new UnsupportedOperationException(); } + public Tablet getTablet() { + return tablet; + } + public boolean isAligned() { return isAligned; } @@ -53,6 +57,10 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent return false; } + public String getDeviceId() { + return null; + } + public String getTableModelDatabaseName() { return null; } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java index a134208..2fc5e9a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/PipeTsFileInsertionEvent.java @@ -39,6 +39,10 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent implements TsFi throw new PipeException(""); } + public File getTsFile() { + return tsFile; + } + @Override public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws PipeException { return toTabletInsertionEvents(Long.MAX_VALUE); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java index 141bd64..7092314 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java @@ -29,6 +29,7 @@ import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeCo import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.rpc.UrlUtils; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.slf4j.Logger; @@ -368,7 +369,7 @@ public abstract class IoTDBConnector implements PipeConnector { if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) { givenNodeUrls.addAll( - NodeUrlUtils.parseTEndPointUrls( + parseTEndPointUrls( Arrays.asList( parameters .getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY) @@ -378,7 +379,7 @@ public abstract class IoTDBConnector implements PipeConnector { if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) { givenNodeUrls.addAll( - NodeUrlUtils.parseTEndPointUrls( + parseTEndPointUrls( Arrays.asList( parameters .getStringByKeys(SINK_IOTDB_NODE_URLS_KEY) @@ -394,8 +395,19 @@ public abstract class IoTDBConnector implements PipeConnector { return givenNodeUrls; } + + private static List<TEndPoint> parseTEndPointUrls(List<String> endPointUrls) { + if (endPointUrls == null) { + throw new NumberFormatException("endPointUrls is null"); + } + List<TEndPoint> result = new ArrayList<>(); + for (String url : endPointUrls) { + result.add(UrlUtils.parseTEndPointIpv4AndIpv6Url(url)); + } + return result; + } - private void checkNodeUrls(final Set<TEndPoint> nodeUrls) throws PipeParameterNotValidException { + private static void checkNodeUrls(final Set<TEndPoint> nodeUrls) throws PipeParameterNotValidException { for (final TEndPoint nodeUrl : nodeUrls) { if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) { LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "host cannot be empty"); @@ -413,16 +425,4 @@ public abstract class IoTDBConnector implements PipeConnector { ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors) : req; } - - public boolean isRpcCompressionEnabled() { - return isRpcCompressionEnabled; - } - - public List<PipeCompressor> getCompressors() { - return compressors; - } - - public PipeReceiverStatusHandler statusHandler() { - return receiverStatusHandler; - } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataNodeSyncConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataNodeSyncConnector.java deleted file mode 100644 index d7c0a77..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataNodeSyncConnector.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.plugin.builtin.sink.protocol; - -import org.apache.iotdb.collector.config.PipeOptions; -import org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBDataNodeSyncClientManager; -import org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBSyncClientManager; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.UnknownHostException; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - -public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { - - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class); - - protected IoTDBDataNodeSyncClientManager clientManager; - - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final Set<TEndPoint> givenNodeUrls = parseNodeUrls(validator.getParameters()); - - validator.validate( - empty -> { - try { - // Ensure the sink doesn't point to the thrift receiver on DataNode itself - return !NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter(tEndPoint -> tEndPoint.getPort() == PipeOptions.RPC_PORT.value()) - .map(TEndPoint::getIp) - .collect(Collectors.toList())); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the thrift receiver %s on sender itself, " - + "or unknown host when checking pipe sink IP.", - givenNodeUrls, - new TEndPoint(PipeOptions.RPC_ADDRESS.value(), PipeOptions.RPC_PORT.value()))); - } - - @Override - protected IoTDBSyncClientManager constructClient( - final List<TEndPoint> nodeUrls, - final boolean useSSL, - final String trustStorePath, - final String trustStorePwd, - /* The following parameters are used locally. */ - final boolean useLeaderCache, - final String loadBalanceStrategy, - /* The following parameters are used to handshake with the receiver. */ - final String username, - final String password, - final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy, - final boolean validateTsFile, - final boolean shouldMarkAsPipeRequest) { - clientManager = - new IoTDBDataNodeSyncClientManager( - nodeUrls, - useSSL, - Objects.nonNull(trustStorePath) - ? /*IoTDBConfig.addDataHomeDir(trustStorePath)*/ "" - : null, - trustStorePwd, - useLeaderCache, - loadBalanceStrategy, - username, - password, - shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy, - validateTsFile, - shouldMarkAsPipeRequest); - return clientManager; - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java index 9380757..04b9844 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java @@ -19,7 +19,9 @@ package org.apache.iotdb.collector.plugin.builtin.sink.protocol; +import org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBSyncClient; +import org.apache.iotdb.collector.plugin.builtin.sink.client.IoTDBSyncClientManager; import org.apache.iotdb.collector.plugin.builtin.sink.event.PipeRawTabletInsertionEvent; import org.apache.iotdb.collector.plugin.builtin.sink.event.PipeTsFileInsertionEvent; import org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch.PipeTabletEventBatch; @@ -59,12 +61,49 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { +public class IoTDBDataRegionSyncConnector extends IoTDBSslSyncConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class); private PipeTransferBatchReqBuilder tabletBatchBuilder; + protected IoTDBDataNodeSyncClientManager clientManager; + + @Override + protected IoTDBSyncClientManager constructClient( + final List<TEndPoint> nodeUrls, + final boolean useSSL, + final String trustStorePath, + final String trustStorePwd, + /* The following parameters are used locally. */ + final boolean useLeaderCache, + final String loadBalanceStrategy, + /* The following parameters are used to handshake with the receiver. */ + final String username, + final String password, + final boolean shouldReceiverConvertOnTypeMismatch, + final String loadTsFileStrategy, + final boolean validateTsFile, + final boolean shouldMarkAsPipeRequest) { + clientManager = + new IoTDBDataNodeSyncClientManager( + nodeUrls, + useSSL, + Objects.nonNull(trustStorePath) + ? /*IoTDBConfig.addDataHomeDir(trustStorePath)*/ "" + : null, + trustStorePwd, + useLeaderCache, + loadBalanceStrategy, + username, + password, + shouldReceiverConvertOnTypeMismatch, + loadTsFileStrategy, + validateTsFile, + shouldMarkAsPipeRequest); + return clientManager; + } + @Override public void customize( final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) @@ -91,59 +130,25 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { - // PipeProcessor can change the type of TabletInsertionEvent - if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { - LOGGER.warn( - "IoTDBThriftSyncConnector only support " + "PipeRawTabletInsertionEvent. " + "Ignore {}.", - tabletInsertionEvent); - return; - } - - try { - if (isTabletBatchModeEnabled) { - final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch = - tabletBatchBuilder.onEvent(tabletInsertionEvent); - if (Objects.nonNull(endPointAndBatch)) { - doTransferWrapper(endPointAndBatch); - } - } else { - doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent); + if (isTabletBatchModeEnabled) { + final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch = + tabletBatchBuilder.onEvent(tabletInsertionEvent); + if (Objects.nonNull(endPointAndBatch)) { + doTransferWrapper(endPointAndBatch); } - } catch (final Exception e) { - throw new PipeConnectionException( - String.format( - "Failed to transfer tablet insertion event %s, because %s.", - ((PipeRawTabletInsertionEvent) tabletInsertionEvent).coreReportMessage(), - e.getMessage()), - e); + } else { + doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent); } } @Override public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception { - // PipeProcessor can change the type of tsFileInsertionEvent - if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) { - LOGGER.warn( - "IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. Ignore {}.", - tsFileInsertionEvent); - return; + // In order to commit in order + if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) { + doTransferWrapper(); } - try { - // In order to commit in order - if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) { - doTransferWrapper(); - } - - doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent); - } catch (final Exception e) { - throw new PipeConnectionException( - String.format( - "Failed to transfer tsfile insertion event %s, because %s.", - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage(), - e.getMessage()), - e); - } + doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent); } @Override @@ -217,13 +222,9 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { final Map<Pair<String, Long>, Double> pipe2WeightMap = batchToTransfer.deepCopyPipe2WeightMap(); for (final Pair<String, File> dbTsFile : dbTsFilePairs) { - doTransfer(pipe2WeightMap, dbTsFile.right, null, dbTsFile.left); + doTransfer(dbTsFile.right, null, dbTsFile.left); try { - RetryUtils.retryOnException( - () -> { - FileUtils.delete(dbTsFile.right); - return null; - }); + FileUtils.delete(dbTsFile.right); } catch (final NoSuchFileException e) { LOGGER.info("The file {} is not found, may already be deleted.", dbTsFile); } catch (final Exception e) { @@ -248,7 +249,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { final TPipeTransferReq req = compressIfNeeded( PipeTransferTabletRawReqV2.toTPipeTransferReq( - pipeRawTabletInsertionEvent.convertToTablet(), + pipeRawTabletInsertionEvent.getTablet(), pipeRawTabletInsertionEvent.isAligned(), pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() @@ -271,7 +272,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { status, String.format( "Transfer PipeRawTabletInsertionEvent %s error, result status %s", - pipeRawTabletInsertionEvent.coreReportMessage(), status), + pipeRawTabletInsertionEvent, status), pipeRawTabletInsertionEvent.toString()); } if (status.isSetRedirectNode()) { @@ -283,19 +284,14 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { private void doTransferWrapper(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException { doTransfer( - Collections.singletonMap( - new Pair<>( - pipeTsFileInsertionEvent.getPipeName(), pipeTsFileInsertionEvent.getCreationTime()), - 1.0), pipeTsFileInsertionEvent.getTsFile(), - pipeTsFileInsertionEvent.isWithMod() ? pipeTsFileInsertionEvent.getModFile() : null, + null, pipeTsFileInsertionEvent.isTableModelEvent() ? pipeTsFileInsertionEvent.getTableModelDatabaseName() : null); } private void doTransfer( - final Map<Pair<String, Long>, Double> pipeName2WeightMap, final File tsFile, final File modFile, final String dataBaseName) @@ -306,8 +302,8 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { // 1. Transfer tsFile, and mod file if exists and receiver's version >= 2 if (Objects.nonNull(modFile) && clientManager.supportModsIfIsDataNodeReceiver()) { - transferFilePieces(pipeName2WeightMap, modFile, clientAndStatus, true); - transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, true); + transferFilePieces(modFile, clientAndStatus, true); + transferFilePieces(tsFile, clientAndStatus, true); // 2. Transfer file seal signal with mod, which means the file is transferred completely try { @@ -329,7 +325,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { e); } } else { - transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, false); + transferFilePieces(tsFile, clientAndStatus, false); // 2. Transfer file seal signal without mod, which means the file is transferred completely try { @@ -362,7 +358,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { } @Override - public void close() { + public void close() throws Exception { if (tabletBatchBuilder != null) { tabletBatchBuilder.close(); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java index b1920d6..c49cc2c 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java @@ -163,7 +163,6 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { } protected void transferFilePieces( - final Map<Pair<String, Long>, Double> pipe2WeightMap, final File file, final Pair<IoTDBSyncClient, Boolean> clientAndStatus, final boolean isMultiFile) @@ -237,11 +236,9 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { final String fileName, final long position, final byte[] payLoad) throws IOException; @Override - public void close() { + public void close() throws Exception { if (clientManager != null) { clientManager.close(); } - - super.close(); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java index 477dc64..12f8e09 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java @@ -103,7 +103,7 @@ public class PipeReceiverStatusHandler { case 1808: // PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION { LOGGER.info("Temporary unavailable exception: will retry forever. status: {}", status); - throw new PipeRuntimeConnectorCriticalException(exceptionMessage); + throw new PipeException(exceptionMessage); } case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION @@ -141,11 +141,11 @@ public class PipeReceiverStatusHandler { + " seconds", status); exceptionEventHasBeenRetried.set(true); - throw new PipeRuntimeConnectorRetryTimesConfigurableException( + throw new PipeException( exceptionMessage, (int) Math.max( - PipeSubtask.MAX_RETRY_TIMES, + 5, Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenConflictOccurs * 1.1))); } @@ -176,11 +176,11 @@ public class PipeReceiverStatusHandler { + " seconds", status); exceptionEventHasBeenRetried.set(true); - throw new PipeRuntimeConnectorRetryTimesConfigurableException( + throw new PipeException( exceptionMessage, (int) Math.max( - PipeSubtask.MAX_RETRY_TIMES, + 5, Math.min( CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1))); } @@ -200,49 +200,4 @@ public class PipeReceiverStatusHandler { exceptionEventHasBeenRetried.set(false); exceptionRecordedMessage.set(""); } - - /////////////////////////////// Prior status specifier /////////////////////////////// - - private static final List<Integer> STATUS_PRIORITY = - Collections.unmodifiableList( - Arrays.asList( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(), - TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode(), - TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode(), - TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())); - - /** - * This method is used to get the highest priority {@link TSStatus} from a list of {@link - * TSStatus}. The priority of each status is determined by its {@link TSStatusCode}, and the - * priority sequence is defined in the {@link #STATUS_PRIORITY} list. - * - * <p>Specifically, it iterates through the input {@link TSStatus} list. For each {@link - * TSStatus}, if its {@link TSStatusCode} is not in the {@link #STATUS_PRIORITY} list, it directly - * returns this {@link TSStatus}. Otherwise, it compares the current {@link TSStatus} with the - * highest priority {@link TSStatus} found so far (initially set to the {@link - * TSStatusCode#SUCCESS_STATUS}). If the current {@link TSStatus} has a higher priority, it - * updates the highest priority {@link TSStatus} to the current {@link TSStatus}. - * - * <p>Finally, the method returns the highest priority {@link TSStatus}. - * - * @param givenStatusList a list of {@link TSStatus} from which the highest priority {@link - * TSStatus} is to be found - * @return the highest priority {@link TSStatus} from the input list - */ - public static TSStatus getPriorStatus(final List<TSStatus> givenStatusList) { - final TSStatus resultStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - for (final TSStatus givenStatus : givenStatusList) { - if (!STATUS_PRIORITY.contains(givenStatus.getCode())) { - return givenStatus; - } - - if (STATUS_PRIORITY.indexOf(givenStatus.getCode()) - > STATUS_PRIORITY.indexOf(resultStatus.getCode())) { - resultStatus.setCode(givenStatus.getCode()); - } - } - resultStatus.setSubStatus(givenStatusList); - return resultStatus; - } }
