This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ba4576657e [core] Support to query indexes (#4788)
ba4576657e is described below

commit ba4576657edf1dde60ac321423d4fa6032169119
Author: yuzelin <[email protected]>
AuthorDate: Fri Dec 27 13:40:34 2024 +0800

    [core] Support to query indexes (#4788)
---
 docs/content/concepts/system-tables.md             |  19 ++
 .../paimon/table/system/SystemTableLoader.java     |   2 +
 .../paimon/table/system/TableIndexesTable.java     | 238 +++++++++++++++++++++
 .../org/apache/paimon/flink/SystemTableITCase.java |  32 +++
 4 files changed, 291 insertions(+)

diff --git a/docs/content/concepts/system-tables.md 
b/docs/content/concepts/system-tables.md
index 5795aea419..92119874e2 100644
--- a/docs/content/concepts/system-tables.md
+++ b/docs/content/concepts/system-tables.md
@@ -389,6 +389,25 @@ SELECT * FROM T$statistics;
 */
 ```
 
+### Table Indexes Table
+
+You can query a table's index files through the `table_indexes` system table. It lists the hash
+index files generated for dynamic bucket tables (index_type = HASH) and the deletion vector files
+(index_type = DELETION_VECTORS).
+
+```sql
+SELECT * FROM my_table$table_indexes;
+
+/*
++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
+|                      partition |      bucket |                     index_type |                      file_name |            file_size |            row_count |                      dv_ranges |
++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
+|                   [2024-10-01] |           0 |                           HASH | index-70abfebf-149e-4796-9f... |                   12 |                    3 |                         <NULL> |
+|                   [2024-10-01] |           0 |               DELETION_VECTORS | index-633857e7-cdce-47d2-87... |                   33 |                    1 | [(data-346cb9c8-4032-4d66-a... |
++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
+2 rows in set
+*/
+```
+
 ## Global System Table
 
 Global system tables contain the statistical information of all the tables that exist in Paimon. For convenience of searching, we create a reference system database called `sys`.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index b77b72e412..57c3c2caac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -48,6 +48,7 @@ import static 
org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
 import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
+import static org.apache.paimon.table.system.TableIndexesTable.TABLE_INDEXES;
 import static org.apache.paimon.table.system.TagsTable.TAGS;
 
 /** Loader to load system {@link Table}s. */
@@ -70,6 +71,7 @@ public class SystemTableLoader {
                     .put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
                     .put(STATISTICS, StatisticTable::new)
                     .put(BINLOG, BinlogTable::new)
+                    .put(TABLE_INDEXES, TableIndexesTable::new)
                     .build();
 
     public static final List<String> SYSTEM_TABLES = new 
ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
new file mode 100644
index 0000000000..08731e768a
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/** A {@link Table} for showing indexes. */
+public class TableIndexesTable implements ReadonlyTable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableIndexesTable.class);
+
+    public static final String TABLE_INDEXES = "table_indexes";
+
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "partition", 
SerializationUtils.newStringType(true)),
+                            new DataField(1, "bucket", new IntType(false)),
+                            new DataField(2, "index_type", 
newStringType(false)),
+                            new DataField(3, "file_name", 
newStringType(false)),
+                            new DataField(4, "file_size", new 
BigIntType(false)),
+                            new DataField(5, "row_count", new 
BigIntType(false)),
+                            new DataField(
+                                    6,
+                                    "dv_ranges",
+                                    new ArrayType(true, 
DeletionVectorMeta.SCHEMA))));
+
+    private final FileStoreTable dataTable;
+
+    public TableIndexesTable(FileStoreTable dataTable) {
+        this.dataTable = dataTable;
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        return new IndexesScan();
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new IndexesRead(dataTable);
+    }
+
+    @Override
+    public String name() {
+        return dataTable.name() + SYSTEM_TABLE_SPLITTER + TABLE_INDEXES;
+    }
+
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return Collections.singletonList("file_name");
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new TableIndexesTable(dataTable.copy(dynamicOptions));
+    }
+
+    private static class IndexesScan extends ReadOnceTableScan {
+
+        @Override
+        public InnerTableScan withFilter(Predicate predicate) {
+            return this;
+        }
+
+        @Override
+        protected Plan innerPlan() {
+            return () -> Collections.singletonList(new IndexesSplit());
+        }
+    }
+
+    private static class IndexesSplit extends SingletonSplit {
+
+        private static final long serialVersionUID = 1L;
+
+        private IndexesSplit() {}
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            return o != null && getClass() == o.getClass();
+        }
+    }
+
+    private static class IndexesRead implements InnerTableRead {
+
+        private RowType readType;
+
+        private final FileStoreTable dataTable;
+
+        public IndexesRead(FileStoreTable dataTable) {
+            this.dataTable = dataTable;
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withReadType(RowType readType) {
+            this.readType = readType;
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) {
+            if (!(split instanceof IndexesSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
+            }
+            List<IndexManifestEntry> manifestFileMetas = 
allIndexEntries(dataTable);
+
+            RowDataToObjectArrayConverter partitionConverter =
+                    new 
RowDataToObjectArrayConverter(dataTable.schema().logicalPartitionType());
+
+            Iterator<InternalRow> rows =
+                    Iterators.transform(
+                            manifestFileMetas.iterator(),
+                            indexManifestEntry -> toRow(indexManifestEntry, 
partitionConverter));
+            if (readType != null) {
+                rows =
+                        Iterators.transform(
+                                rows,
+                                row ->
+                                        ProjectedRow.from(readType, 
TableIndexesTable.TABLE_TYPE)
+                                                .replaceRow(row));
+            }
+            return new IteratorRecordReader<>(rows);
+        }
+
+        private InternalRow toRow(
+                IndexManifestEntry indexManifestEntry,
+                RowDataToObjectArrayConverter partitionConverter) {
+            LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+                    indexManifestEntry.indexFile().deletionVectorMetas();
+            return GenericRow.of(
+                    BinaryString.fromString(
+                            Arrays.toString(
+                                    
partitionConverter.convert(indexManifestEntry.partition()))),
+                    indexManifestEntry.bucket(),
+                    
BinaryString.fromString(indexManifestEntry.indexFile().indexType()),
+                    
BinaryString.fromString(indexManifestEntry.indexFile().fileName()),
+                    indexManifestEntry.indexFile().fileSize(),
+                    indexManifestEntry.indexFile().rowCount(),
+                    dvMetas == null
+                            ? null
+                            : 
IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()));
+        }
+    }
+
+    private static List<IndexManifestEntry> allIndexEntries(FileStoreTable 
dataTable) {
+        IndexFileHandler indexFileHandler = 
dataTable.store().newIndexFileHandler();
+        Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
+        if (snapshot == null) {
+            LOG.warn("Check if your snapshot is empty.");
+            return Collections.emptyList();
+        }
+        String indexManifest = snapshot.indexManifest();
+        if (indexManifest == null || 
!indexFileHandler.existsManifest(indexManifest)) {
+            LOG.warn("indexManifest doesn't exist.");
+            return Collections.emptyList();
+        }
+
+        return indexFileHandler.readManifest(indexManifest);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
index 771f4acc5e..98ec635e85 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
@@ -63,4 +63,36 @@ public class SystemTableITCase extends CatalogTableITCase {
                         Row.of("+I", new Integer[] {1}, new Integer[] {3}),
                         Row.of("+I", new Integer[] {2}, new Integer[] {2}));
     }
+
+    @Test
+    public void testIndexesTable() {
+        sql(
+                "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt, 
a) NOT ENFORCED)"
+                        + " PARTITIONED BY (pt) with 
('deletion-vectors.enabled'='true')");
+        sql(
+                "INSERT INTO T VALUES ('2024-10-01', 1, 
'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
+        sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 
3, 'c_new1')");
+
+        List<Row> rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type 
= 'HASH'");
+        assertThat(rows.size()).isEqualTo(1);
+        Row row = rows.get(0);
+        assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
+        assertThat(row.getField(1)).isEqualTo(0);
+        assertThat(row.getField(2)).isEqualTo("HASH");
+        assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+        assertThat(row.getField(4)).isEqualTo(12L);
+        assertThat(row.getField(5)).isEqualTo(3L);
+        assertThat(row.getField(6)).isNull();
+
+        rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 
'DELETION_VECTORS'");
+        assertThat(rows.size()).isEqualTo(1);
+        row = rows.get(0);
+        assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
+        assertThat(row.getField(1)).isEqualTo(0);
+        assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
+        assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+        assertThat(row.getField(4)).isEqualTo(33L);
+        assertThat(row.getField(5)).isEqualTo(1L);
+        assertThat(row.getField(6)).isNotNull();
+    }
 }

Reply via email to