This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 321c8efa9 [core] Optimize IN filter pushdown to snapshot/tag/schema 
system tables (#4436)
321c8efa9 is described below

commit 321c8efa984e039451c0fd5c8a660322c00b56ae
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Nov 6 13:50:41 2024 +0800

    [core] Optimize IN filter pushdown to snapshot/tag/schema system tables 
(#4436)
---
 .../org/apache/paimon/schema/SchemaManager.java    |  4 +
 .../apache/paimon/table/system/SchemasTable.java   | 29 ++++++-
 .../apache/paimon/table/system/SnapshotsTable.java | 32 +++++++-
 .../org/apache/paimon/table/system/TagsTable.java  | 74 +++++++++++++-----
 .../org/apache/paimon/utils/SnapshotManager.java   |  7 ++
 .../apache/paimon/flink/CatalogTableITCase.java    | 90 ++++++++++++++++++++--
 6 files changed, 205 insertions(+), 31 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index d1efbcfe1..7b987b049 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -128,6 +128,10 @@ public class SchemaManager implements Serializable {
         return 
listAllIds().stream().map(this::schema).collect(Collectors.toList());
     }
 
+    public List<TableSchema> schemasWithId(List<Long> schemaIds) {
+        return 
schemaIds.stream().map(this::schema).collect(Collectors.toList());
+    }
+
     public List<TableSchema> listWithRange(
             Optional<Long> optionalMaxSchemaId, Optional<Long> 
optionalMinSchemaId) {
         Long lowerBoundSchemaId = 0L;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index aab6c1d87..b6150ef75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.LessOrEqual;
 import org.apache.paimon.predicate.LessThan;
+import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
@@ -64,6 +65,7 @@ import javax.annotation.Nullable;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -203,6 +205,7 @@ public class SchemasTable implements ReadonlyTable {
 
         private Optional<Long> optionalFilterSchemaIdMax = Optional.empty();
         private Optional<Long> optionalFilterSchemaIdMin = Optional.empty();
+        private final List<Long> schemaIds = new ArrayList<>();
 
         public SchemasRead(FileIO fileIO) {
             this.fileIO = fileIO;
@@ -223,6 +226,22 @@ public class SchemasTable implements ReadonlyTable {
                         handleLeafPredicate(leaf, leafName);
                     }
                 }
+
+                // optimize for IN filter
+                if ((compoundPredicate.function()) instanceof Or) {
+                    List<Predicate> children = compoundPredicate.children();
+                    for (Predicate leaf : children) {
+                        if (leaf instanceof LeafPredicate
+                                && (((LeafPredicate) leaf).function() 
instanceof Equal)
+                                && 
leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName)
+                                        != null) {
+                            schemaIds.add((Long) ((LeafPredicate) 
leaf).literals().get(0));
+                        } else {
+                            schemaIds.clear();
+                            break;
+                        }
+                    }
+                }
             } else {
                 handleLeafPredicate(predicate, leafName);
             }
@@ -279,8 +298,14 @@ public class SchemasTable implements ReadonlyTable {
             Path location = schemasSplit.location;
             SchemaManager manager = new SchemaManager(fileIO, location, 
branch);
 
-            Collection<TableSchema> tableSchemas =
-                    manager.listWithRange(optionalFilterSchemaIdMax, 
optionalFilterSchemaIdMin);
+            Collection<TableSchema> tableSchemas;
+            if (!schemaIds.isEmpty()) {
+                tableSchemas = manager.schemasWithId(schemaIds);
+            } else {
+                tableSchemas =
+                        manager.listWithRange(optionalFilterSchemaIdMax, 
optionalFilterSchemaIdMin);
+            }
+
             Iterator<InternalRow> rows = 
Iterators.transform(tableSchemas.iterator(), this::toRow);
             if (readType != null) {
                 rows =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 5bec2b109..8bf4766d5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -36,6 +36,7 @@ import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.LessOrEqual;
 import org.apache.paimon.predicate.LessThan;
+import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -62,6 +63,7 @@ import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -206,6 +208,7 @@ public class SnapshotsTable implements ReadonlyTable {
         private RowType readType;
         private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty();
         private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty();
+        private final List<Long> snapshotIds = new ArrayList<>();
 
         public SnapshotsRead(FileIO fileIO) {
             this.fileIO = fileIO;
@@ -220,12 +223,27 @@ public class SnapshotsTable implements ReadonlyTable {
             String leafName = "snapshot_id";
             if (predicate instanceof CompoundPredicate) {
                 CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+                List<Predicate> children = compoundPredicate.children();
                 if ((compoundPredicate.function()) instanceof And) {
-                    List<Predicate> children = compoundPredicate.children();
                     for (Predicate leaf : children) {
                         handleLeafPredicate(leaf, leafName);
                     }
                 }
+
+                // optimize for IN filter
+                if ((compoundPredicate.function()) instanceof Or) {
+                    for (Predicate leaf : children) {
+                        if (leaf instanceof LeafPredicate
+                                && (((LeafPredicate) leaf).function() 
instanceof Equal)
+                                && 
leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName)
+                                        != null) {
+                            snapshotIds.add((Long) ((LeafPredicate) 
leaf).literals().get(0));
+                        } else {
+                            snapshotIds.clear();
+                            break;
+                        }
+                    }
+                }
             } else {
                 handleLeafPredicate(predicate, leafName);
             }
@@ -284,9 +302,15 @@ public class SnapshotsTable implements ReadonlyTable {
             }
             SnapshotManager snapshotManager =
                     new SnapshotManager(fileIO, ((SnapshotsSplit) 
split).location, branch);
-            Iterator<Snapshot> snapshots =
-                    snapshotManager.snapshotsWithinRange(
-                            optionalFilterSnapshotIdMax, 
optionalFilterSnapshotIdMin);
+
+            Iterator<Snapshot> snapshots;
+            if (!snapshotIds.isEmpty()) {
+                snapshots = snapshotManager.snapshotsWithId(snapshotIds);
+            } else {
+                snapshots =
+                        snapshotManager.snapshotsWithinRange(
+                                optionalFilterSnapshotIdMax, 
optionalFilterSnapshotIdMin);
+            }
 
             Iterator<InternalRow> rows = Iterators.transform(snapshots, 
this::toRow);
             if (readType != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index dd7335e38..f3342e9f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -26,9 +26,11 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.CompoundPredicate;
 import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -74,10 +76,12 @@ public class TagsTable implements ReadonlyTable {
 
     public static final String TAGS = "tags";
 
+    private static final String TAG_NAME = "tag_name";
+
     public static final RowType TABLE_TYPE =
             new RowType(
                     Arrays.asList(
-                            new DataField(0, "tag_name", 
SerializationUtils.newStringType(false)),
+                            new DataField(0, TAG_NAME, 
SerializationUtils.newStringType(false)),
                             new DataField(1, "snapshot_id", new 
BigIntType(false)),
                             new DataField(2, "schema_id", new 
BigIntType(false)),
                             new DataField(3, "commit_time", new 
TimestampType(false, 3)),
@@ -115,7 +119,7 @@ public class TagsTable implements ReadonlyTable {
 
     @Override
     public List<String> primaryKeys() {
-        return Collections.singletonList("tag_name");
+        return Collections.singletonList(TAG_NAME);
     }
 
     @Override
@@ -134,23 +138,20 @@ public class TagsTable implements ReadonlyTable {
     }
 
     private class TagsScan extends ReadOnceTableScan {
-        private @Nullable LeafPredicate tagName;
+        private @Nullable Predicate tagPredicate;
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
             if (predicate == null) {
                 return this;
             }
-            // TODO
-            Map<String, LeafPredicate> leafPredicates =
-                    predicate.visit(LeafPredicateExtractor.INSTANCE);
-            tagName = leafPredicates.get("tag_name");
+            tagPredicate = predicate;
             return this;
         }
 
         @Override
         public Plan innerPlan() {
-            return () -> Collections.singletonList(new TagsSplit(location, 
tagName));
+            return () -> Collections.singletonList(new TagsSplit(location, 
tagPredicate));
         }
     }
 
@@ -160,11 +161,11 @@ public class TagsTable implements ReadonlyTable {
 
         private final Path location;
 
-        private final @Nullable LeafPredicate tagName;
+        private final @Nullable Predicate tagPredicate;
 
-        private TagsSplit(Path location, @Nullable LeafPredicate tagName) {
+        private TagsSplit(Path location, @Nullable Predicate tagPredicate) {
             this.location = location;
-            this.tagName = tagName;
+            this.tagPredicate = tagPredicate;
         }
 
         @Override
@@ -176,7 +177,8 @@ public class TagsTable implements ReadonlyTable {
                 return false;
             }
             TagsSplit that = (TagsSplit) o;
-            return Objects.equals(location, that.location) && 
Objects.equals(tagName, that.tagName);
+            return Objects.equals(location, that.location)
+                    && Objects.equals(tagPredicate, that.tagPredicate);
         }
 
         @Override
@@ -217,18 +219,52 @@ public class TagsTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             Path location = ((TagsSplit) split).location;
-            LeafPredicate predicate = ((TagsSplit) split).tagName;
+            Predicate predicate = ((TagsSplit) split).tagPredicate;
             TagManager tagManager = new TagManager(fileIO, location, branch);
 
             Map<String, Tag> nameToSnapshot = new TreeMap<>();
+            Map<String, Tag> predicateMap = new TreeMap<>();
+            if (predicate != null) {
+                if (predicate instanceof LeafPredicate
+                        && ((LeafPredicate) predicate).function() instanceof 
Equal
+                        && ((LeafPredicate) predicate).literals().get(0) 
instanceof BinaryString
+                        && 
predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) {
+                    String equalValue = ((LeafPredicate) 
predicate).literals().get(0).toString();
+                    if (tagManager.tagExists(equalValue)) {
+                        predicateMap.put(equalValue, 
tagManager.tag(equalValue));
+                    }
+                }
 
-            if (predicate != null
-                    && predicate.function() instanceof Equal
-                    && predicate.literals().get(0) instanceof BinaryString) {
-                String equalValue = predicate.literals().get(0).toString();
-                if (tagManager.tagExists(equalValue)) {
-                    nameToSnapshot.put(equalValue, tagManager.tag(equalValue));
+                if (predicate instanceof CompoundPredicate) {
+                    CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+                    // optimize for IN filter
+                    if ((compoundPredicate.function()) instanceof Or) {
+                        List<Predicate> children = 
compoundPredicate.children();
+                        for (Predicate leaf : children) {
+                            if (leaf instanceof LeafPredicate
+                                    && (((LeafPredicate) leaf).function() 
instanceof Equal
+                                            && ((LeafPredicate) 
leaf).literals().get(0)
+                                                    instanceof BinaryString)
+                                    && predicate
+                                                    
.visit(LeafPredicateExtractor.INSTANCE)
+                                                    .get(TAG_NAME)
+                                            != null) {
+                                String equalValue =
+                                        ((LeafPredicate) 
leaf).literals().get(0).toString();
+                                if (tagManager.tagExists(equalValue)) {
+                                    predicateMap.put(equalValue, 
tagManager.tag(equalValue));
+                                }
+                            } else {
+                                predicateMap.clear();
+                                break;
+                            }
+                        }
+                    }
                 }
+            }
+
+            if (!predicateMap.isEmpty()) {
+                nameToSnapshot.putAll(predicateMap);
             } else {
                 for (Pair<Tag, String> tag : tagManager.tagObjects()) {
                     nameToSnapshot.put(tag.getValue(), tag.getKey());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 7e2fce0ad..5902d4c84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -417,6 +417,13 @@ public class SnapshotManager implements Serializable {
                 .collect(Collectors.toList());
     }
 
+    public Iterator<Snapshot> snapshotsWithId(List<Long> snapshotIds) {
+        return snapshotIds.stream()
+                .map(this::snapshot)
+                .sorted(Comparator.comparingLong(Snapshot::id))
+                .iterator();
+    }
+
     public Iterator<Snapshot> snapshotsWithinRange(
             Optional<Long> optionalMaxSnapshotId, Optional<Long> 
optionalMinSnapshotId) {
         Long lowerBoundSnapshotId = earliestSnapshotId();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index b614b5953..9c1a2f4e3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -70,14 +70,23 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         sql("CREATE TABLE T (a INT, b INT)");
         sql("INSERT INTO T VALUES (1, 2)");
         sql("INSERT INTO T VALUES (3, 4)");
+        sql("INSERT INTO T VALUES (5, 6)");
 
         List<Row> result = sql("SELECT snapshot_id, schema_id, commit_kind 
FROM T$snapshots");
-        assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), 
Row.of(2L, 0L, "APPEND"));
+        assertThat(result)
+                .containsExactly(
+                        Row.of(1L, 0L, "APPEND"),
+                        Row.of(2L, 0L, "APPEND"),
+                        Row.of(3L, 0L, "APPEND"));
 
         result =
                 sql(
                         "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE schema_id = 0");
-        assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), 
Row.of(2L, 0L, "APPEND"));
+        assertThat(result)
+                .containsExactly(
+                        Row.of(1L, 0L, "APPEND"),
+                        Row.of(2L, 0L, "APPEND"),
+                        Row.of(3L, 0L, "APPEND"));
 
         result =
                 sql(
@@ -87,7 +96,7 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         result =
                 sql(
                         "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id > 1");
-        assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));
+        assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"), 
Row.of(3L, 0L, "APPEND"));
 
         result =
                 sql(
@@ -97,12 +106,30 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         result =
                 sql(
                         "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id >= 1");
-        assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, 
"APPEND"));
+        assertThat(result)
+                .contains(
+                        Row.of(1L, 0L, "APPEND"),
+                        Row.of(2L, 0L, "APPEND"),
+                        Row.of(3L, 0L, "APPEND"));
 
         result =
                 sql(
                         "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id <= 2");
         assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, 
"APPEND"));
+
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id in (1, 2)");
+        assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, 
"APPEND"));
+
+        result =
+                sql(
+                        "SELECT snapshot_id, schema_id, commit_kind FROM 
T$snapshots WHERE snapshot_id in (1, 2) or schema_id=0");
+        assertThat(result)
+                .contains(
+                        Row.of(1L, 0L, "APPEND"),
+                        Row.of(2L, 0L, "APPEND"),
+                        Row.of(3L, 0L, "APPEND"));
     }
 
     @Test
@@ -281,6 +308,42 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                 + 
"{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], 
{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\","
                                 + 
"\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\"}, ]]");
 
+        // test for IN filter
+        result =
+                sql(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment` FROM 
T$schemas where schema_id in (1, 3)");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT 
NULL\"},"
+                                + 
"{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}],
 [], [\"a\"], "
+                                + 
"{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 
h\",\"b.bb.bbb\":\"val2\"}, ], "
+                                + "+I[3, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT 
NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+                                + 
"{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+                                + 
"{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 
h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"snapshot.num-retained.min\":\"18\"},
 ]]");
+
+        result =
+                sql(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment` FROM 
T$schemas where schema_id in (1, 3) or 
fields='[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT 
NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}]'
 order by schema_id");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT 
NULL\"},"
+                                + 
"{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}],
 [], [\"a\"], "
+                                + 
"{\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}, ], "
+                                + "+I[1, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+                                + 
"{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}],
 [], [\"a\"], "
+                                + 
"{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 
h\",\"b.bb.bbb\":\"val2\"}, ], "
+                                + "+I[2, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+                                + 
"{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}],
 [], [\"a\"], "
+                                + 
"{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 
h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\"}, ], "
+                                + "+I[3, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT 
NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+                                + 
"{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+                                + 
"{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 
h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"snapshot.num-retained.min\":\"18\"},
 ], "
+                                + "+I[4, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT 
NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}],
 [], [\"a\"], "
+                                + 
"{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 
h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"manifest.format\":\"avro\","
+                                + "\"snapshot.num-retained.min\":\"18\"}, ]]");
+
         // check with not exist schema id
         assertThatThrownBy(
                         () ->
@@ -844,20 +907,35 @@ public class CatalogTableITCase extends CatalogITCaseBase 
{
         sql("CREATE TABLE T (a INT, b INT)");
         sql("INSERT INTO T VALUES (1, 2)");
         sql("INSERT INTO T VALUES (3, 4)");
+        sql("INSERT INTO T VALUES (5, 6)");
 
         paimonTable("T").createTag("tag1", 1);
         paimonTable("T").createTag("tag2", 2);
+        paimonTable("T").createTag("tag3", 3);
 
         List<Row> result =
                 sql(
                         "SELECT tag_name, snapshot_id, schema_id, record_count 
FROM T$tags ORDER BY tag_name");
-
-        assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), 
Row.of("tag2", 2L, 0L, 2L));
+        assertThat(result)
+                .containsExactly(
+                        Row.of("tag1", 1L, 0L, 1L),
+                        Row.of("tag2", 2L, 0L, 2L),
+                        Row.of("tag3", 3L, 0L, 3L));
 
         result =
                 sql(
                         "SELECT tag_name, snapshot_id, schema_id, record_count 
FROM T$tags where tag_name = 'tag1' ");
         assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L));
+
+        result =
+                sql(
+                        "SELECT tag_name, snapshot_id, schema_id, record_count 
FROM T$tags where tag_name in ('tag1', 'tag3')");
+        assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), 
Row.of("tag3", 3L, 0L, 3L));
+
+        result =
+                sql(
+                        "SELECT tag_name, snapshot_id, schema_id, record_count 
FROM T$tags where tag_name in ('tag1') or snapshot_id=2");
+        assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), 
Row.of("tag2", 2L, 0L, 2L));
     }
 
     @Test

Reply via email to