This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 67df9b2a4 [core] Support for query manifests (#1399)
67df9b2a4 is described below
commit 67df9b2a48993f0703ffa9df7bac18956a75f42e
Author: xwmr-max <[email protected]>
AuthorDate: Tue Jul 4 10:14:36 2023 +0800
[core] Support for query manifests (#1399)
---
.../apache/paimon/table/system/ManifestsTable.java | 252 +++++++++++++++++++++
.../paimon/table/system/SystemTableLoader.java | 3 +
.../apache/paimon/flink/CatalogTableITCase.java | 26 +++
.../org/apache/paimon/spark/SparkReadITCase.java | 24 ++
4 files changed, 305 insertions(+)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
new file mode 100644
index 000000000..f1cc8bce9
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.SerializationUtils;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/** A {@link Table} for showing manifest files of the latest snapshot of table. */
+public class ManifestsTable implements ReadonlyTable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String MANIFESTS = "manifests";
+
+ public static final RowType TABLE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "file_name",
SerializationUtils.newStringType(false)),
+ new DataField(1, "file_size", new
BigIntType(false)),
+ new DataField(2, "num_added_files", new
BigIntType(false)),
+ new DataField(3, "num_deleted_files", new
BigIntType(false)),
+ new DataField(4, "schema_id", new
BigIntType(false))));
+
+ private final FileIO fileIO;
+ private final Path location;
+ private final Table dataTable;
+
+ public ManifestsTable(FileIO fileIO, Path location, Table dataTable) {
+ this.fileIO = fileIO;
+ this.location = location;
+ this.dataTable = dataTable;
+ }
+
+ @Override
+ public InnerTableScan newScan() {
+ return new ManifestsScan();
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new ManifestsRead(fileIO, dataTable);
+ }
+
+ @Override
+ public String name() {
+ return location.getName() + SYSTEM_TABLE_SPLITTER + MANIFESTS;
+ }
+
+ @Override
+ public RowType rowType() {
+ return TABLE_TYPE;
+ }
+
+ @Override
+ public List<String> primaryKeys() {
+ return Collections.singletonList("file_name");
+ }
+
+ @Override
+ public Table copy(Map<String, String> dynamicOptions) {
+ return new ManifestsTable(fileIO, location, dataTable);
+ }
+
+ private class ManifestsScan extends ReadOnceTableScan {
+
+ @Override
+ public InnerTableScan withFilter(Predicate predicate) {
+ // TODO: filter push-down is not implemented yet; the predicate is ignored.
+ return this;
+ }
+
+ @Override
+ protected Plan innerPlan() {
+ return () -> Collections.singletonList(new ManifestsSplit(fileIO,
location, dataTable));
+ }
+ }
+
+ private static class ManifestsSplit implements Split {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileIO fileIO;
+ private final Path location;
+ private final Table dataTable;
+
+ private ManifestsSplit(FileIO fileIO, Path location, Table dataTable) {
+ this.fileIO = fileIO;
+ this.location = location;
+ this.dataTable = dataTable;
+ }
+
+ @Override
+ public long rowCount() {
+ return new StatsManifestsGetter(fileIO, location,
dataTable).manifestFileMetas().size();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ManifestsSplit that = (ManifestsSplit) o;
+ return Objects.equals(location, that.location);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(location);
+ }
+ }
+
+ private static class ManifestsRead implements InnerTableRead {
+
+ private int[][] projection;
+
+ private FileIO fileIO;
+
+ private Table dataTable;
+
+ public ManifestsRead(FileIO fileIO, Table dataTable) {
+ this.fileIO = fileIO;
+ this.dataTable = dataTable;
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ // TODO: filtering is not implemented yet; the predicate is ignored.
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withProjection(int[][] projection) {
+ this.projection = projection;
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ if (!(split instanceof ManifestsSplit)) {
+ throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
+ }
+ Path location = ((ManifestsSplit) split).location;
+ Snapshot snapshot = new SnapshotManager(fileIO,
location).latestSnapshot();
+ List<ManifestFileMeta> manifestFileMetas =
+ new StatsManifestsGetter(fileIO, location,
dataTable).manifestFileMetas();
+
+ Iterator<InternalRow> rows =
+ Iterators.transform(manifestFileMetas.iterator(),
this::toRow);
+ if (projection != null) {
+ rows =
+ Iterators.transform(
+ rows, row ->
ProjectedRow.from(projection).replaceRow(row));
+ }
+ return new IteratorRecordReader<>(rows);
+ }
+
+ private InternalRow toRow(ManifestFileMeta manifestFileMeta) {
+ return GenericRow.of(
+ BinaryString.fromString(manifestFileMeta.fileName()),
+ manifestFileMeta.fileSize(),
+ manifestFileMeta.numAddedFiles(),
+ manifestFileMeta.numDeletedFiles(),
+ manifestFileMeta.schemaId());
+ }
+ }
+
+ private static class StatsManifestsGetter {
+ private final FileIO fileIO;
+ private final Table dataTable;
+ private final Path location;
+
+ private List<ManifestFileMeta> manifestFileMetas;
+
+ private StatsManifestsGetter(FileIO fileIO, Path location, Table
dataTable) {
+ this.fileIO = fileIO;
+ this.location = location;
+ this.dataTable = dataTable;
+ }
+
+ private void initialize() {
+ Snapshot snapshot = new SnapshotManager(fileIO,
location).latestSnapshot();
+ FileStorePathFactory fileStorePathFactory = new
FileStorePathFactory(location);
+ CoreOptions coreOptions = CoreOptions.fromMap(dataTable.options());
+ FileFormat fileFormat = coreOptions.manifestFormat();
+ ManifestList manifestList =
+ new ManifestList.Factory(fileIO, fileFormat,
fileStorePathFactory, null)
+ .create();
+ manifestFileMetas = snapshot.allManifests(manifestList);
+ }
+
+ private List<ManifestFileMeta> manifestFileMetas() {
+ if (manifestFileMetas == null) {
+ initialize();
+ }
+ return manifestFileMetas;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 55d684208..2bfc39fad 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
+import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
@@ -40,6 +41,8 @@ public class SystemTableLoader {
public static Table load(String type, FileIO fileIO, FileStoreTable
dataTable) {
Path location = dataTable.location();
switch (type.toLowerCase()) {
+ case MANIFESTS:
+ return new ManifestsTable(fileIO, location, dataTable);
case SNAPSHOTS:
return new SnapshotsTable(fileIO, location);
case OPTIONS:
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 95b36fd70..746d097ab 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -98,6 +98,32 @@ public class CatalogTableITCase extends CatalogITCaseBase {
+ "'Identifier{database='default',
table='T$aa$bb'}', please use data table.");
}
+ @Test
+ public void testManifestsTable() throws Exception {
+ sql("CREATE TABLE T (a INT, b INT)");
+ sql("INSERT INTO T VALUES (1, 2)");
+
+ List<Row> result = sql("SELECT schema_id, file_name, file_size FROM
T$manifests");
+
+ result.forEach(
+ row -> {
+ assertThat((long) row.getField(0)).isEqualTo(0L);
+ assertThat(StringUtils.startsWith((String)
row.getField(1), "manifest"))
+ .isTrue();
+ assertThat((long) row.getField(2)).isGreaterThan(0L);
+ });
+ }
+
+ @Test
+ public void testManifestsTableWithFileCount() {
+ sql("CREATE TABLE T (a INT, b INT)");
+ sql("INSERT INTO T VALUES (1, 2)");
+ sql("INSERT INTO T VALUES (3, 4)");
+
+ List<Row> result = sql("SELECT num_added_files, num_deleted_files FROM
T$manifests");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1L, 0L),
Row.of(1L, 0L));
+ }
+
@Test
public void testSchemasTable() throws Exception {
sql(
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 3d51e3933..05f54e86b 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -112,6 +112,30 @@ public class SparkReadITCase extends SparkReadTestBase {
assertThat(rows.toString()).isEqualTo("[[1,2,2,0]]");
}
+ @Test
+ public void testManifestsTable() {
+ List<Row> rows =
+ spark.table("`t1$manifests`")
+ .select("schema_id", "file_name", "file_size")
+ .collectAsList();
+ Long schemaId = rows.get(0).getLong(0);
+ String fileName = rows.get(0).getString(1);
+ Long fileSize = rows.get(0).getLong(2);
+
+ assertThat(schemaId).isEqualTo(0L);
+ assertThat(fileName).startsWith("manifest");
+ assertThat(fileSize).isGreaterThan(0L);
+ }
+
+ @Test
+ public void testManifestsTableWithRecordCount() {
+ List<Row> rows =
+ spark.table("`t1$manifests`")
+ .select("num_added_files", "num_deleted_files")
+ .collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[1,0]]");
+ }
+
@Test
public void testCatalogFilterPushDown() {
innerTestSimpleTypeFilterPushDown(spark.table("t1"));