This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a8a0e28b65 [core] Add option to support reading sequence_number in
AuditLogTable and BinlogTable (#6933)
a8a0e28b65 is described below
commit a8a0e28b650f98268499c9826e9775184abfbf36
Author: bosiew.tian <[email protected]>
AuthorDate: Thu Jan 22 13:55:13 2026 +0800
[core] Add option to support reading sequence_number in AuditLogTable and
BinlogTable (#6933)
---
docs/content/concepts/system-tables.md | 61 +++++++
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 22 +++
.../org/apache/paimon/schema/SchemaValidation.java | 13 ++
.../paimon/table/source/KeyValueTableRead.java | 16 +-
.../source/ValueContentRowDataRecordIterator.java | 20 +++
.../IncrementalChangelogReadProvider.java | 2 +-
.../source/splitread/IncrementalDiffSplitRead.java | 2 +-
.../splitread/MergeFileSplitReadProvider.java | 3 +-
.../apache/paimon/table/system/AuditLogTable.java | 142 +++++++++------
.../apache/paimon/table/system/BinlogTable.java | 36 ++--
.../paimon/table/system/AuditLogTableTest.java | 101 ++++++++---
.../paimon/table/system/BinlogTableTest.java | 191 +++++++++++++++++++++
.../paimon/flink/lookup/LookupCompactDiffRead.java | 9 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 122 +++++++++++++
15 files changed, 653 insertions(+), 93 deletions(-)
diff --git a/docs/content/concepts/system-tables.md
b/docs/content/concepts/system-tables.md
index fd412f3fa5..0b8feed61c 100644
--- a/docs/content/concepts/system-tables.md
+++ b/docs/content/concepts/system-tables.md
@@ -136,6 +136,49 @@ SELECT * FROM my_table$audit_log;
*/
```
+For primary key tables, you can enable the
`table-read.sequence-number.enabled` option to include the `_SEQUENCE_NUMBER`
field in the output.
+
+{{< tabs "audit-log-sequence-number" >}}
+
+{{< tab "Enable via ALTER TABLE" >}}
+```sql
+ALTER TABLE my_table SET ('table-read.sequence-number.enabled' = 'true');
+```
+{{< /tab >}}
+
+{{< tab "Enable via CREATE TABLE" >}}
+```sql
+CREATE TABLE my_table (
+ ...
+) WITH (
+ 'table-read.sequence-number.enabled' = 'true'
+);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+```sql
+SELECT * FROM my_table$audit_log;
+
+/*
++------------------+--------------------+-----------------+-----------------+
+| rowkind | _SEQUENCE_NUMBER | column_0 | column_1 |
++------------------+--------------------+-----------------+-----------------+
+| +I | 0 | ... | ... |
++------------------+--------------------+-----------------+-----------------+
+| -U | 0 | ... | ... |
++------------------+--------------------+-----------------+-----------------+
+| +U | 1 | ... | ... |
++------------------+--------------------+-----------------+-----------------+
+3 rows in set
+*/
+```
+
+{{< hint info >}}
+The `table-read.sequence-number.enabled` option cannot be set via SQL hints.
+{{< /hint >}}
+
### Binlog Table
You can query the binlog through binlog table. In the binlog system table, the
update before and update after will be packed in one row.
@@ -158,6 +201,24 @@ SELECT * FROM T$binlog;
*/
```
+Similar to the audit_log table, you can also enable
`table-read.sequence-number.enabled` to include `_SEQUENCE_NUMBER` in the
binlog table output:
+
+```sql
+SELECT * FROM T$binlog;
+
+/*
++------------------+--------------------+----------------------+-----------------------+
+| rowkind | _SEQUENCE_NUMBER | column_0 |
column_1 |
++------------------+--------------------+----------------------+-----------------------+
+| +I | 0 | [col_0] | [col_1]
|
++------------------+--------------------+----------------------+-----------------------+
+| +U | 1 | [col_0_ub, col_0_ua] | [col_1_ub,
col_1_ua] |
++------------------+--------------------+----------------------+-----------------------+
+| -D | 2 | [col_0] | [col_1]
|
++------------------+--------------------+----------------------+-----------------------+
+*/
+```
+
### Read-optimized Table
If you require extreme reading performance and can accept reading slightly old
data,
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 12d45a6a00..724e04cd5c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -152,6 +152,12 @@ under the License.
<td>String</td>
<td>Fields that are ignored for comparison while generating -U, +U
changelog for the same record. This configuration is only valid for the
changelog-producer.row-deduplicate is true.</td>
</tr>
+ <tr>
+ <td><h5>table-read.sequence-number.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to include the _SEQUENCE_NUMBER field when reading the
audit_log or binlog system tables. This is only valid for primary key
tables.</td>
+ </tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 5033eb2fa5..1b8170a9e2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -879,6 +879,24 @@ public class CoreOptions implements Serializable {
.withDescription(
"Fields that are ignored for comparison while
generating -U, +U changelog for the same record. This configuration is only
valid for the changelog-producer.row-deduplicate is true.");
+ public static final ConfigOption<Boolean>
TABLE_READ_SEQUENCE_NUMBER_ENABLED =
+ key("table-read.sequence-number.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include the _SEQUENCE_NUMBER field
when reading the audit_log or binlog "
+ + "system tables. This is only valid for
primary key tables.");
+
+ @ExcludeFromDocumentation("Internal use only")
+ public static final ConfigOption<Boolean>
KEY_VALUE_SEQUENCE_NUMBER_ENABLED =
+ key("key-value.sequence_number.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include the _SEQUENCE_NUMBER field
when reading key-value data. "
+ + "This is an internal option used by
AuditLogTable and BinlogTable "
+ + "when table-read.sequence-number.enabled
is set to true.");
+
public static final ConfigOption<String> SEQUENCE_FIELD =
key("sequence.field")
.stringType()
@@ -2810,6 +2828,10 @@ public class CoreOptions implements Serializable {
.orElse(Collections.emptyList());
}
+ public boolean tableReadSequenceNumberEnabled() {
+ return options.get(TABLE_READ_SEQUENCE_NUMBER_ENABLED);
+ }
+
public boolean scanPlanSortPartition() {
return options.get(SCAN_PLAN_SORT_PARTITION);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index cceae0567e..062f71f057 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -239,6 +239,8 @@ public class SchemaValidation {
validateIncrementalClustering(schema, options);
validateChainTable(schema, options);
+
+ validateChangelogReadSequenceNumber(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager,
TableSchema schema) {
@@ -672,4 +674,15 @@ public class SchemaValidation {
"Partition timestamp formatter is required for chain
table.");
}
}
+
+ private static void validateChangelogReadSequenceNumber(
+ TableSchema schema, CoreOptions options) {
+ if (options.tableReadSequenceNumberEnabled()) {
+ checkArgument(
+ !schema.primaryKeys().isEmpty(),
+ "Cannot enable '%s' for non-primary-key table. "
+ + "Sequence number is only available for primary
key tables.",
+ CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key());
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 3b257e6f3a..fda7d70ffd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
@@ -42,6 +43,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
/**
@@ -150,14 +152,24 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
throw new RuntimeException("Should not happen.");
}
- public static RecordReader<InternalRow> unwrap(RecordReader<KeyValue>
reader) {
+ public static RecordReader<InternalRow> unwrap(
+ RecordReader<KeyValue> reader, Map<String, String> schemaOptions) {
return new RecordReader<InternalRow>() {
@Nullable
@Override
public RecordIterator<InternalRow> readBatch() throws IOException {
+ boolean keyValueSequenceNumberEnabled =
+ Boolean.parseBoolean(
+ schemaOptions.getOrDefault(
+
CoreOptions.KEY_VALUE_SEQUENCE_NUMBER_ENABLED.key(),
+ "false"));
+
RecordIterator<KeyValue> batch = reader.readBatch();
- return batch == null ? null : new
ValueContentRowDataRecordIterator(batch);
+ return batch == null
+ ? null
+ : new ValueContentRowDataRecordIterator(
+ batch, keyValueSequenceNumberEnabled);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
index adcf504a5d..7c892b23c1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
@@ -19,7 +19,9 @@
package org.apache.paimon.table.source;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.reader.RecordReader;
import java.io.IOException;
@@ -27,8 +29,17 @@ import java.io.IOException;
/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its
value. */
public class ValueContentRowDataRecordIterator extends
ResetRowKindRecordIterator {
+ private final boolean keyValueSequenceNumberEnabled;
+
public
ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue>
kvIterator) {
+ this(kvIterator, false);
+ }
+
+ public ValueContentRowDataRecordIterator(
+ RecordReader.RecordIterator<KeyValue> kvIterator,
+ boolean keyValueSequenceNumberEnabled) {
super(kvIterator);
+ this.keyValueSequenceNumberEnabled = keyValueSequenceNumberEnabled;
}
@Override
@@ -40,6 +51,15 @@ public class ValueContentRowDataRecordIterator extends
ResetRowKindRecordIterato
InternalRow rowData = kv.value();
rowData.setRowKind(kv.valueKind());
+
+ if (keyValueSequenceNumberEnabled) {
+ JoinedRow joinedRow = new JoinedRow();
+ GenericRow systemFieldsRow = new GenericRow(1);
+ systemFieldsRow.setField(0, kv.sequenceNumber());
+ joinedRow.replace(systemFieldsRow, rowData);
+ joinedRow.setRowKind(kv.valueKind());
+ return joinedRow;
+ }
return rowData;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index d7d9866a9c..56646dab84 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -73,7 +73,7 @@ public class IncrementalChangelogReadProvider implements
SplitReadProvider {
incrementalSplit.afterFiles(),
incrementalSplit.afterDeletionFiles(),
false));
- return unwrap(reader);
+ return unwrap(reader, read.tableSchema().options());
};
return SplitRead.convert(read, convertedFactory);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index d00a8a4fd9..1e93f148b7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -110,7 +110,7 @@ public class IncrementalDiffSplitRead implements
SplitRead<InternalRow> {
ProjectedRow.from(readType,
mergeRead.tableSchema().logicalRowType());
reader = reader.transform(kv ->
kv.replaceValue(projectedRow.replaceRow(kv.value())));
}
- return KeyValueTableRead.unwrap(reader);
+ return KeyValueTableRead.unwrap(reader,
mergeRead.tableSchema().options());
}
private static RecordReader<KeyValue> readDiff(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index fb3b850812..2e6c127c7f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -49,7 +49,8 @@ public class MergeFileSplitReadProvider implements
SplitReadProvider {
private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead>
supplier) {
final MergeFileSplitRead read =
supplier.get().withReadKeyType(RowType.of());
- return SplitRead.convert(read, split ->
unwrap(read.createReader(split)));
+ return SplitRead.convert(
+ read, split -> unwrap(read.createReader(split),
read.tableSchema().options()));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 001e1543ee..d613d9455f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -82,6 +82,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
/** A {@link Table} for reading audit log of table. */
@@ -89,29 +90,43 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
public static final String AUDIT_LOG = "audit_log";
- public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
- p -> {
- Optional<FieldRef> fieldRefOptional = p.fieldRefOptional();
- if (!fieldRefOptional.isPresent()) {
- return Optional.empty();
- }
- FieldRef fieldRef = fieldRefOptional.get();
- if (fieldRef.index() == 0) {
- return Optional.empty();
- }
- return Optional.of(
- new LeafPredicate(
- p.function(),
- fieldRef.type(),
- fieldRef.index() - 1,
- fieldRef.name(),
- p.literals()));
- };
+ protected final FileStoreTable wrapped;
- private final FileStoreTable wrapped;
+ protected final List<DataField> specialFields;
public AuditLogTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
+ this.specialFields = new ArrayList<>();
+ specialFields.add(SpecialFields.ROW_KIND);
+
+ boolean includeSequenceNumber =
+
CoreOptions.fromMap(wrapped.options()).tableReadSequenceNumberEnabled();
+
+ if (includeSequenceNumber) {
+
this.wrapped.options().put(CoreOptions.KEY_VALUE_SEQUENCE_NUMBER_ENABLED.key(),
"true");
+ specialFields.add(SpecialFields.SEQUENCE_NUMBER);
+ }
+ }
+
+ /** Creates a PredicateReplaceVisitor that shifts field indices down by the
number of special fields and drops predicates on special fields. */
+ private PredicateReplaceVisitor createPredicateConverter() {
+ return p -> {
+ Optional<FieldRef> fieldRefOptional = p.fieldRefOptional();
+ if (!fieldRefOptional.isPresent()) {
+ return Optional.empty();
+ }
+ FieldRef fieldRef = fieldRefOptional.get();
+ if (fieldRef.index() < specialFields.size()) {
+ return Optional.empty();
+ }
+ return Optional.of(
+ new LeafPredicate(
+ p.function(),
+ fieldRef.type(),
+ fieldRef.index() - specialFields.size(),
+ fieldRef.name(),
+ p.literals()));
+ };
}
@Override
@@ -146,8 +161,7 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
@Override
public RowType rowType() {
- List<DataField> fields = new ArrayList<>();
- fields.add(SpecialFields.ROW_KIND);
+ List<DataField> fields = new ArrayList<>(specialFields);
fields.addAll(wrapped.rowType().getFields());
return new RowType(fields);
}
@@ -234,6 +248,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
+ if (Boolean.parseBoolean(
+
dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(),
"false"))) {
+ throw new UnsupportedOperationException(
+ "table-read.sequence-number.enabled is not supported by
hint.");
+ }
return new AuditLogTable(wrapped.copy(dynamicOptions));
}
@@ -244,9 +263,10 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
/** Push down predicate to dataScan and dataRead. */
private Optional<Predicate> convert(Predicate predicate) {
+ PredicateReplaceVisitor converter = createPredicateConverter();
List<Predicate> result =
PredicateBuilder.splitAnd(predicate).stream()
- .map(p -> p.visit(PREDICATE_CONVERTER))
+ .map(p -> p.visit(converter))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
@@ -636,6 +656,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
class AuditLogRead implements InnerTableRead {
+ // Special index for rowkind field
+ protected static final int ROW_KIND_INDEX = -1;
+ // _SEQUENCE_NUMBER is at index 0 of the underlying row when
KEY_VALUE_SEQUENCE_NUMBER_ENABLED is set
+ protected static final int SEQUENCE_NUMBER_INDEX = 0;
+
protected final InnerTableRead dataRead;
protected int[] readProjection;
@@ -645,52 +670,61 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
this.readProjection = defaultProjection();
}
- /** Default projection, just add row kind to the first. */
+ /** Default projection, add system fields (rowkind, and optionally
_SEQUENCE_NUMBER). */
private int[] defaultProjection() {
int dataFieldCount = wrapped.rowType().getFieldCount();
- int[] projection = new int[dataFieldCount + 1];
- projection[0] = -1;
+ int[] projection = new int[dataFieldCount + specialFields.size()];
+ projection[0] = ROW_KIND_INDEX;
+ if (specialFields.contains(SpecialFields.SEQUENCE_NUMBER)) {
+ projection[1] = SEQUENCE_NUMBER_INDEX;
+ }
for (int i = 0; i < dataFieldCount; i++) {
- projection[i + 1] = i;
+ projection[specialFields.size() + i] = i +
specialFields.size() - 1;
}
return projection;
}
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
- convert(predicate).ifPresent(dataRead::withFilter);
- return this;
- }
-
- @Override
- public InnerTableRead withReadType(RowType readType) {
- // data projection to push down to dataRead
- List<DataField> dataReadFields = new ArrayList<>();
-
- // read projection to handle record returned by dataRead
+ /** Build projection array from readType. */
+ private int[] buildProjection(RowType readType) {
List<DataField> fields = readType.getFields();
- int[] readProjection = new int[fields.size()];
+ int[] projection = new int[fields.size()];
+ int dataFieldIndex = 0;
- boolean rowKindAppeared = false;
for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).name();
if (fieldName.equals(SpecialFields.ROW_KIND.name())) {
- rowKindAppeared = true;
- readProjection[i] = -1;
+ projection[i] = ROW_KIND_INDEX;
+ } else if
(fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) {
+ projection[i] = SEQUENCE_NUMBER_INDEX;
} else {
- dataReadFields.add(fields.get(i));
- // There is no row kind field. Keep it as it is
- // Row kind field has occurred, and the following fields
are offset by 1
- // position
- readProjection[i] = rowKindAppeared ? i - 1 : i;
+ projection[i] = dataFieldIndex + specialFields.size() - 1;
+ dataFieldIndex++;
}
}
+ return projection;
+ }
- this.readProjection = readProjection;
- dataRead.withReadType(new RowType(readType.isNullable(),
dataReadFields));
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ convert(predicate).ifPresent(dataRead::withFilter);
return this;
}
+ @Override
+ public InnerTableRead withReadType(RowType readType) {
+ this.readProjection = buildProjection(readType);
+ List<DataField> dataFields = extractDataFields(readType);
+ dataRead.withReadType(new RowType(readType.isNullable(),
dataFields));
+ return this;
+ }
+
+ /** Extract data fields (non-system fields) from readType. */
+ private List<DataField> extractDataFields(RowType readType) {
+ return readType.getFields().stream()
+ .filter(f -> !SpecialFields.isSystemField(f.name()))
+ .collect(Collectors.toList());
+ }
+
@Override
public TableRead withIOManager(IOManager ioManager) {
this.dataRead.withIOManager(ioManager);
@@ -707,7 +741,10 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
}
- /** A {@link ProjectedRow} which returns row kind when mapping index is
negative. */
+ /**
+ * A {@link ProjectedRow} which returns row kind when the mapping index is
negative; the
+ * sequence number, when enabled, is mapped to index 0 of the underlying row.
+ */
static class AuditLogRow extends ProjectedRow {
AuditLogRow(int[] indexMapping, InternalRow row) {
@@ -729,7 +766,7 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
@Override
public boolean isNullAt(int pos) {
if (indexMapping[pos] < 0) {
- // row kind is always not null
+ // only row kind has a negative index, and it is never null
return false;
}
return super.isNullAt(pos);
@@ -737,7 +774,8 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
@Override
public BinaryString getString(int pos) {
- if (indexMapping[pos] < 0) {
+ int index = indexMapping[pos];
+ if (index == AuditLogRead.ROW_KIND_INDEX) {
return BinaryString.fromString(row.getRowKind().shortString());
}
return super.getString(pos);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index cc8d1621a3..5c1268f963 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
+import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
/**
@@ -56,11 +57,8 @@ public class BinlogTable extends AuditLogTable {
public static final String BINLOG = "binlog";
- private final FileStoreTable wrapped;
-
public BinlogTable(FileStoreTable wrapped) {
super(wrapped);
- this.wrapped = wrapped;
}
@Override
@@ -70,8 +68,7 @@ public class BinlogTable extends AuditLogTable {
@Override
public RowType rowType() {
- List<DataField> fields = new ArrayList<>();
- fields.add(SpecialFields.ROW_KIND);
+ List<DataField> fields = new ArrayList<>(specialFields);
for (DataField field : wrapped.rowType().getFields()) {
// convert to nullable
fields.add(field.newType(new ArrayType(field.type().nullable())));
@@ -86,6 +83,11 @@ public class BinlogTable extends AuditLogTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
+ if (Boolean.parseBoolean(
+
dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(),
"false"))) {
+ throw new UnsupportedOperationException(
+ "table-read.sequence-number.enabled is not supported by
hint.");
+ }
return new BinlogTable(wrapped.copy(dynamicOptions));
}
@@ -102,7 +104,7 @@ public class BinlogTable extends AuditLogTable {
List<DataField> fields = new ArrayList<>();
List<DataField> wrappedReadFields = new ArrayList<>();
for (DataField field : readType.getFields()) {
- if (field.name().equals(SpecialFields.ROW_KIND.name())) {
+ if (SpecialFields.isSystemField(field.name())) {
fields.add(field);
} else {
DataField origin = field.newType(((ArrayType)
field.type()).getElementType());
@@ -117,12 +119,16 @@ public class BinlogTable extends AuditLogTable {
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
DataSplit dataSplit = (DataSplit) split;
+ // When sequence number is enabled, the underlying data layout is:
+ // [_SEQUENCE_NUMBER, pk, pt, col1, ...]
+ // We need to offset the field index to skip the sequence number
field.
+ int offset = specialFields.size() - 1;
InternalRow.FieldGetter[] fieldGetters =
IntStream.range(0, wrappedReadType.getFieldCount())
.mapToObj(
i ->
InternalRow.createFieldGetter(
-
wrappedReadType.getTypeAt(i), i))
+
wrappedReadType.getTypeAt(i), i + offset))
.toArray(InternalRow.FieldGetter[]::new);
if (dataSplit.isStreaming()) {
@@ -146,15 +152,23 @@ public class BinlogTable extends AuditLogTable {
InternalRow row1,
@Nullable InternalRow row2,
InternalRow.FieldGetter[] fieldGetters) {
- GenericRow row = new GenericRow(row1.getFieldCount());
- for (int i = 0; i < row1.getFieldCount(); i++) {
+ // seqOffset is 1 if sequence number is enabled, 0 otherwise
+ int seqOffset = specialFields.size() - 1;
+ GenericRow row = new GenericRow(fieldGetters.length + seqOffset);
+
+ // Copy sequence number if enabled (it's at index 0 in input row)
+ if (seqOffset > 0) {
+ row.setField(0, row1.getLong(0));
+ }
+
+ for (int i = 0; i < fieldGetters.length; i++) {
Object o1 = fieldGetters[i].getFieldOrNull(row1);
Object o2;
if (row2 != null) {
o2 = fieldGetters[i].getFieldOrNull(row2);
- row.setField(i, new GenericArray(new Object[] {o1, o2}));
+ row.setField(i + seqOffset, new GenericArray(new Object[]
{o1, o2}));
} else {
- row.setField(i, new GenericArray(new Object[] {o1}));
+ row.setField(i + seqOffset, new GenericArray(new Object[]
{o1}));
}
}
// If no row2 provided, then follow the row1 kind.
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
index 41facd1d83..cbba947bcb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
@@ -36,7 +37,6 @@ import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -48,15 +48,62 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for {@link AuditLogTable}. */
public class AuditLogTableTest extends TableTestBase {
- private static final String tableName = "MyTable";
- private AuditLogTable auditLogTable;
+ @Test
+ public void testReadAuditLogFromLatest() throws Exception {
+ AuditLogTable auditLogTable = createAuditLogTable("audit_table",
false);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+ List<InternalRow> expectRow = getExpectedResult();
+ List<InternalRow> result = read(auditLogTable);
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithTableOption() throws Exception {
+ AuditLogTable auditLogTable =
createAuditLogTable("audit_table_with_seq", true);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt",
"col1");
+
+ List<InternalRow> result = read(auditLogTable);
+ List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithAlterTable() throws Exception {
+ String tableName = "audit_table_alter_seq";
+ // Create table without sequence-number option
+ AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ // Add sequence-number option via alterTable
+ catalog.alterTable(
+ identifier(tableName),
+ SchemaChange.setOption(
+ CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(),
"true"),
+ false);
- @BeforeEach
- public void before() throws Exception {
+ // Re-fetch the audit_log table to get updated schema
+ Identifier auditLogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER +
AuditLogTable.AUDIT_LOG);
+ AuditLogTable updatedAuditLogTable = (AuditLogTable)
catalog.getTable(auditLogTableId);
+
+ // Verify schema now includes _SEQUENCE_NUMBER
+ assertThat(updatedAuditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt",
"col1");
+
+ List<InternalRow> result = read(updatedAuditLogTable);
+ List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ private AuditLogTable createAuditLogTable(String tableName, boolean
enableSequenceNumber)
+ throws Exception {
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, tableName));
FileIO fileIO = LocalFileIO.create();
- Schema schema =
+ Schema.Builder schemaBuilder =
Schema.newBuilder()
.column("pk", DataTypes.INT())
.column("pt", DataTypes.INT())
@@ -64,44 +111,54 @@ public class AuditLogTableTest extends TableTestBase {
.partitionKeys("pt")
.primaryKey("pk", "pt")
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
- .option("bucket", "1")
- .build();
+ .option("bucket", "1");
+ if (enableSequenceNumber) {
+
schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(),
"true");
+ }
TableSchema tableSchema =
- SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath),
schema);
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath),
schemaBuilder.build());
FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
- Identifier filesTableId =
+
+ writeTestData(table);
+
+ Identifier auditLogTableId =
identifier(tableName + SYSTEM_TABLE_SPLITTER +
AuditLogTable.AUDIT_LOG);
- auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+ return (AuditLogTable) catalog.getTable(auditLogTableId);
+ }
+ private void writeTestData(FileStoreTable table) throws Exception {
write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
- write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
}
- @Test
- public void testReadAuditLogFromLatest() throws Exception {
- List<InternalRow> expectRow = getExpectedResult();
- List<InternalRow> result = read(auditLogTable);
- assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
- }
-
private List<InternalRow> getExpectedResult() {
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
expectedRow.add(
GenericRow.of(
-
BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5));
+
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6));
+ expectedRow.add(
+
GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+ return expectedRow;
+ }
+
+ private List<InternalRow> getExpectedResultWithSequenceNumber() {
+ List<InternalRow> expectedRow = new ArrayList<>();
+ expectedRow.add(
+
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1L, 1, 1,
1));
expectedRow.add(
GenericRow.of(
-
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 4, 6));
+
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 2L, 1, 2, 6));
expectedRow.add(
-
GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+
GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 0L, 2, 3,
1));
return expectedRow;
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
new file mode 100644
index 0000000000..6f187fbffd
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link BinlogTable}. */
+public class BinlogTableTest extends TableTestBase {
+
+ @Test
+ public void testReadBinlogFromLatest() throws Exception {
+ BinlogTable binlogTable = createBinlogTable("binlog_table", false);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ List<InternalRow> result = read(binlogTable);
+ List<InternalRow> expectRow = getExpectedResult();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithTableOption() throws Exception {
+ BinlogTable binlogTable = createBinlogTable("binlog_table_with_seq",
true);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt",
"col1");
+
+ List<InternalRow> result = read(binlogTable);
+ List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithAlterTable() throws Exception {
+ String tableName = "binlog_table_alter_seq";
+ // Create table without sequence-number option
+ BinlogTable binlogTable = createBinlogTable(tableName, false);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ // Add sequence-number option via alterTable
+ catalog.alterTable(
+ identifier(tableName),
+ SchemaChange.setOption(
+ CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(),
"true"),
+ false);
+
+ // Re-fetch the binlog table to get updated schema
+ Identifier binlogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER +
BinlogTable.BINLOG);
+ BinlogTable updatedBinlogTable = (BinlogTable)
catalog.getTable(binlogTableId);
+
+ // Verify schema now includes _SEQUENCE_NUMBER
+ assertThat(updatedBinlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt",
"col1");
+
+ List<InternalRow> result = read(updatedBinlogTable);
+ List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ private BinlogTable createBinlogTable(String tableName, boolean
enableSequenceNumber)
+ throws Exception {
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, tableName));
+ FileIO fileIO = LocalFileIO.create();
+
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option("bucket", "1");
+ if (enableSequenceNumber) {
+
schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(),
"true");
+ }
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath),
schemaBuilder.build());
+ FileStoreTable table =
+ FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
+
+ writeTestData(table);
+
+ Identifier binlogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER +
BinlogTable.BINLOG);
+ return (BinlogTable) catalog.getTable(binlogTableId);
+ }
+
+ private void writeTestData(FileStoreTable table) throws Exception {
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
+ write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
+ }
+
+ private List<InternalRow> getExpectedResult() {
+ List<InternalRow> expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.DELETE.shortString()),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1})));
+ expectedRow.add(
+ GenericRow.of(
+
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {6})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.INSERT.shortString()),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {3}),
+ new GenericArray(new Object[] {1})));
+ return expectedRow;
+ }
+
+ private List<InternalRow> getExpectedResultWithSequenceNumber() {
+ List<InternalRow> expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.DELETE.shortString()),
+ 1L,
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1})));
+ expectedRow.add(
+ GenericRow.of(
+
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+ 2L,
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {6})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.INSERT.shortString()),
+ 0L,
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {3}),
+ new GenericArray(new Object[] {1})));
+ return expectedRow;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index dfdd9dd207..e4870de583 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -28,14 +28,13 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
import java.io.IOException;
-import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
-
/** An {@link InnerTableRead} that reads the data changed before and after
compaction. */
public class LookupCompactDiffRead extends AbstractDataTableRead {
@@ -46,7 +45,11 @@ public class LookupCompactDiffRead extends
AbstractDataTableRead {
super(schema);
this.incrementalDiffRead = new
IncrementalCompactDiffSplitRead(mergeRead);
this.fullPhaseMergeRead =
- SplitRead.convert(mergeRead, split ->
unwrap(mergeRead.createReader(split)));
+ SplitRead.convert(
+ mergeRead,
+ split ->
+ KeyValueTableRead.unwrap(
+ mergeRead.createReader(split),
schema.options()));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 22271c8a7e..48fcbbda71 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -56,6 +56,7 @@ import static java.util.Collections.singletonList;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** ITCase for batch file store. */
public class BatchFileStoreITCase extends CatalogITCaseBase {
@@ -1053,6 +1054,127 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
.containsExactly(Row.of("+I", new String[] {"A"}));
}
+ @Test
+ public void testAuditLogTableWithSequenceNumberEnabled() {
+ // Creating an append-only table (no primary key) with
+ // table-read.sequence-number.enabled option should throw an exception
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ sql(
+ "CREATE TABLE test_table_err (a int, b int, c
AS a + b) "
+ + "WITH
('table-read.sequence-number.enabled'='true')"));
+
+ // Selecting an audit_log table while passing
+ // table-read.sequence-number.enabled as a dynamic OPTIONS hint should throw an exception
+ sql("CREATE TABLE test_table_err2 (a int PRIMARY KEY NOT ENFORCED, b
int, c AS a + b);");
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ sql(
+ "SELECT * FROM `test_table_err2$audit_log`"
+ + " /*+
OPTIONS('table-read.sequence-number.enabled' = 'true') */"));
+
+ // Create primary key table with table-read.sequence-number.enabled
option
+ sql(
+ "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED,
b int, c AS a + b) "
+ + "WITH
('table-read.sequence-number.enabled'='true');");
+ sql("INSERT INTO test_table_seq VALUES (1, 2)");
+ sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+ // Test SELECT * from original table
+ assertThat(sql("SELECT * FROM `test_table_seq`"))
+ .containsExactlyInAnyOrder(Row.of(1, 2, 3), Row.of(3, 4, 7));
+
+ // Test SELECT * includes _SEQUENCE_NUMBER
+ assertThat(sql("SELECT * FROM `test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3),
Row.of("+I", 1L, 3, 4, 7));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM
`test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER, rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM
`test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testAuditLogTableWithSequenceNumberAlterTable() {
+ // Create primary key table without sequence-number option
+ sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b
int, c AS a + b);");
+ sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+ sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+ // Add table-read.sequence-number.enabled option via ALTER TABLE
+ sql("ALTER TABLE test_table_dyn SET
('table-read.sequence-number.enabled'='true')");
+
+ // Test SELECT * includes _SEQUENCE_NUMBER (same as
+ // testAuditLogTableWithSequenceNumberEnabled)
+ assertThat(sql("SELECT * FROM `test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3),
Row.of("+I", 1L, 3, 4, 7));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM
`test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER, rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM
`test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testBinlogTableWithSequenceNumberEnabled() {
+ // Create primary key table with table-read.sequence-number.enabled
option
+ sql(
+ "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED,
b int) "
+ + "WITH
('table-read.sequence-number.enabled'='true');");
+ sql("INSERT INTO test_table_seq VALUES (1, 2)");
+ sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+ // Test SELECT * includes _SEQUENCE_NUMBER
+ assertThat(sql("SELECT * FROM `test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+ Row.of("+I", 1L, new Integer[] {3}, new Integer[]
{4}));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM
`test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of(new Integer[] {2}, 0L), Row.of(new Integer[]
{4}, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER, rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM
`test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testBinlogTableWithSequenceNumberAlterTable() {
+ // Create primary key table without sequence-number option
+ sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b
int);");
+ sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+ sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+ // Add table-read.sequence-number.enabled option via ALTER TABLE
+ sql("ALTER TABLE test_table_dyn SET
('table-read.sequence-number.enabled'='true')");
+
+ // Test SELECT * includes _SEQUENCE_NUMBER (same as
+ // testBinlogTableWithSequenceNumberEnabled)
+ assertThat(sql("SELECT * FROM `test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+ Row.of("+I", 1L, new Integer[] {3}, new Integer[]
{4}));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM
`test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of(new Integer[] {2}, 0L), Row.of(new Integer[]
{4}, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER, rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM
`test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
@Test
public void testBatchReadSourceWithSnapshot() {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222), (3, 33,
333), (4, 44, 444)");