This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b8e5a4b164 [core] Fix bug when both main branch and fallback branch have no primary key (#5665)
b8e5a4b164 is described below

commit b8e5a4b16495150730b0876f563f348e78339dff
Author: tsreaper <[email protected]>
AuthorDate: Tue May 27 17:13:07 2025 +0800

    [core] Fix bug when both main branch and fallback branch have no primary key (#5665)
---
 .../paimon/table/FallbackReadFileStoreTable.java   | 63 +++++++++++++++++++---
 .../org/apache/paimon/table/source/DataSplit.java  |  2 +-
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 22 ++++++++
 3 files changed, 79 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 07d837bd42..4ceabed38b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -25,6 +25,10 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.options.Options;
@@ -49,11 +53,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -233,6 +240,49 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         return true;
     }
 
+    private static class FallbackDataSplit extends DataSplit {
+
+        private static final long serialVersionUID = 1L;
+
+        private boolean isFallback;
+
+        private FallbackDataSplit(DataSplit dataSplit, boolean isFallback) {
+            assign(dataSplit);
+            this.isFallback = isFallback;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return super.equals(o) && isFallback == ((FallbackDataSplit) o).isFallback;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), isFallback);
+        }
+
+        private void writeObject(ObjectOutputStream out) throws IOException {
+            serialize(new DataOutputViewStreamWrapper(out));
+        }
+
+        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+            FallbackDataSplit split = deserialize(new DataInputViewStreamWrapper(in));
+            assign(split);
+            this.isFallback = split.isFallback;
+        }
+
+        @Override
+        public void serialize(DataOutputView out) throws IOException {
+            super.serialize(out);
+            out.writeBoolean(isFallback);
+        }
+
+        public static FallbackDataSplit deserialize(DataInputView in) throws IOException {
+            DataSplit dataSplit = DataSplit.deserialize(in);
+            return new FallbackDataSplit(dataSplit, in.readBoolean());
+        }
+    }
+
     /** Scan implementation for {@link FallbackReadFileStoreTable}. */
     public static class FallbackReadScan implements DataTableScan {
 
@@ -320,7 +370,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
             Set<BinaryRow> completePartitions = new HashSet<>();
             for (Split split : mainScan.plan().splits()) {
                 DataSplit dataSplit = (DataSplit) split;
-                splits.add(dataSplit);
+                splits.add(new FallbackDataSplit(dataSplit, false));
                 completePartitions.add(dataSplit.partition());
             }
 
@@ -331,7 +381,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
             if (!remainingPartitions.isEmpty()) {
                 fallbackScan.withPartitionFilter(remainingPartitions);
                 for (Split split : fallbackScan.plan().splits()) {
-                    splits.add((DataSplit) split);
+                    splits.add(new FallbackDataSplit((DataSplit) split, true));
                 }
             }
             return new DataFilePlan(splits);
@@ -405,11 +455,10 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            DataSplit dataSplit = (DataSplit) split;
-            if (!dataSplit.dataFiles().isEmpty()
-                    && dataSplit.dataFiles().get(0).minKey().getFieldCount() > 0) {
+            FallbackDataSplit dataSplit = (FallbackDataSplit) split;
+            if (dataSplit.isFallback) {
                 try {
-                    return fallbackRead.createReader(split);
+                    return fallbackRead.createReader(dataSplit);
                 } catch (Exception ignored) {
                     LOG.error(
                             "Reading from fallback branch has problems for files: {}",
@@ -418,7 +467,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
                                     .collect(Collectors.joining(", ")));
                 }
             }
-            return mainRead.createReader(split);
+            return mainRead.createReader(dataSplit);
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index c34fca6997..f4491618d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -312,7 +312,7 @@ public class DataSplit implements Split {
         assign(deserialize(new DataInputViewStreamWrapper(in)));
     }
 
-    private void assign(DataSplit other) {
+    protected void assign(DataSplit other) {
         this.snapshotId = other.snapshotId;
         this.partition = other.partition;
         this.bucket = other.bucket;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index ec161e76ac..57a6746fea 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -655,6 +655,28 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                         "+I[2, 20, dog, 2]");
     }
 
+    @Test
+    public void testMainAndFallbackNoPrimaryKeys() throws Exception {
+        sql("CREATE TABLE t ( pt INT, v INT ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+        sql("INSERT INTO t VALUES (1, 110)");
+        sql("CALL sys.create_branch('default.t', 'test')");
+        sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'test' )");
+        sql("INSERT INTO `t$branch_test` VALUES (2, 210)");
+        assertThat(collectResult("SELECT * FROM t"))
+                .containsExactlyInAnyOrder("+I[1, 110]", "+I[2, 210]");
+
+        sql("ALTER TABLE t ADD v2 INT");
+        sql("ALTER TABLE `t$branch_test` ADD v2 INT");
+        sql("INSERT INTO t VALUES (1, 120, 1200)");
+        sql("INSERT INTO `t$branch_test` VALUES (2, 220, 2200)");
+        assertThat(collectResult("SELECT * FROM t"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 110, null]",
+                        "+I[2, 210, null]",
+                        "+I[1, 120, 1200]",
+                        "+I[2, 220, 2200]");
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

Reply via email to