This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new eeb201e40 [core] Remove the duplicated 
ContinuousCompactorFollowUpScanner (#2846)
eeb201e40 is described below

commit eeb201e40cf236b9cf575b9b91e49aed174e9fd3
Author: Aitozi <[email protected]>
AuthorDate: Thu Feb 22 10:46:25 2024 +0800

    [core] Remove the duplicated ContinuousCompactorFollowUpScanner (#2846)
---
 .../table/source/InnerStreamTableScanImpl.java     |   3 +-
 .../ContinuousCompactorFollowUpScanner.java        |  50 ---------
 .../ContinuousCompactorFollowUpScannerTest.java    | 125 ---------------------
 3 files changed, 1 insertion(+), 177 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index ca4bd8a8d..eab95015c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -27,7 +27,6 @@ import 
org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousAppendAndCompactFollowUpScanner;
-import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
@@ -190,7 +189,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
                 options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
         switch (type) {
             case COMPACT_BUCKET_TABLE:
-                return new ContinuousCompactorFollowUpScanner();
+                return new DeltaFollowUpScanner();
             case COMPACT_APPEND_NO_BUCKET:
                 return new ContinuousAppendAndCompactFollowUpScanner();
             case FILE_MONITOR:
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
deleted file mode 100644
index 7564d5372..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.ScanMode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** {@link FollowUpScanner} used internally for stand-alone streaming compact 
job sources. */
-public class ContinuousCompactorFollowUpScanner implements FollowUpScanner {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(ContinuousCompactorFollowUpScanner.class);
-
-    @Override
-    public boolean shouldScanSnapshot(Snapshot snapshot) {
-        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
-            return true;
-        }
-
-        LOG.debug(
-                "Next snapshot id {} is not APPEND, but is {}, check next 
one.",
-                snapshot.id(),
-                snapshot.commitKind());
-        return false;
-    }
-
-    @Override
-    public SnapshotReader.Plan scan(long snapshotId, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).read();
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
deleted file mode 100644
index 9e4ac18f4..000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.table.system.BucketsTable;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.SnapshotManager;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link ContinuousCompactorFollowUpScanner}. */
-public class ContinuousCompactorFollowUpScannerTest extends ScannerTestBase {
-
-    private final DataFileMetaSerializer dataFileMetaSerializer = new 
DataFileMetaSerializer();
-
-    @Test
-    public void testScan() throws Exception {
-        SnapshotManager snapshotManager = table.snapshotManager();
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(2, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(2, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 2, 40, 400L));
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        write.close();
-        commit.close();
-
-        Map<String, String> overwritePartition = new HashMap<>();
-        overwritePartition.put("pt", "1");
-        write = table.newWrite(commitUser).withIgnorePreviousFiles(true);
-        commit = table.newCommit(commitUser).withOverwrite(overwritePartition);
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        write.close();
-        commit.close();
-
-        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
-
-        BucketsTable bucketsTable = new BucketsTable(table, true);
-        TableRead read = bucketsTable.newRead();
-        ContinuousCompactorFollowUpScanner scanner = new 
ContinuousCompactorFollowUpScanner();
-
-        Snapshot snapshot = snapshotManager.snapshot(1);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        TableScan.Plan plan = scanner.scan(1, snapshotReader);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
-
-        snapshot = snapshotManager.snapshot(2);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.scan(2, snapshotReader);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Collections.singletonList("+I 2|2|0|1"));
-
-        snapshot = snapshotManager.snapshot(3);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
-
-        snapshot = snapshotManager.snapshot(4);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
-    }
-
-    @Override
-    protected String rowDataToString(InternalRow rowData) {
-        int numFiles;
-        try {
-            numFiles = 
dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-
-        return String.format(
-                "%s %d|%d|%d|%d",
-                rowData.getRowKind().shortString(),
-                rowData.getLong(0),
-                deserializeBinaryRow(rowData.getBinary(1)).getInt(0),
-                rowData.getInt(2),
-                numFiles);
-    }
-}

Reply via email to