This is an automated email from the ASF dual-hosted git repository.
liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9bd60203d [Core] Support branch batch/streaming read and write (#2748)
9bd60203d is described below
commit 9bd60203d1e1053e6f96d0a9ec42361a5ff01628
Author: TaoZex <[email protected]>
AuthorDate: Wed Feb 7 14:49:22 2024 +0800
[Core] Support branch batch/streaming read and write (#2748)
[Core] Support branch batch/streaming read and write
---
.../java/org/apache/paimon/AbstractFileStore.java | 7 ++
.../org/apache/paimon/AppendOnlyFileStore.java | 14 ++-
.../src/main/java/org/apache/paimon/FileStore.java | 4 +
.../java/org/apache/paimon/KeyValueFileStore.java | 14 ++-
.../paimon/operation/AbstractFileStoreScan.java | 7 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 6 +-
.../paimon/operation/FileStoreCommitImpl.java | 27 ++++-
.../paimon/operation/KeyValueFileStoreScan.java | 6 +-
.../org/apache/paimon/schema/SchemaManager.java | 15 ++-
.../java/org/apache/paimon/schema/TableSchema.java | 4 +
.../paimon/table/AbstractFileStoreTable.java | 16 ++-
.../java/org/apache/paimon/table/DataTable.java | 2 +
.../org/apache/paimon/table/FileStoreTable.java | 2 +
.../apache/paimon/table/system/AuditLogTable.java | 5 +
.../apache/paimon/table/system/BucketsTable.java | 5 +
.../paimon/table/system/FileMonitorTable.java | 5 +
.../paimon/table/system/ReadOptimizedTable.java | 5 +
.../org/apache/paimon/utils/BranchManager.java | 6 ++
.../org/apache/paimon/utils/SnapshotManager.java | 88 +++++++++++----
.../java/org/apache/paimon/utils/TagManager.java | 5 +
.../apache/paimon/operation/FileDeletionTest.java | 2 +
.../paimon/table/AppendOnlyFileStoreTableTest.java | 72 +++++++++++++
.../paimon/table/FileStoreTableTestBase.java | 118 ++++++++++++++++++---
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 68 ++++++++++++
.../paimon/table/SchemaEvolutionTableTestBase.java | 6 ++
25 files changed, 451 insertions(+), 58 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 0b4ebba19..896b53794 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -53,6 +53,8 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+
/**
* Base {@link FileStore} implementation.
*
@@ -169,6 +171,10 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
+ return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
+ }
+
+ public FileStoreCommitImpl newCommit(String commitUser, String branchName)
{
return new FileStoreCommitImpl(
fileIO,
schemaManager,
@@ -186,6 +192,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 &&
options.dynamicPartitionOverwrite(),
newKeyComparator(),
+ branchName,
newStatsFileHandler());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index ec1e7cb58..8be8f8178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -38,6 +38,7 @@ import java.util.List;
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
@@ -69,7 +70,11 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public AppendOnlyFileStoreScan newScan() {
- return newScan(false);
+ return newScan(DEFAULT_MAIN_BRANCH);
+ }
+
+ public AppendOnlyFileStoreScan newScan(String branchName) {
+ return newScan(false, branchName);
}
@Override
@@ -99,12 +104,12 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
rowType,
pathFactory(),
snapshotManager(),
- newScan(true).withManifestCacheFilter(manifestFilter),
+ newScan(true,
DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
options,
tableName);
}
- private AppendOnlyFileStoreScan newScan(boolean forWrite) {
+ private AppendOnlyFileStoreScan newScan(boolean forWrite, String
branchName) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
@@ -138,7 +143,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
manifestListFactory(forWrite),
options.bucket(),
forWrite,
- options.scanManifestParallelism());
+ options.scanManifestParallelism(),
+ branchName);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index b8346f986..cd38d2061 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -63,6 +63,8 @@ public interface FileStore<T> extends Serializable {
FileStoreScan newScan();
+ FileStoreScan newScan(String branchName);
+
ManifestList.Factory manifestListFactory();
ManifestFile.Factory manifestFileFactory();
@@ -79,6 +81,8 @@ public interface FileStore<T> extends Serializable {
FileStoreCommit newCommit(String commitUser);
+ FileStoreCommit newCommit(String commitUser, String branchName);
+
SnapshotDeletion newSnapshotDeletion();
TagManager newTagManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 710b01585..373bce35c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -52,6 +52,7 @@ import java.util.function.Supplier;
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
@@ -107,7 +108,11 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public KeyValueFileStoreScan newScan() {
- return newScan(false);
+ return newScan(DEFAULT_MAIN_BRANCH);
+ }
+
+ public KeyValueFileStoreScan newScan(String branchName) {
+ return newScan(false, branchName);
}
@Override
@@ -159,7 +164,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
pathFactory(),
format2PathFactory(),
snapshotManager(),
- newScan(true).withManifestCacheFilter(manifestFilter),
+ newScan(true,
DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor,
@@ -182,7 +187,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
return pathFactoryMap;
}
- private KeyValueFileStoreScan newScan(boolean forWrite) {
+ private KeyValueFileStoreScan newScan(boolean forWrite, String branchName)
{
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
@@ -212,7 +217,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
manifestListFactory(forWrite),
options.bucket(),
forWrite,
- options.scanManifestParallelism());
+ options.scanManifestParallelism(),
+ branchName);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 65857daed..adcab4191 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -84,6 +84,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final Integer scanManifestParallelism;
private ScanMetrics scanMetrics = null;
+ private String branchName;
public AbstractFileStoreScan(
RowType partitionType,
@@ -94,7 +95,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
- Integer scanManifestParallelism) {
+ Integer scanManifestParallelism,
+ String branchName) {
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
@@ -105,6 +107,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
+ this.branchName = branchName;
}
@Override
@@ -245,7 +248,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
if (manifests == null) {
snapshot =
specifiedSnapshot == null
- ? snapshotManager.latestSnapshot()
+ ? snapshotManager.latestSnapshot(branchName)
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 1cd7db0d2..90aa988b6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -49,7 +49,8 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
- Integer scanManifestParallelism) {
+ Integer scanManifestParallelism,
+ String branchName) {
super(
partitionType,
bucketFilter,
@@ -59,7 +60,8 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
- scanManifestParallelism);
+ scanManifestParallelism,
+ branchName);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(),
schemaId);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 044eb7f6e..0264f1e45 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -71,6 +71,8 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+
/**
* Default implementation of {@link FileStoreCommit}.
*
@@ -112,6 +114,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final int manifestMergeMinCount;
private final boolean dynamicPartitionOverwrite;
@Nullable private final Comparator<InternalRow> keyComparator;
+ private final String branchName;
@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
@@ -137,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator,
+ String branchName,
StatsFileHandler statsFileHandler) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -155,6 +159,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;
+ this.branchName = branchName;
+
this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
@@ -233,7 +239,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of
times we read from
// files.
- latestSnapshot = snapshotManager.latestSnapshot();
+ latestSnapshot = snapshotManager.latestSnapshot(branchName);
if (latestSnapshot != null) {
// it is possible that some partitions only have compact
changes,
// so we need to contain all changes
@@ -254,6 +260,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
safeLatestSnapshotId,
+ branchName,
null);
generatedSnapshot += 1;
}
@@ -283,6 +290,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
safeLatestSnapshotId,
+ branchName,
null);
generatedSnapshot += 1;
}
@@ -428,6 +436,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
null,
+ branchName,
null);
generatedSnapshot += 1;
}
@@ -523,6 +532,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
Collections.emptyMap(),
Snapshot.CommitKind.ANALYZE,
null,
+ branchName,
statsFileName);
}
@@ -596,10 +606,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
@Nullable Long safeLatestSnapshotId,
+ String branchName,
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
- Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ Snapshot latestSnapshot =
snapshotManager.latestSnapshot(branchName);
cnt++;
if (tryCommitOnce(
tableFiles,
@@ -611,6 +622,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
commitKind,
latestSnapshot,
safeLatestSnapshotId,
+ branchName,
statsFileName)) {
break;
}
@@ -672,6 +684,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
null,
+ branchName,
null)) {
break;
}
@@ -690,10 +703,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
Snapshot.CommitKind commitKind,
@Nullable Snapshot latestSnapshot,
@Nullable Long safeLatestSnapshotId,
+ String branchName,
@Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID :
latestSnapshot.id() + 1;
- Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
+ Path newSnapshotPath =
+ branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? snapshotManager.snapshotPath(newSnapshotId)
+ : snapshotManager.branchSnapshotPath(branchName,
newSnapshotId);
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" +
newSnapshotId);
@@ -775,7 +792,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
newIndexManifest = indexManifest;
}
- long latestSchemaId = schemaManager.latest().get().id();
+ long latestSchemaId = schemaManager.latest(branchName).get().id();
// write new stats or inherit from the previous snapshot
String statsFileName = null;
@@ -840,7 +857,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
boolean committed =
fileIO.writeFileUtf8(newSnapshotPath,
newSnapshot.toJson());
if (committed) {
- snapshotManager.commitLatestHint(newSnapshotId);
+ snapshotManager.commitLatestHint(newSnapshotId,
branchName);
}
return committed;
};
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index d528e5e79..02086ceb3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -53,7 +53,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
- Integer scanManifestParallelism) {
+ Integer scanManifestParallelism,
+ String branchName) {
super(
partitionType,
bucketFilter,
@@ -63,7 +64,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
- scanManifestParallelism);
+ scanManifestParallelism,
+ branchName);
this.fieldKeyStatsConverters =
new FieldStatsConverters(
sid ->
keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 07dda6196..a6d274688 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -64,6 +64,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -91,8 +92,16 @@ public class SchemaManager implements Serializable {
/** @return latest schema. */
public Optional<TableSchema> latest() {
+ return latest(DEFAULT_MAIN_BRANCH);
+ }
+
+ public Optional<TableSchema> latest(String branchName) {
+ Path directoryPath =
+ branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? schemaDirectory()
+ : branchSchemaDirectory(branchName);
try {
- return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
+ return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
.reduce(Math::max)
.map(this::schema);
} catch (IOException e) {
@@ -482,6 +491,10 @@ public class SchemaManager implements Serializable {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
}
+ public Path branchSchemaDirectory(String branchName) {
+ return new Path(getBranchPath(tableRoot, branchName) + "/schema");
+ }
+
public Path branchSchemaPath(String branchName, long schemaId) {
return new Path(
getBranchPath(tableRoot, branchName) + "/schema/" +
SCHEMA_PREFIX + schemaId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index 6fe65aca1..398aa98d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -285,6 +285,10 @@ public class TableSchema implements Serializable {
timeMillis);
}
+ public static TableSchema fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, TableSchema.class);
+ }
+
@Override
public String toString() {
return JsonSerdeUtil.toJson(this);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9900d9ecf..d00ce913d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -68,6 +68,7 @@ import java.util.Optional;
import java.util.function.BiConsumer;
import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -134,8 +135,13 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public SnapshotReader newSnapshotReader() {
+ return newSnapshotReader(DEFAULT_MAIN_BRANCH);
+ }
+
+ @Override
+ public SnapshotReader newSnapshotReader(String branchName) {
return new SnapshotReaderImpl(
- store().newScan(),
+ store().newScan(branchName),
tableSchema,
coreOptions(),
snapshotManager(),
@@ -289,6 +295,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public TableCommitImpl newCommit(String commitUser) {
+ // Compatibility with previous design, the main branch is written by
default
+ return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
+ }
+
+ public TableCommitImpl newCommit(String commitUser, String branchName) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
if (!options.writeOnly()) {
@@ -304,8 +315,9 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
.olderThanMills(System.currentTimeMillis()
- snapshotTimeRetain)
.expire();
}
+
return new TableCommitImpl(
- store().newCommit(commitUser),
+ store().newCommit(commitUser, branchName),
createCommitCallbacks(),
snapshotExpire,
options.writeOnly() ? null :
store().newPartitionExpire(commitUser),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index b5bebe2a7..1d8921304 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -31,6 +31,8 @@ public interface DataTable extends InnerTable {
SnapshotReader newSnapshotReader();
+ SnapshotReader newSnapshotReader(String branchName);
+
CoreOptions coreOptions();
SnapshotManager snapshotManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index b183d6ad6..ab1b2e961 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -97,6 +97,8 @@ public interface FileStoreTable extends DataTable {
@Override
TableCommitImpl newCommit(String commitUser);
+ TableCommitImpl newCommit(String commitUser, String branchName);
+
LocalTableQuery newLocalTableQuery();
default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 96c2621b6..59ceed137 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -131,6 +131,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return new AuditLogDataReader(dataTable.newSnapshotReader());
}
+ @Override
+ public SnapshotReader newSnapshotReader(String branchName) {
+ return new AuditLogDataReader(dataTable.newSnapshotReader(branchName));
+ }
+
@Override
public InnerTableScan newScan() {
return new AuditLogBatchScan(dataTable.newScan());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 4c9b9a601..2ab88a346 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -148,6 +148,11 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
return wrapped.newSnapshotReader();
}
+ @Override
+ public SnapshotReader newSnapshotReader(String branchName) {
+ return wrapped.newSnapshotReader(branchName);
+ }
+
@Override
public InnerTableScan newScan() {
return wrapped.newScan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 7825b93e4..bedc19ac3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -135,6 +135,11 @@ public class FileMonitorTable implements DataTable,
ReadonlyTable {
return wrapped.newSnapshotReader();
}
+ @Override
+ public SnapshotReader newSnapshotReader(String branchName) {
+ return wrapped.newSnapshotReader(branchName);
+ }
+
@Override
public InnerTableScan newScan() {
return wrapped.newScan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 8daff265f..0deac172b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -97,6 +97,11 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
}
}
+ @Override
+ public SnapshotReader newSnapshotReader(String branchName) {
+ return dataTable.newSnapshotReader(branchName);
+ }
+
@Override
public InnerTableScan newScan() {
return new InnerTableScanImpl(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 9d14d5368..2bf167fe2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -42,6 +42,7 @@ public class BranchManager {
private static final Logger LOG =
LoggerFactory.getLogger(BranchManager.class);
public static final String BRANCH_PREFIX = "branch-";
+ public static final String DEFAULT_MAIN_BRANCH = "main";
private final FileIO fileIO;
private final Path tablePath;
@@ -78,6 +79,11 @@ public class BranchManager {
}
public void createBranch(String branchName, String tagName) {
+ checkArgument(
+ !branchName.equals(DEFAULT_MAIN_BRANCH),
+ String.format(
+ "Branch name '%s' is the default branch and cannot be
used.",
+ DEFAULT_MAIN_BRANCH));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is
blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already
exists.", branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not
exists.", tagName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c69653855..2f0a1d859 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -43,6 +43,7 @@ import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -81,13 +82,34 @@ public class SnapshotManager implements Serializable {
return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX +
snapshotId);
}
+ public Path branchSnapshotDirectory(String branchName) {
+ return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
+ }
+
public Path branchSnapshotPath(String branchName, long snapshotId) {
return new Path(
getBranchPath(tablePath, branchName) + "/snapshot/" +
SNAPSHOT_PREFIX + snapshotId);
}
+ public Path snapshotPathByBranch(String branchName, long snapshotId) {
+ return branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? snapshotPath(snapshotId)
+ : branchSnapshotPath(branchName, snapshotId);
+ }
+
+ public Path snapshotDirByBranch(String branchName) {
+ return branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? snapshotDirectory()
+ : branchSnapshotDirectory(branchName);
+ }
+
public Snapshot snapshot(long snapshotId) {
- return Snapshot.fromPath(fileIO, snapshotPath(snapshotId));
+ return snapshot(DEFAULT_MAIN_BRANCH, snapshotId);
+ }
+
+ public Snapshot snapshot(String branchName, long snapshotId) {
+ Path snapshotPath = snapshotPathByBranch(branchName, snapshotId);
+ return Snapshot.fromPath(fileIO, snapshotPath);
}
public boolean snapshotExists(long snapshotId) {
@@ -102,13 +124,21 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Snapshot latestSnapshot() {
- Long snapshotId = latestSnapshotId();
- return snapshotId == null ? null : snapshot(snapshotId);
+ return latestSnapshot(DEFAULT_MAIN_BRANCH);
+ }
+
+ public @Nullable Snapshot latestSnapshot(String branchName) {
+ Long snapshotId = latestSnapshotId(branchName);
+ return snapshotId == null ? null : snapshot(branchName, snapshotId);
}
public @Nullable Long latestSnapshotId() {
+ return latestSnapshotId(DEFAULT_MAIN_BRANCH);
+ }
+
+ public @Nullable Long latestSnapshotId(String branchName) {
try {
- return findLatest();
+ return findLatest(branchName);
} catch (IOException e) {
throw new RuntimeException("Failed to find latest snapshot id", e);
}
@@ -120,8 +150,12 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Long earliestSnapshotId() {
+ return earliestSnapshotId(DEFAULT_MAIN_BRANCH);
+ }
+
+ public @Nullable Long earliestSnapshotId(String branchName) {
try {
- return findEarliest();
+ return findEarliest(branchName);
} catch (IOException e) {
throw new RuntimeException("Failed to find earliest snapshot id",
e);
}
@@ -352,13 +386,13 @@ public class SnapshotManager implements Serializable {
return null;
}
- private @Nullable Long findLatest() throws IOException {
- Path snapshotDir = snapshotDirectory();
+ private @Nullable Long findLatest(String branchName) throws IOException {
+ Path snapshotDir = snapshotDirByBranch(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
- Long snapshotId = readHint(LATEST);
+ Long snapshotId = readHint(LATEST, branchName);
if (snapshotId != null) {
long nextSnapshot = snapshotId + 1;
// it is the latest only there is no next one
@@ -367,26 +401,30 @@ public class SnapshotManager implements Serializable {
}
}
- return findByListFiles(Math::max);
+ return findByListFiles(Math::max, branchName);
}
- private @Nullable Long findEarliest() throws IOException {
- Path snapshotDir = snapshotDirectory();
+ private @Nullable Long findEarliest(String branchName) throws IOException {
+ Path snapshotDir = snapshotDirByBranch(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
- Long snapshotId = readHint(EARLIEST);
+ Long snapshotId = readHint(EARLIEST, branchName);
// null and it is the earliest only it exists
if (snapshotId != null && snapshotExists(snapshotId)) {
return snapshotId;
}
- return findByListFiles(Math::min);
+ return findByListFiles(Math::min, branchName);
}
public Long readHint(String fileName) {
- Path snapshotDir = snapshotDirectory();
+ return readHint(fileName, DEFAULT_MAIN_BRANCH);
+ }
+
+ public Long readHint(String fileName, String branchName) {
+ Path snapshotDir = snapshotDirByBranch(branchName);
Path path = new Path(snapshotDir, fileName);
int retryNumber = 0;
while (retryNumber++ < READ_HINT_RETRY_NUM) {
@@ -404,23 +442,33 @@ public class SnapshotManager implements Serializable {
return null;
}
- private Long findByListFiles(BinaryOperator<Long> reducer) throws
IOException {
- Path snapshotDir = snapshotDirectory();
+ private Long findByListFiles(BinaryOperator<Long> reducer, String
branchName)
+ throws IOException {
+ Path snapshotDir = snapshotDirByBranch(branchName);
return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX)
.reduce(reducer)
.orElse(null);
}
public void commitLatestHint(long snapshotId) throws IOException {
- commitHint(snapshotId, LATEST);
+ commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH);
+ }
+
+ public void commitLatestHint(long snapshotId, String branchName) throws
IOException {
+ commitHint(snapshotId, LATEST, branchName);
}
+    /** Writes {@code snapshotId} into the {@code EARLIEST} hint file of the main branch. */
     public void commitEarliestHint(long snapshotId) throws IOException {
-        commitHint(snapshotId, EARLIEST);
+        commitEarliestHint(snapshotId, DEFAULT_MAIN_BRANCH);
+    }
+
+    /** Writes {@code snapshotId} into the {@code EARLIEST} hint file of {@code branchName}. */
+    public void commitEarliestHint(long snapshotId, String branchName) throws IOException {
+        commitHint(snapshotId, EARLIEST, branchName);
     }
- private void commitHint(long snapshotId, String fileName) throws
IOException {
- Path snapshotDir = snapshotDirectory();
+ /**
+ * Writes {@code snapshotId} as UTF-8 text into hint file {@code fileName} under the snapshot
+ * directory of {@code branchName}, replacing any previous hint file.
+ */
+ private void commitHint(long snapshotId, String fileName, String
branchName)
+ throws IOException {
+ Path snapshotDir = snapshotDirByBranch(branchName);
Path hintFile = new Path(snapshotDir, fileName);
+ // NOTE(review): delete followed by write is not atomic, so a concurrent reader may find no
+ // hint file in between; readers are presumably expected to tolerate a missing hint — confirm.
fileIO.delete(hintFile, false);
fileIO.writeFileUtf8(hintFile, String.valueOf(snapshotId));
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 96167b91d..3f9599431 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -69,6 +69,11 @@ public class TagManager {
return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName);
}
+ /** Return the path of tag directory in branch. */
+ public Path branchTagDirectory(String branchName) {
+ return new Path(getBranchPath(tablePath, branchName) + "/tag");
+ }
+
/** Return the path of a tag in branch. */
public Path branchTagPath(String branchName, String tagName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" +
TAG_PREFIX + tagName);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index c41741c08..49e55ee4e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -63,6 +63,7 @@ import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.assertj.core.api.Assertions.assertThat;
/**
@@ -723,6 +724,7 @@ public class FileDeletionTest {
Snapshot.CommitKind.APPEND,
store.snapshotManager().latestSnapshot(),
null,
+ DEFAULT_MAIN_BRANCH,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index d91e0a320..9e0c6d2ab 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -85,6 +85,30 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}
+    /**
+     * Batch-reads branch {@code BRANCH_NAME} after {@code writeBranchData}: an append-only table
+     * must return every appended row of each partition, duplicates included.
+     */
+    @Test
+    public void testBranchBatchReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+        List<Split> splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits());
+        TableRead read = table.newRead();
+        // hasSameElementsAs ignores multiplicity and could not detect a lost duplicate row;
+        // containsExactlyInAnyOrderElementsOf checks every expected occurrence.
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrderElementsOf(
+                        Arrays.asList(
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|12|102|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrderElementsOf(
+                        Arrays.asList(
+                                "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
@Test
public void testBatchProjection() throws Exception {
writeData();
@@ -241,6 +265,31 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
assertThat(partitions).containsExactly(1, 2, 3);
}
+    @Test
+    public void testBranchStreamingReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+
+        // DELTA scan of the branch: the expected rows are exactly those written by the last
+        // commit of writeBranchData.
+        List<Split> deltaSplits =
+                toSplits(
+                        table.newSnapshotReader(BRANCH_NAME)
+                                .withMode(ScanMode.DELTA)
+                                .read()
+                                .dataSplits());
+        TableRead read = table.newRead();
+
+        List<String> expectedPartition1 =
+                Arrays.asList(
+                        "+1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                        "+1|12|102|binary|varbinary|mapKey:mapVal|multiset");
+        List<String> expectedPartition2 =
+                Collections.singletonList("+2|21|201|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertThat(getResult(read, deltaSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
+                .isEqualTo(expectedPartition1);
+        assertThat(getResult(read, deltaSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
+                .isEqualTo(expectedPartition2);
+    }
+
@Test
public void testStreamingSplitInUnawareBucketMode() throws Exception {
// in unaware-bucket mode, we split files into splits all the time
@@ -451,6 +500,29 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
commit.close();
}
+    /**
+     * Commits three batches of rows to branch {@code BRANCH_NAME} through a writer of {@code
+     * table}, using commit identifiers 0, 1 and 2.
+     */
+    private void writeBranchData(FileStoreTable table) throws Exception {
+        // try-with-resources closes the writer and committer even if a write or commit fails.
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) {
+            write.write(rowData(1, 10, 100L));
+            write.write(rowData(2, 20, 200L));
+            write.write(rowData(1, 11, 101L));
+            commit.commit(0, write.prepareCommit(true, 0));
+
+            write.write(rowData(1, 12, 102L));
+            write.write(rowData(2, 21, 201L));
+            write.write(rowData(2, 22, 202L));
+            commit.commit(1, write.prepareCommit(true, 1));
+
+            write.write(rowData(1, 11, 101L));
+            write.write(rowData(2, 21, 201L));
+            write.write(rowData(1, 12, 102L));
+            commit.commit(2, write.prepareCommit(true, 2));
+        }
+    }
+
@Override
protected FileStoreTable createFileStoreTable(Consumer<Options> configure)
throws Exception {
Options conf = new Options();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index c07fc2029..b9b117182 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -117,6 +117,8 @@ import static
org.junit.jupiter.params.provider.Arguments.arguments;
/** Base test class for {@link FileStoreTable}. */
public abstract class FileStoreTableTestBase {
+ /** Name of the branch that {@code generateBranch} creates and the branch tests write to. */
+ protected static final String BRANCH_NAME = "branch1";
+
protected static final RowType ROW_TYPE =
RowType.of(
new DataType[] {
@@ -941,14 +943,7 @@ public abstract class FileStoreTableTestBase {
table.createBranch("test-branch", "test-tag");
// verify that branch file exist
- TraceableFileIO fileIO = new TraceableFileIO();
- BranchManager branchManager =
- new BranchManager(
- fileIO,
- tablePath,
- new SnapshotManager(fileIO, tablePath),
- new TagManager(fileIO, tablePath),
- new SchemaManager(fileIO, tablePath));
+ BranchManager branchManager = table.branchManager();
assertThat(branchManager.branchExists("test-branch")).isTrue();
// verify test-tag in test-branch is equal to snapshot 2
@@ -987,6 +982,12 @@ public abstract class FileStoreTableTestBase {
table.createTag("test-tag", 1);
table.createBranch("branch0", "test-tag");
+ assertThatThrownBy(() -> table.createBranch("main", "tag1"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ IllegalArgumentException.class,
+ "Branch name 'main' is the default branch and
cannot be used."));
+
assertThatThrownBy(() -> table.createBranch("branch-1", "tag1"))
.satisfies(
AssertionUtils.anyCauseMatches(
@@ -1026,14 +1027,7 @@ public abstract class FileStoreTableTestBase {
table.deleteBranch("branch1");
// verify that branch file not exist
- TraceableFileIO fileIO = new TraceableFileIO();
- BranchManager branchManager =
- new BranchManager(
- fileIO,
- tablePath,
- new SnapshotManager(fileIO, tablePath),
- new TagManager(fileIO, tablePath),
- new SchemaManager(fileIO, tablePath));
+ BranchManager branchManager = table.branchManager();
assertThat(branchManager.branchExists("branch1")).isFalse();
assertThatThrownBy(() -> table.deleteBranch("branch1"))
@@ -1240,6 +1234,66 @@ public abstract class FileStoreTableTestBase {
assertThat(schemaPath).isEqualTo(tablePath);
}
+    @Test
+    public void testBranchWriteAndRead() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+
+        String row0 = "0|0|0|binary|varbinary|mapKey:mapVal|multiset";
+        String row2 = "2|20|200|binary|varbinary|mapKey:mapVal|multiset";
+        String row3 = "3|30|300|binary|varbinary|mapKey:mapVal|multiset";
+        String row4 = "4|40|400|binary|varbinary|mapKey:mapVal|multiset";
+
+        // First write to the branch: a single row.
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // The main branch still only has the row inherited by the branch ...
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(row0);
+        // ... while the branch sees that row plus the newly written one.
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(row0, row2);
+
+        // Second write to the branch: two more rows.
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) {
+            write.write(rowData(3, 30, 300L));
+            write.write(rowData(4, 40, 400L));
+            commit.commit(2, write.prepareCommit(false, 3));
+        }
+
+        // The main branch is still untouched ...
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(row0);
+        // ... and the branch accumulates all of its writes.
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(row0, row2, row3, row4);
+    }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,
@@ -1420,4 +1474,36 @@ public abstract class FileStoreTableTestBase {
+ /** Copies {@code dataSplits} into a new {@link ArrayList} typed as {@code List<Split>}. */
protected List<Split> toSplits(List<DataSplit> dataSplits) {
return new ArrayList<>(dataSplits);
}
+
+    /**
+     * Commits one row to the main branch, tags snapshot 1 as {@code tag1} and creates branch
+     * {@code BRANCH_NAME} from that tag. Asserts that the branch exists and returns the same data
+     * as the main branch.
+     */
+    protected void generateBranch(FileStoreTable table) throws Exception {
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+
+        // verify that the branch exists
+        BranchManager branchManager = table.branchManager();
+        assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();
+
+        // verify that the branch and the main branch have the same data
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+    }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 92172ae4f..2412e4b7c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -261,6 +261,24 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
"2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
+    @Test
+    public void testBranchBatchReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+
+        // Batch read of the branch: per key only the last written value survives, and keys that
+        // writeBranchData deleted are gone.
+        List<Split> branchSplits =
+                toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits());
+        TableRead read = table.newRead();
+
+        List<String> expectedPartition1 =
+                Collections.singletonList("1|10|1000|binary|varbinary|mapKey:mapVal|multiset");
+        List<String> expectedPartition2 =
+                Arrays.asList(
+                        "2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|22|202|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertThat(getResult(read, branchSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(expectedPartition1);
+        assertThat(getResult(read, branchSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(expectedPartition2);
+    }
+
@Test
public void testBatchProjection() throws Exception {
writeData();
@@ -314,6 +332,31 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
"+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
+    @Test
+    public void testBranchStreamingReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+
+        // DELTA scan of the branch: the expected rows are the merged records of the last commit
+        // of writeBranchData; DELETE records are prefixed with '-'.
+        List<Split> deltaSplits =
+                toSplits(
+                        table.newSnapshotReader(BRANCH_NAME)
+                                .withMode(ScanMode.DELTA)
+                                .read()
+                                .dataSplits());
+        TableRead read = table.newRead();
+
+        List<String> expectedPartition1 =
+                Collections.singletonList("-1|11|1001|binary|varbinary|mapKey:mapVal|multiset");
+        List<String> expectedPartition2 =
+                Arrays.asList(
+                        "-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                        "+2|22|202|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertThat(getResult(read, deltaSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
+                .isEqualTo(expectedPartition1);
+        assertThat(getResult(read, deltaSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
+                .isEqualTo(expectedPartition2);
+    }
+
@Test
public void testStreamingProjection() throws Exception {
writeData();
@@ -611,6 +654,31 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.close();
}
+    /**
+     * Commits three batches to branch {@code BRANCH_NAME} through a writer of {@code table}
+     * (commit identifiers 0, 1 and 2), including updates of existing keys and two DELETE rows in
+     * the last batch.
+     */
+    private void writeBranchData(FileStoreTable table) throws Exception {
+        // try-with-resources closes the writer and committer even if a write or commit fails.
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) {
+            write.write(rowData(1, 10, 100L));
+            write.write(rowData(2, 20, 200L));
+            write.write(rowData(1, 11, 101L));
+            commit.commit(0, write.prepareCommit(true, 0));
+
+            write.write(rowData(1, 10, 1000L));
+            write.write(rowData(2, 21, 201L));
+            write.write(rowData(2, 21, 2001L));
+            commit.commit(1, write.prepareCommit(true, 1));
+
+            write.write(rowData(1, 11, 1001L));
+            write.write(rowData(2, 21, 20001L));
+            write.write(rowData(2, 22, 202L));
+            write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
+            write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
+            commit.commit(2, write.prepareCommit(true, 2));
+        }
+    }
+
@Override
@Test
public void testReadFilter() throws Exception {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index f5874bed7..a55bda911 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -502,6 +502,12 @@ public abstract class SchemaEvolutionTableTestBase {
.orElseThrow(IllegalStateException::new)));
}
+        /** Ignores {@code branchName} and returns the same result as {@link #latest()}. */
+        @Override
+        public Optional<TableSchema> latest(String branchName) {
+            // for compatibility test: every branch resolves to the single latest schema
+            return this.latest();
+        }
+
@Override
public List<TableSchema> listAll() {
return new ArrayList<>(tableSchemas.values());