This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0acc99ab4 [core] Added system table consumers can be queried. (#1489)
0acc99ab4 is described below

commit 0acc99ab41818f2cfe33a2f4488f466c1e618259
Author: Kerwin <[email protected]>
AuthorDate: Mon Jul 3 19:59:31 2023 +0800

    [core] Added system table consumers can be queried. (#1489)
---
 docs/content/how-to/system-tables.md               |  18 ++
 .../apache/paimon/consumer/ConsumerManager.java    |  87 +++++----
 .../apache/paimon/table/system/ConsumersTable.java | 199 +++++++++++++++++++++
 .../paimon/table/system/SystemTableLoader.java     |   3 +
 .../java/org/apache/paimon/utils/FileUtils.java    |  30 +++-
 .../java/org/apache/paimon/utils/TagManager.java   |  52 ++----
 .../apache/paimon/flink/CatalogTableITCase.java    |  19 ++
 7 files changed, 322 insertions(+), 86 deletions(-)

diff --git a/docs/content/how-to/system-tables.md 
b/docs/content/how-to/system-tables.md
index fed2a26d3..51d6a2deb 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -176,3 +176,21 @@ SELECT * FROM MyTable$tags;
 2 rows in set
 */
 ```
+
+## Consumers Table
+
+You can query all consumers of a table, together with the next snapshot id recorded for each consumer.
+
+```sql
+SELECT * FROM MyTable$consumers;
+
+/*
++-------------+------------------+
+| consumer_id | next_snapshot_id |
++-------------+------------------+
+|         id1 |                1 |
+|         id2 |                3 |
++-------------+------------------+
+2 rows in set
+*/
+```
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java 
b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index d9565b0a9..247f4372e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.consumer;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.utils.DateTimeUtils;
@@ -30,9 +29,15 @@ import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
-import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 
 /** Manage consumer groups. */
 public class ConsumerManager implements Serializable {
@@ -65,24 +70,8 @@ public class ConsumerManager implements Serializable {
 
     public OptionalLong minNextSnapshot() {
         try {
-            Path directory = consumerDirectory();
-            if (!fileIO.exists(directory)) {
-                return OptionalLong.empty();
-            }
-
-            FileStatus[] statuses = fileIO.listStatus(directory);
-
-            if (statuses == null) {
-                throw new RuntimeException(
-                        String.format(
-                                "The return value is null of the listStatus 
for the '%s' directory.",
-                                directory));
-            }
-
-            return Arrays.stream(statuses)
-                    .map(FileStatus::getPath)
-                    .filter(path -> path.getName().startsWith(CONSUMER_PREFIX))
-                    .map(path -> Consumer.fromPath(fileIO, path))
+            return listOriginalVersionedFiles(fileIO, consumerDirectory(), 
CONSUMER_PREFIX)
+                    .map(this::consumer)
                     .filter(Optional::isPresent)
                     .map(Optional::get)
                     .mapToLong(Consumer::nextSnapshot)
@@ -94,34 +83,42 @@ public class ConsumerManager implements Serializable {
 
     public void expire(LocalDateTime expireDateTime) {
         try {
-            Path directory = consumerDirectory();
-            if (!fileIO.exists(directory)) {
-                return;
-            }
-
-            FileStatus[] statuses = fileIO.listStatus(directory);
-
-            if (statuses == null) {
-                throw new RuntimeException(
-                        String.format(
-                                "The return value is null of the listStatus 
for the '%s' directory.",
-                                directory));
-            }
-
-            for (FileStatus status : statuses) {
-                if (isConsumerFile(status.getPath().getName())) {
-                    LocalDateTime modificationTime =
-                            
DateTimeUtils.toLocalDateTime(status.getModificationTime());
-                    if (expireDateTime.isAfter(modificationTime)) {
-                        fileIO.deleteQuietly(status.getPath());
-                    }
-                }
-            }
+            listVersionedFileStatus(fileIO, consumerDirectory(), 
CONSUMER_PREFIX)
+                    .forEach(
+                            status -> {
+                                LocalDateTime modificationTime =
+                                        
DateTimeUtils.toLocalDateTime(status.getModificationTime());
+                                if (expireDateTime.isAfter(modificationTime)) {
+                                    fileIO.deleteQuietly(status.getPath());
+                                }
+                            });
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
 
+    /** Returns a map from each consumer id to its recorded next snapshot id; absent consumers are skipped. */
+    public Map<String, Long> consumers() throws IOException {
+        Map<String, Long> consumers = new HashMap<>(); // HashMap: iteration order of the result is unspecified
+        listOriginalVersionedFiles(fileIO, consumerDirectory(), 
CONSUMER_PREFIX)
+                .forEach(
+                        id -> {
+                            Optional<Consumer> consumer = this.consumer(id);
+                            consumer.ifPresent(value -> consumers.put(id, 
value.nextSnapshot()));
+                        });
+        return consumers;
+    }
+
+    /** Lists all consumer ids (consumer file names with CONSUMER_PREFIX stripped); I/O errors are rethrown unchecked. */
+    public List<String> listAllIds() {
+        try {
+            return listOriginalVersionedFiles(fileIO, consumerDirectory(), 
CONSUMER_PREFIX)
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+
     private Path consumerDirectory() {
         return new Path(tablePath + "/consumer");
     }
@@ -129,8 +126,4 @@ public class ConsumerManager implements Serializable {
     private Path consumerPath(String consumerId) {
         return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX + 
consumerId);
     }
-
-    private boolean isConsumerFile(String file) {
-        return file.startsWith(CONSUMER_PREFIX);
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
new file mode 100644
index 000000000..570dc4683
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/** A read-only {@link Table} listing every consumer id of a table together with its next snapshot id. */
+public class ConsumersTable implements ReadonlyTable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String CONSUMERS = "consumers";
+
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(
+                                    0, "consumer_id", 
SerializationUtils.newStringType(false)),
+                            new DataField(1, "next_snapshot_id", new 
BigIntType(false))));
+
+    private final FileIO fileIO;
+    private final Path location;
+
+    public ConsumersTable(FileIO fileIO, Path location) {
+        this.fileIO = fileIO;
+        this.location = location;
+    }
+
+    @Override
+    public String name() {
+        return location.getName() + SYSTEM_TABLE_SPLITTER + CONSUMERS;
+    }
+
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return Collections.singletonList("consumer_id");
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        return new ConsumersTable.ConsumersScan();
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new ConsumersTable.ConsumersRead(fileIO);
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) { // dynamic options are not used by this table
+        return new ConsumersTable(fileIO, location);
+    }
+
+    private class ConsumersScan extends ReadOnceTableScan { // non-static: uses the enclosing table's fileIO and location
+
+        @Override
+        public InnerTableScan withFilter(Predicate predicate) { // predicate is ignored; no filter push-down
+            return this;
+        }
+
+        @Override
+        public Plan innerPlan() { // always one split; consumers are listed when the reader is created
+            return () ->
+                    Collections.singletonList(new 
ConsumersTable.ConsumersSplit(fileIO, location));
+        }
+    }
+
+    /** {@link Split} implementation for {@link ConsumersTable}. */
+    private static class ConsumersSplit implements Split {
+
+        private static final long serialVersionUID = 1L;
+
+        private final FileIO fileIO;
+        private final Path location;
+
+        private ConsumersSplit(FileIO fileIO, Path location) {
+            this.fileIO = fileIO;
+            this.location = location;
+        }
+
+        @Override
+        public long rowCount() { // lists the consumer directory again on every call
+            return new ConsumerManager(fileIO, location).listAllIds().size();
+        }
+
+        @Override
+        public boolean equals(Object o) { // only location participates in equality; fileIO is ignored
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ConsumersTable.ConsumersSplit that = 
(ConsumersTable.ConsumersSplit) o;
+            return Objects.equals(location, that.location);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(location);
+        }
+    }
+
+    /** {@link TableRead} implementation for {@link ConsumersTable}. */
+    private static class ConsumersRead implements InnerTableRead {
+
+        private final FileIO fileIO;
+        private int[][] projection; // null means rows are returned unprojected
+
+        public ConsumersRead(FileIO fileIO) {
+            this.fileIO = fileIO;
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) { // predicate is ignored; all consumers are returned
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withProjection(int[][] projection) {
+            this.projection = projection;
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+            if (!(split instanceof ConsumersTable.ConsumersSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
+            }
+            Path location = ((ConsumersTable.ConsumersSplit) split).location;
+            Map<String, Long> consumers = new ConsumerManager(fileIO, 
location).consumers();
+            Iterator<InternalRow> rows =
+                    Iterators.transform(consumers.entrySet().iterator(), 
this::toRow);
+            if (projection != null) {
+                rows =
+                        Iterators.transform(
+                                rows, row -> 
ProjectedRow.from(projection).replaceRow(row));
+            }
+            return new IteratorRecordReader<>(rows);
+        }
+
+        private InternalRow toRow(Map.Entry<String, Long> consumer) {
+            return GenericRow.of(BinaryString.fromString(consumer.getKey()), 
consumer.getValue());
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 01fa2432d..55d684208 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.table.Table;
 import javax.annotation.Nullable;
 
 import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
 import static org.apache.paimon.table.system.FilesTable.FILES;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
@@ -51,6 +52,8 @@ public class SystemTableLoader {
                 return new FilesTable(dataTable);
             case TAGS:
                 return new TagsTable(fileIO, location);
+            case CONSUMERS:
+                return new ConsumersTable(fileIO, location);
             default:
                 return null;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index 3c12aa85f..e41ba0241 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -72,8 +72,31 @@ public class FileUtils {
      */
     public static Stream<Long> listVersionedFiles(FileIO fileIO, Path dir, 
String prefix)
             throws IOException {
+        return listOriginalVersionedFiles(fileIO, dir, 
prefix).map(Long::parseLong);
+    }
+
+    /**
+     * List the names of entries in {@code dir} that start with {@code prefix}, with the prefix
+     * stripped; unlike {@link #listVersionedFiles} the remainder is returned as-is, not parsed.
+     * @return stream of un-prefixed file names (empty when {@code dir} does not exist)
+     */
+    public static Stream<String> listOriginalVersionedFiles(FileIO fileIO, 
Path dir, String prefix)
+            throws IOException {
+        return listVersionedFileStatus(fileIO, dir, prefix)
+                .map(FileStatus::getPath)
+                .map(Path::getName)
+                .map(name -> name.substring(prefix.length()));
+    }
+
+    /**
+     * List versioned file status for the directory.
+     *
+     * @return file status stream
+     */
+    public static Stream<FileStatus> listVersionedFileStatus(FileIO fileIO, 
Path dir, String prefix)
+            throws IOException {
         if (!fileIO.exists(dir)) {
-            return Stream.of();
+            return Stream.empty();
         }
 
         FileStatus[] statuses = fileIO.listStatus(dir);
@@ -86,10 +109,7 @@ public class FileUtils {
         }
 
         return Arrays.stream(statuses)
-                .map(FileStatus::getPath)
-                .map(Path::getName)
-                .filter(name -> name.startsWith(prefix))
-                .map(name -> Long.parseLong(name.substring(prefix.length())));
+                .filter(status -> 
status.getPath().getName().startsWith(prefix));
     }
 
     public static RecordReader<InternalRow> createFormatReader(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 4a4f13dba..f6fc165d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -20,17 +20,16 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Manager for {@code Tag}. */
@@ -97,8 +96,12 @@ public class TagManager {
         return Snapshot.fromPath(fileIO, tagPath(tagName));
     }
 
-    public int tagCount() {
-        return listStatus().length;
+    public long tagCount() {
+        try {
+            return listVersionedFileStatus(fileIO, tagDirectory(), 
TAG_PREFIX).count();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /** Get all tagged snapshots sorted by snapshot id. */
@@ -108,39 +111,20 @@ public class TagManager {
 
     /** Get all tagged snapshots with names sorted by snapshot id. */
     public SortedMap<Snapshot, String> tags() {
-        FileStatus[] statuses = listStatus();
-        TreeMap<Snapshot, String> tags = new 
TreeMap<>(Comparator.comparingLong(Snapshot::id));
 
-        for (FileStatus status : statuses) {
-            Path path = status.getPath();
-            tags.put(
-                    Snapshot.fromPath(fileIO, path), 
path.getName().substring(TAG_PREFIX.length()));
-        }
-        return tags;
-    }
-
-    private FileStatus[] listStatus() {
-        Path tagDirectory = tagDirectory();
+        TreeMap<Snapshot, String> tags = new 
TreeMap<>(Comparator.comparingLong(Snapshot::id));
         try {
-            if (!fileIO.exists(tagDirectory)) {
-                return new FileStatus[0];
-            }
-
-            FileStatus[] statuses = fileIO.listStatus(tagDirectory);
-
-            if (statuses == null) {
-                throw new RuntimeException(
-                        String.format(
-                                "The return value is null of the listStatus 
for the '%s' directory.",
-                                tagDirectory));
-            }
-
-            return Arrays.stream(statuses)
-                    .filter(status -> 
status.getPath().getName().startsWith(TAG_PREFIX))
-                    .toArray(FileStatus[]::new);
+            listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
+                    .forEach(
+                            status -> {
+                                Path path = status.getPath();
+                                tags.put(
+                                        Snapshot.fromPath(fileIO, path),
+                                        
path.getName().substring(TAG_PREFIX.length()));
+                            });
         } catch (IOException e) {
-            throw new RuntimeException(
-                    String.format("Failed to list status in the '%s' 
directory.", tagDirectory), e);
+            throw new RuntimeException(e);
         }
+        return tags;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 2752765dc..95b36fd70 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.IntType;
+import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.types.Row;
@@ -484,4 +485,22 @@ public class CatalogTableITCase extends CatalogITCaseBase {
 
         assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), 
Row.of("tag2", 2L, 0L, 2L));
     }
+
+    @Test
+    public void testConsumersTable() throws Exception { // T$consumers must report the streaming reader's consumer-id 'my1'
+        batchSql("CREATE TABLE T (a INT, b INT)");
+        batchSql("INSERT INTO T VALUES (1, 2)");
+        batchSql("INSERT INTO T VALUES (3, 4)");
+
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter("SELECT * FROM T /*+ 
OPTIONS('consumer-id'='my1') */"));
+
+        batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
+        assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 
2), Row.of(3, 4));
+        iterator.close();
+
+        List<Row> result = sql("SELECT * FROM T$consumers");
+        assertThat(result).containsExactly(Row.of("my1", 3L)); // presumably snapshots 1-2 were consumed, so 3 is next — confirm
+    }
 }

Reply via email to