This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new d94a874bc Move Handover to common module (#2877) d94a874bc is described below commit d94a874bcb09f476784b6c42cc6ec6468d515023 Author: Xiao Zhao <zhaomin1...@163.com> AuthorDate: Mon Sep 26 11:18:33 2022 +0800 Move Handover to common module (#2877) --- .../org/apache/seatunnel/common}/Handover.java | 2 +- .../pulsar/source/reader/PulsarSourceReader.java | 1 + .../source/reader/PulsarSplitReaderThread.java | 1 + .../translation/spark/common/Handover.java | 88 ---------------------- .../spark/common/InternalRowCollector.java | 1 + .../source/batch/ParallelBatchPartitionReader.java | 2 +- 6 files changed, 5 insertions(+), 90 deletions(-) diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/Handover.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java similarity index 97% rename from seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/Handover.java rename to seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java index 04b8f0a24..d1f62fda8 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/Handover.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader; +package org.apache.seatunnel.common; import static com.google.common.base.Preconditions.checkNotNull; diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java index 1aebd8fb4..ddb2e9147 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.common.Handover; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig; diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java index f708a1759..9817046b7 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader; +import org.apache.seatunnel.common.Handover; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig; import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor; diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/Handover.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/Handover.java deleted file mode 100644 index be54e2045..000000000 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/Handover.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.translation.spark.common; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Closeable; -import java.util.Optional; -import java.util.concurrent.LinkedBlockingQueue; - -public final class Handover<T> implements Closeable { - private final Object lock = new Object(); - private final LinkedBlockingQueue<T> blockingQueue = - new LinkedBlockingQueue<>(); - private Throwable error; - - public boolean isEmpty() { - return blockingQueue.isEmpty(); - } - - public Optional<T> pollNext() throws Exception { - if (error != null) { - rethrowException(error, error.getMessage()); - } else if (!isEmpty()) { - return Optional.ofNullable(blockingQueue.poll()); - } - return Optional.empty(); - } - - public void produce(final T element) - throws InterruptedException, ClosedException { - if (error != null) { - throw new ClosedException(); - } - blockingQueue.put(element); - } - - public void reportError(Throwable t) { - checkNotNull(t); - - synchronized (lock) { - // do not override the initial exception - if (error == null) { - error = t; - } - lock.notifyAll(); - } - } - - @Override - public void close() { - synchronized (lock) { - if (error == null) { - error = new ClosedException(); - } - lock.notifyAll(); - } - } - - public static void rethrowException(Throwable t, String parentMessage) throws Exception { - if (t instanceof Error) { - throw (Error) t; - } else if (t instanceof Exception) { - throw (Exception) t; - } else { - throw new Exception(parentMessage, t); - } - } - - public static final class ClosedException extends Exception { - private static final long serialVersionUID = 1L; - } -} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java index 036055dc0..1a02c3a43 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.translation.spark.common; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.Handover; import org.apache.seatunnel.translation.spark.common.serialization.InternalRowConverter; import org.apache.spark.sql.catalyst.InternalRow; diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java index 5e3d42303..92494d49e 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java @@ -20,9 +20,9 @@ package org.apache.seatunnel.translation.spark.common.source.batch; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.Handover; import org.apache.seatunnel.translation.source.BaseSourceFunction; import org.apache.seatunnel.translation.source.ParallelSource; -import org.apache.seatunnel.translation.spark.common.Handover; import org.apache.seatunnel.translation.spark.common.InternalRowCollector; import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;