This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 680608bb48 [flink] Remove useless blocking queue in AlignedSourceReader (#5119)
680608bb48 is described below
commit 680608bb48dd2cb617166a1069f6706f250b3c25
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 20 16:54:43 2025 +0800
[flink] Remove useless blocking queue in AlignedSourceReader (#5119)
---
.../flink/source/align/AlignedContinuousFileStoreSource.java | 11 +----------
.../apache/paimon/flink/source/align/AlignedSourceReader.java | 10 ----------
.../paimon/flink/source/align/AlignedSourceReaderTest.java | 2 --
3 files changed, 1 insertion(+), 22 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 63b3f63f7f..1b3e7b5b19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
@@ -68,14 +66,7 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
FileStoreSourceReaderMetrics sourceReaderMetrics =
new FileStoreSourceReaderMetrics(context.metricGroup());
return new AlignedSourceReader(
- context,
- readBuilder.newRead(),
- sourceReaderMetrics,
- ioManager,
- limit,
- new FutureCompletingBlockingQueue<>(
- context.getConfiguration().get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
- rowData);
+ context, readBuilder.newRead(), sourceReaderMetrics, ioManager, limit, rowData);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 7d6f47296a..38e5a37e77 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -29,9 +29,6 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
@@ -46,9 +43,6 @@ import java.util.Optional;
public class AlignedSourceReader extends FileStoreSourceReader
implements ExternallyInducedSourceReader<RowData, FileStoreSourceSplit> {
- private final FutureCompletingBlockingQueue<
- RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
- elementsQueue;
private Long nextCheckpointId;
public AlignedSourceReader(
@@ -57,11 +51,8 @@ public class AlignedSourceReader extends FileStoreSourceReader
FileStoreSourceReaderMetrics metrics,
IOManager ioManager,
@Nullable Long limit,
- FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
- elementsQueue,
@Nullable NestedProjectedRowData rowData) {
super(readerContext, tableRead, metrics, ioManager, limit, rowData);
- this.elementsQueue = elementsQueue;
this.nextCheckpointId = null;
}
@@ -69,7 +60,6 @@ public class AlignedSourceReader extends FileStoreSourceReader
public void handleSourceEvents(SourceEvent sourceEvent) {
if (sourceEvent instanceof CheckpointEvent) {
nextCheckpointId = ((CheckpointEvent) sourceEvent).getCheckpointId();
- elementsQueue.notifyAvailable();
} else {
super.handleSourceEvents(sourceEvent);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index f815dbe632..00e1babcff 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.flink.source.FileStoreSourceReaderTest;
import org.apache.paimon.flink.source.TestChangelogDataReadWrite;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.table.data.RowData;
@@ -77,7 +76,6 @@ public class AlignedSourceReaderTest extends FileStoreSourceReaderTest {
new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
IOManager.create(tempDir.toString()),
null,
- new FutureCompletingBlockingQueue<>(2),
null);
}
}