This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0acc99ab4 [core] Added system table consumers can be queried. (#1489)
0acc99ab4 is described below
commit 0acc99ab41818f2cfe33a2f4488f466c1e618259
Author: Kerwin <[email protected]>
AuthorDate: Mon Jul 3 19:59:31 2023 +0800
[core] Added system table consumers can be queried. (#1489)
---
docs/content/how-to/system-tables.md | 18 ++
.../apache/paimon/consumer/ConsumerManager.java | 87 +++++----
.../apache/paimon/table/system/ConsumersTable.java | 199 +++++++++++++++++++++
.../paimon/table/system/SystemTableLoader.java | 3 +
.../java/org/apache/paimon/utils/FileUtils.java | 30 +++-
.../java/org/apache/paimon/utils/TagManager.java | 52 ++----
.../apache/paimon/flink/CatalogTableITCase.java | 19 ++
7 files changed, 322 insertions(+), 86 deletions(-)
diff --git a/docs/content/how-to/system-tables.md
b/docs/content/how-to/system-tables.md
index fed2a26d3..51d6a2deb 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -176,3 +176,21 @@ SELECT * FROM MyTable$tags;
2 rows in set
*/
```
+
+## Consumers Table
+
+You can query all consumers of a table, together with the ID of the next snapshot each consumer will read.
+
+```sql
+SELECT * FROM MyTable$consumers;
+
+/*
++-------------+------------------+
+| consumer_id | next_snapshot_id |
++-------------+------------------+
+| id1 | 1 |
+| id2 | 3 |
++-------------+------------------+
+2 rows in set
+*/
+```
diff --git
a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index d9565b0a9..247f4372e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -19,7 +19,6 @@
package org.apache.paimon.consumer;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.utils.DateTimeUtils;
@@ -30,9 +29,15 @@ import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
-import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
/** Manage consumer groups. */
public class ConsumerManager implements Serializable {
@@ -65,24 +70,8 @@ public class ConsumerManager implements Serializable {
public OptionalLong minNextSnapshot() {
try {
- Path directory = consumerDirectory();
- if (!fileIO.exists(directory)) {
- return OptionalLong.empty();
- }
-
- FileStatus[] statuses = fileIO.listStatus(directory);
-
- if (statuses == null) {
- throw new RuntimeException(
- String.format(
- "The return value is null of the listStatus
for the '%s' directory.",
- directory));
- }
-
- return Arrays.stream(statuses)
- .map(FileStatus::getPath)
- .filter(path -> path.getName().startsWith(CONSUMER_PREFIX))
- .map(path -> Consumer.fromPath(fileIO, path))
+ return listOriginalVersionedFiles(fileIO, consumerDirectory(),
CONSUMER_PREFIX)
+ .map(this::consumer)
.filter(Optional::isPresent)
.map(Optional::get)
.mapToLong(Consumer::nextSnapshot)
@@ -94,34 +83,42 @@ public class ConsumerManager implements Serializable {
+ /**
+ * Deletes every consumer file whose last modification time is before {@code expireDateTime}.
+ */
public void expire(LocalDateTime expireDateTime) {
try {
- Path directory = consumerDirectory();
- if (!fileIO.exists(directory)) {
- return;
- }
-
- FileStatus[] statuses = fileIO.listStatus(directory);
-
- if (statuses == null) {
- throw new RuntimeException(
- String.format(
- "The return value is null of the listStatus
for the '%s' directory.",
- directory));
- }
-
- for (FileStatus status : statuses) {
- if (isConsumerFile(status.getPath().getName())) {
- LocalDateTime modificationTime =
-
DateTimeUtils.toLocalDateTime(status.getModificationTime());
- if (expireDateTime.isAfter(modificationTime)) {
- fileIO.deleteQuietly(status.getPath());
- }
- }
- }
+ // Only files whose names start with CONSUMER_PREFIX are listed; a missing directory yields nothing.
+ listVersionedFileStatus(fileIO, consumerDirectory(),
CONSUMER_PREFIX)
+ .forEach(
+ status -> {
+ LocalDateTime modificationTime =
+
DateTimeUtils.toLocalDateTime(status.getModificationTime());
+ if (expireDateTime.isAfter(modificationTime)) {
+ fileIO.deleteQuietly(status.getPath());
+ }
+ });
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ /**
+ * Gets all consumers of this table.
+ *
+ * @return a map from consumer id to that consumer's next snapshot id; ids for which {@code
+ * consumer(id)} is empty are omitted. The map is a {@code HashMap}, so iteration order is
+ * unspecified.
+ * @throws IOException if the consumer directory cannot be listed
+ */
+ public Map<String, Long> consumers() throws IOException {
+ Map<String, Long> consumers = new HashMap<>();
+ listOriginalVersionedFiles(fileIO, consumerDirectory(),
CONSUMER_PREFIX)
+ .forEach(
+ id -> {
+ Optional<Consumer> consumer = this.consumer(id);
+ consumer.ifPresent(value -> consumers.put(id,
value.nextSnapshot()));
+ });
+ return consumers;
+ }
+
+ /**
+ * Lists the ids of all consumers, i.e. the names of the consumer files with {@code
+ * CONSUMER_PREFIX} stripped. Consumer files are not read, so an id is listed even when {@code
+ * consumer(id)} would be empty.
+ *
+ * @throws UncheckedIOException if the consumer directory cannot be listed
+ */
+ public List<String> listAllIds() {
+ try {
+ return listOriginalVersionedFiles(fileIO, consumerDirectory(),
CONSUMER_PREFIX)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** Directory holding the consumer files: {@code <table path>/consumer}. */
private Path consumerDirectory() {
return new Path(tablePath + "/consumer");
}
@@ -129,8 +126,4 @@ public class ConsumerManager implements Serializable {
+ /** File of one consumer: {@code <table path>/consumer/<CONSUMER_PREFIX><consumerId>}. */
private Path consumerPath(String consumerId) {
return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX +
consumerId);
}
-
- private boolean isConsumerFile(String file) {
- return file.startsWith(CONSUMER_PREFIX);
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
new file mode 100644
index 000000000..570dc4683
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/**
+ * A read-only system {@link Table} that lists the consumers of a table, one row per consumer id
+ * with the id of the next snapshot that consumer will read.
+ */
+public class ConsumersTable implements ReadonlyTable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Suffix of the system table name, e.g. {@code MyTable$consumers}. */
+ public static final String CONSUMERS = "consumers";
+
+ /** Row schema: non-null {@code consumer_id} string and non-null {@code next_snapshot_id}. */
+ public static final RowType TABLE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(
+ 0, "consumer_id",
SerializationUtils.newStringType(false)),
+ new DataField(1, "next_snapshot_id", new
BigIntType(false))));
+
+ private final FileIO fileIO;
+ // Path of the owning table; passed to ConsumerManager to locate its consumer files.
+ private final Path location;
+
+ public ConsumersTable(FileIO fileIO, Path location) {
+ this.fileIO = fileIO;
+ this.location = location;
+ }
+
+ @Override
+ public String name() {
+ return location.getName() + SYSTEM_TABLE_SPLITTER + CONSUMERS;
+ }
+
+ @Override
+ public RowType rowType() {
+ return TABLE_TYPE;
+ }
+
+ @Override
+ public List<String> primaryKeys() {
+ return Collections.singletonList("consumer_id");
+ }
+
+ @Override
+ public InnerTableScan newScan() {
+ return new ConsumersTable.ConsumersScan();
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new ConsumersTable.ConsumersRead(fileIO);
+ }
+
+ // Dynamic options are not used by this table; a plain copy is returned.
+ @Override
+ public Table copy(Map<String, String> dynamicOptions) {
+ return new ConsumersTable(fileIO, location);
+ }
+
+ /** Scan that plans a single split covering all consumers of the table. */
+ private class ConsumersScan extends ReadOnceTableScan {
+
+ // Filters are not pushed down; every consumer is returned.
+ @Override
+ public InnerTableScan withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ public Plan innerPlan() {
+ return () ->
+ Collections.singletonList(new
ConsumersTable.ConsumersSplit(fileIO, location));
+ }
+ }
+
+ /** {@link Split} implementation for {@link ConsumersTable}. */
+ private static class ConsumersSplit implements Split {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileIO fileIO;
+ private final Path location;
+
+ private ConsumersSplit(FileIO fileIO, Path location) {
+ this.fileIO = fileIO;
+ this.location = location;
+ }
+
+ // Lists the consumer directory again; this can differ from the rows actually read if
+ // consumers change in between or ConsumerManager#consumer returns empty for some id.
+ @Override
+ public long rowCount() {
+ return new ConsumerManager(fileIO, location).listAllIds().size();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ConsumersTable.ConsumersSplit that =
(ConsumersTable.ConsumersSplit) o;
+ return Objects.equals(location, that.location);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(location);
+ }
+ }
+
+ /** {@link TableRead} implementation for {@link ConsumersTable}. */
+ private static class ConsumersRead implements InnerTableRead {
+
+ private final FileIO fileIO;
+ // Projection applied to every row; null means all columns are returned.
+ private int[][] projection;
+
+ public ConsumersRead(FileIO fileIO) {
+ this.fileIO = fileIO;
+ }
+
+ // Filters are ignored; every consumer is returned.
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withProjection(int[][] projection) {
+ this.projection = projection;
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ if (!(split instanceof ConsumersTable.ConsumersSplit)) {
+ throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
+ }
+ Path location = ((ConsumersTable.ConsumersSplit) split).location;
+ // Consumers are read when the reader is created; row order follows the map's iteration order.
+ Map<String, Long> consumers = new ConsumerManager(fileIO,
location).consumers();
+ Iterator<InternalRow> rows =
+ Iterators.transform(consumers.entrySet().iterator(),
this::toRow);
+ if (projection != null) {
+ rows =
+ Iterators.transform(
+ rows, row ->
ProjectedRow.from(projection).replaceRow(row));
+ }
+ return new IteratorRecordReader<>(rows);
+ }
+
+ /** Converts a (consumer id, next snapshot id) entry into a row shaped like TABLE_TYPE. */
+ private InternalRow toRow(Map.Entry<String, Long> consumer) {
+ return GenericRow.of(BinaryString.fromString(consumer.getKey()),
consumer.getValue());
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 01fa2432d..55d684208 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.table.Table;
import javax.annotation.Nullable;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
@@ -51,6 +52,8 @@ public class SystemTableLoader {
return new FilesTable(dataTable);
case TAGS:
return new TagsTable(fileIO, location);
+ case CONSUMERS:
+ return new ConsumersTable(fileIO, location);
default:
return null;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index 3c12aa85f..e41ba0241 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -72,8 +72,31 @@ public class FileUtils {
*/
public static Stream<Long> listVersionedFiles(FileIO fileIO, Path dir,
String prefix)
throws IOException {
+ // Each suffix after the prefix is parsed as a long; a non-numeric suffix throws NumberFormatException.
+ return listOriginalVersionedFiles(fileIO, dir,
prefix).map(Long::parseLong);
}
+
+ /**
+ * Lists the files in {@code dir} whose names start with {@code prefix} and returns each name
+ * with the prefix removed (for example a numeric version or a consumer id).
+ *
+ * @return stream of file-name suffixes following {@code prefix}; empty if {@code dir} does not
+ * exist
+ * @throws IOException if the directory cannot be listed
+ */
+ public static Stream<String> listOriginalVersionedFiles(FileIO fileIO,
Path dir, String prefix)
+ throws IOException {
+ return listVersionedFileStatus(fileIO, dir, prefix)
+ .map(FileStatus::getPath)
+ .map(Path::getName)
+ .map(name -> name.substring(prefix.length()));
+ }
+
+ /**
+ * List versioned file status for the directory.
+ *
+ * @return file status stream
+ */
+ public static Stream<FileStatus> listVersionedFileStatus(FileIO fileIO,
Path dir, String prefix)
+ throws IOException {
if (!fileIO.exists(dir)) {
- return Stream.of();
+ return Stream.empty();
}
FileStatus[] statuses = fileIO.listStatus(dir);
@@ -86,10 +109,7 @@ public class FileUtils {
}
return Arrays.stream(statuses)
- .map(FileStatus::getPath)
- .map(Path::getName)
- .filter(name -> name.startsWith(prefix))
- .map(name -> Long.parseLong(name.substring(prefix.length())));
+ .filter(status ->
status.getPath().getName().startsWith(prefix));
}
public static RecordReader<InternalRow> createFormatReader(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 4a4f13dba..f6fc165d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -20,17 +20,16 @@ package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Manager for {@code Tag}. */
@@ -97,8 +96,12 @@ public class TagManager {
return Snapshot.fromPath(fileIO, tagPath(tagName));
}
- public int tagCount() {
- return listStatus().length;
+ /** Returns the number of tag files in the tag directory (0 if the directory does not exist). */
+ public long tagCount() {
+ try {
+ return listVersionedFileStatus(fileIO, tagDirectory(),
TAG_PREFIX).count();
+ } catch (IOException e) {
+ // Keep the directory in the message so listing failures can be diagnosed.
+ throw new RuntimeException(
+ String.format("Failed to list status in the '%s' directory.", tagDirectory()),
+ e);
+ }
}
/** Get all tagged snapshots sorted by snapshot id. */
@@ -108,39 +111,20 @@ public class TagManager {
/** Get all tagged snapshots with names sorted by snapshot id. */
public SortedMap<Snapshot, String> tags() {
- FileStatus[] statuses = listStatus();
- TreeMap<Snapshot, String> tags = new
TreeMap<>(Comparator.comparingLong(Snapshot::id));
- for (FileStatus status : statuses) {
- Path path = status.getPath();
- tags.put(
- Snapshot.fromPath(fileIO, path),
path.getName().substring(TAG_PREFIX.length()));
- }
- return tags;
- }
-
- private FileStatus[] listStatus() {
- Path tagDirectory = tagDirectory();
+ // NOTE(review): keys compare by snapshot id only, so two tags on the same snapshot collide
+ // and only the last one put is kept.
+ TreeMap<Snapshot, String> tags = new
TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
- if (!fileIO.exists(tagDirectory)) {
- return new FileStatus[0];
- }
-
- FileStatus[] statuses = fileIO.listStatus(tagDirectory);
-
- if (statuses == null) {
- throw new RuntimeException(
- String.format(
- "The return value is null of the listStatus
for the '%s' directory.",
- tagDirectory));
- }
-
- return Arrays.stream(statuses)
- .filter(status ->
status.getPath().getName().startsWith(TAG_PREFIX))
- .toArray(FileStatus[]::new);
+ listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
+ .forEach(
+ status -> {
+ Path path = status.getPath();
+ tags.put(
+ Snapshot.fromPath(fileIO, path),
+
path.getName().substring(TAG_PREFIX.length()));
+ });
} catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to list status in the '%s'
directory.", tagDirectory), e);
+ // Keep the directory in the message so listing failures can be diagnosed.
+ throw new RuntimeException(
+ String.format("Failed to list status in the '%s' directory.", tagDirectory()),
+ e);
}
+ return tags;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 2752765dc..95b36fd70 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.IntType;
+import org.apache.paimon.utils.BlockingIterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.types.Row;
@@ -484,4 +485,22 @@ public class CatalogTableITCase extends CatalogITCaseBase {
assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L),
Row.of("tag2", 2L, 0L, 2L));
}
+
+ /** Checks that a streaming read with a consumer id shows up in the {@code $consumers} table. */
+ @Test
+ public void testConsumersTable() throws Exception {
+ batchSql("CREATE TABLE T (a INT, b INT)");
+ batchSql("INSERT INTO T VALUES (1, 2)");
+ batchSql("INSERT INTO T VALUES (3, 4)");
+
+ // Streaming read registered under consumer id 'my1'.
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter("SELECT * FROM T /*+
OPTIONS('consumer-id'='my1') */"));
+
+ batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
+ assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1,
2), Row.of(3, 4));
+ iterator.close();
+
+ // Presumably the two initial inserts are snapshots 1 and 2, so after reading them the
+ // consumer's next snapshot is 3 — TODO confirm against the consumer commit logic.
+ List<Row> result = sql("SELECT * FROM T$consumers");
+ assertThat(result).containsExactly(Row.of("my1", 3L));
+ }
}