This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eae32bfb84 [Core][Flink] Resolve streaming source high CPU usage (#8354)
eae32bfb84 is described below

commit eae32bfb840572e6509486541d2176cab1763c06
Author: Jast <[email protected]>
AuthorDate: Tue Dec 24 19:19:38 2024 +0800

    [Core][Flink] Resolve streaming source high CPU usage (#8354)
---
 .../flink/source/FlinkRowCollector.java            | 14 +++++++
 .../flink/source/FlinkSourceReader.java            | 44 +++++++++++++++++++++-
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index 2ea584029e..c7eadfcfb7 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -46,6 +46,8 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
 
     private final Meter sourceReadQPS;
 
+    private boolean emptyThisPollNext = true;
+
     public FlinkRowCollector(Config envConfig, MetricsContext metricsContext) {
         this.flowControlGate = FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
         this.sourceReadCount = metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT);
@@ -61,6 +63,7 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
             sourceReadCount.inc();
             sourceReadBytes.inc(record.getBytesSize());
             sourceReadQPS.markEvent();
+            emptyThisPollNext = false;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -71,8 +74,19 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
         return this;
     }
 
+    @Override
+    public boolean isEmptyThisPollNext() {
+        return emptyThisPollNext;
+    }
+
+    @Override
+    public void resetEmptyThisPollNext() {
+        this.emptyThisPollNext = true;
+    }
+
     public FlinkRowCollector withReaderOutput(ReaderOutput<SeaTunnelRow> readerOutput) {
         this.readerOutput = readerOutput;
+        this.emptyThisPollNext = true;
         return this;
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index fb1dc85174..4c2a7b6d2e 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.translation.flink.source;
 
+import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.source.SourceSplit;
@@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -55,10 +59,25 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
 
     private InputStatus inputStatus = InputStatus.MORE_AVAILABLE;
 
+    private volatile CompletableFuture<Void> availabilityFuture;
+
+    private static final long DEFAULT_WAIT_TIME_MILLIS = 1000L;
+
+    private final ScheduledExecutorService scheduledExecutor;
+
     public FlinkSourceReader(
             org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader,
             org.apache.seatunnel.api.source.SourceReader.Context context,
             Config envConfig) {
+        this.scheduledExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                .setDaemon(true)
+                                .setNameFormat(
+                                        String.format(
+                                                "source-reader-scheduler-%d",
+                                                context.getIndexOfSubtask()))
+                                .build());
         this.sourceReader = sourceReader;
         this.context = context;
         this.flinkRowCollector = new FlinkRowCollector(envConfig, context.getMetricsContext());
@@ -78,9 +97,19 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
     public InputStatus pollNext(ReaderOutput<SeaTunnelRow> output) throws Exception {
         if (!((FlinkSourceReaderContext) context).isSendNoMoreElementEvent()) {
             sourceReader.pollNext(flinkRowCollector.withReaderOutput(output));
+            if (flinkRowCollector.isEmptyThisPollNext()) {
+                synchronized (this) {
+                    if (availabilityFuture == null || availabilityFuture.isDone()) {
+                        availabilityFuture = new CompletableFuture<>();
+                        scheduleComplete(availabilityFuture);
+                        LOGGER.debug("No data available, wait for next poll.");
+                    }
+                }
+                return InputStatus.NOTHING_AVAILABLE;
+            }
         } else {
             // reduce CPU idle
-            Thread.sleep(1000L);
+            Thread.sleep(DEFAULT_WAIT_TIME_MILLIS);
         }
         return inputStatus;
     }
@@ -97,7 +126,8 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
 
     @Override
     public CompletableFuture<Void> isAvailable() {
-        return CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> future = availabilityFuture;
+        return future != null ? future : CompletableFuture.completedFuture(null);
     }
 
     @Override
@@ -123,8 +153,13 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
 
     @Override
     public void close() throws Exception {
+        CompletableFuture<Void> future = availabilityFuture;
+        if (future != null && !future.isDone()) {
+            future.complete(null);
+        }
         sourceReader.close();
         context.getEventListener().onEvent(new ReaderCloseEvent());
+        scheduledExecutor.shutdown();
     }
 
     @Override
@@ -136,4 +171,9 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
         sourceReader.notifyCheckpointAborted(checkpointId);
     }
+
+    private void scheduleComplete(CompletableFuture<Void> future) {
+        scheduledExecutor.schedule(
+                () -> future.complete(null), DEFAULT_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
+    }
 }

Reply via email to