This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e7020128 [hotfix] Change commitable to committable to fix typo. (#1318)
e7020128 is described below
commit e702012809cb174b1b44c2c20fb3d1468775a8d5
Author: Kerwin <[email protected]>
AuthorDate: Tue Jul 15 09:40:44 2025 +0800
[hotfix] Change commitable to committable to fix typo. (#1318)
---
.../main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java | 2 +-
.../java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java | 8 ++++----
.../com/alibaba/fluss/flink/tiering/LakeTieringJobBuilder.java | 2 +-
.../fluss/flink/tiering/committer/TieringCommitOperator.java | 4 ++--
.../alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java | 7 ++++---
.../alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java | 2 +-
.../fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java | 2 +-
.../com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java | 8 ++++----
website/blog/2025-07-01-tiering-service.md | 6 +++---
9 files changed, 21 insertions(+), 20 deletions(-)
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
index 031ca943..d49ee689 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
@@ -42,7 +42,7 @@ public interface LakeCommitter<WriteResult, CommittableT>
extends AutoCloseable
* @return the committable object
* @throws IOException if an I/O error occurs
*/
- CommittableT toCommitable(List<WriteResult> writeResults) throws
IOException;
+ CommittableT toCommittable(List<WriteResult> writeResults) throws
IOException;
/**
* Commits the given committable object.
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
index 388e7535..61417c9a 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
@@ -31,11 +31,11 @@ import java.io.Serializable;
* serializers for write results and committable objects.
*
* @param <WriteResult> the type of the write result
- * @param <CommitableT> the type of the committable object
+ * @param <CommittableT> the type of the committable object
* @since 0.7
*/
@PublicEvolving
-public interface LakeTieringFactory<WriteResult, CommitableT> extends
Serializable {
+public interface LakeTieringFactory<WriteResult, CommittableT> extends
Serializable {
/**
* Creates a lake writer to write Fluss's rows to Paimon/Iceberg rows.
@@ -60,7 +60,7 @@ public interface LakeTieringFactory<WriteResult, CommitableT>
extends Serializab
* @return the lake committer
* @throws IOException if an I/O error occurs
*/
- LakeCommitter<WriteResult, CommitableT> createLakeCommitter(
+ LakeCommitter<WriteResult, CommittableT> createLakeCommitter(
CommitterInitContext committerInitContext) throws IOException;
/**
@@ -68,5 +68,5 @@ public interface LakeTieringFactory<WriteResult, CommitableT>
extends Serializab
*
* @return the serializer for committable objects
*/
- SimpleVersionedSerializer<CommitableT> getCommitableSerializer();
+ SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/LakeTieringJobBuilder.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/LakeTieringJobBuilder.java
index 3fc776a7..ca74b895 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/LakeTieringJobBuilder.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/LakeTieringJobBuilder.java
@@ -98,7 +98,7 @@ public class LakeTieringJobBuilder {
source.transform(
"TieringCommitter",
CommittableMessageTypeInfo.of(
- () ->
lakeTieringFactory.getCommitableSerializer()),
+ () ->
lakeTieringFactory.getCommittableSerializer()),
new TieringCommitOperatorFactory(flussConfig,
lakeTieringFactory))
.setParallelism(1)
.setMaxParallelism(1)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
index 63f0dfaa..6f6720a9 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -64,7 +64,7 @@ import static
com.alibaba.fluss.utils.Preconditions.checkState;
*
* <p>When it collects all {@link TableBucketWriteResult}s of a round of
tiering for a table, it
* will combine all the {@link WriteResult}s to {@link Committable} via method
{@link
- * LakeCommitter#toCommitable(List)}, and then call method {@link
LakeCommitter#commit(Object)} to
+ * LakeCommitter#toCommittable(List)}, and then call method {@link
LakeCommitter#commit(Object)} to
* commit to lake.
*
* <p>Finally, it will also commit the commited lake snapshot to Fluss cluster
to make Fluss aware
@@ -180,7 +180,7 @@ public class TieringCommitOperator<WriteResult, Committable>
.map(TableBucketWriteResult::writeResult)
.collect(Collectors.toList());
// to committable
- Committable committable = lakeCommitter.toCommitable(writeResults);
+ Committable committable =
lakeCommitter.toCommittable(writeResults);
// before commit to lake, check fluss not missing any lake
snapshot commited by fluss
checkFlussNotMissingLakeSnapshot(tablePath, lakeCommitter,
committable);
long commitedSnapshotId = lakeCommitter.commit(committable);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
index 5319c25f..9e5097bc 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
@@ -66,8 +66,9 @@ public class TestingLakeTieringFactory
}
@Override
- public SimpleVersionedSerializer<TestingCommittable>
getCommitableSerializer() {
- throw new UnsupportedOperationException("method
getCommitableSerializer is not supported.");
+ public SimpleVersionedSerializer<TestingCommittable>
getCommittableSerializer() {
+ throw new UnsupportedOperationException(
+ "method getCommittableSerializer is not supported.");
}
private static final class TestingLakeWriter implements
LakeWriter<TestingWriteResult> {
@@ -107,7 +108,7 @@ public class TestingLakeTieringFactory
}
@Override
- public TestingCommittable toCommitable(List<TestingWriteResult>
testingWriteResults)
+ public TestingCommittable toCommittable(List<TestingWriteResult>
testingWriteResults)
throws IOException {
List<Integer> writeResults = new ArrayList<>();
for (TestingWriteResult testingWriteResult : testingWriteResults) {
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index c363044a..ab2f691b 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -65,7 +65,7 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
}
@Override
- public PaimonCommittable toCommitable(List<PaimonWriteResult>
paimonWriteResults)
+ public PaimonCommittable toCommittable(List<PaimonWriteResult>
paimonWriteResults)
throws IOException {
ManifestCommittable committable = new
ManifestCommittable(COMMIT_IDENTIFIER);
for (PaimonWriteResult paimonWriteResult : paimonWriteResults) {
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
index 2c12c0f6..45700952 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
@@ -59,7 +59,7 @@ public class PaimonLakeTieringFactory
}
@Override
- public SimpleVersionedSerializer<PaimonCommittable>
getCommitableSerializer() {
+ public SimpleVersionedSerializer<PaimonCommittable>
getCommittableSerializer() {
return new PaimonCommittableSerializer();
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 632df326..86aff158 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -120,7 +120,7 @@ class PaimonTieringTest {
SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
paimonLakeTieringFactory.getWriteResultSerializer();
SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
- paimonLakeTieringFactory.getCommitableSerializer();
+ paimonLakeTieringFactory.getCommittableSerializer();
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
createLakeCommitter(tablePath)) {
@@ -161,7 +161,7 @@ class PaimonTieringTest {
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
createLakeCommitter(tablePath)) {
// serialize/deserialize committable
- PaimonCommittable paimonCommittable =
lakeCommitter.toCommitable(paimonWriteResults);
+ PaimonCommittable paimonCommittable =
lakeCommitter.toCommittable(paimonWriteResults);
byte[] serialized =
committableSerializer.serialize(paimonCommittable);
paimonCommittable =
committableSerializer.deserialize(
@@ -246,7 +246,7 @@ class PaimonTieringTest {
// Commit all data
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
createLakeCommitter(tablePath)) {
- PaimonCommittable committable =
lakeCommitter.toCommitable(paimonWriteResults);
+ PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
long snapshot = lakeCommitter.commit(committable);
assertThat(snapshot).isEqualTo(1);
}
@@ -295,7 +295,7 @@ class PaimonTieringTest {
// Commit all data
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
createLakeCommitter(tablePath)) {
- PaimonCommittable committable =
lakeCommitter.toCommitable(paimonWriteResults);
+ PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
long snapshot = lakeCommitter.commit(committable);
assertThat(snapshot).isEqualTo(1);
}
diff --git a/website/blog/2025-07-01-tiering-service.md
b/website/blog/2025-07-01-tiering-service.md
index f491d309..14c367a2 100644
--- a/website/blog/2025-07-01-tiering-service.md
+++ b/website/blog/2025-07-01-tiering-service.md
@@ -166,16 +166,16 @@ public interface LakeTieringFactory {
SimpleVersionedSerializer<WriteResult> getWriteResultSerializer();
- LakeCommitter<WriteResult, CommitableT> createLakeCommitter(
+ LakeCommitter<WriteResult, CommittableT> createLakeCommitter(
CommitterInitContext committerInitContext);
- SimpleVersionedSerializer<CommitableT> getCommitableSerializer();
+ SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
}
```
- **createLakeWriter(WriterInitContext)**: builds a `LakeWriter` to convert
Fluss rows into the target table format.
- **getWriteResultSerializer()**: supplies a serializer for the writer’s
output.
- **createLakeCommitter(CommitterInitContext)**: constructs a `LakeCommitter`
to finalize and atomically commit data files.
-- **getCommitableSerializer()**: provides a serializer for committable
tokens.```
+- **getCommittableSerializer()**: provides a serializer for committable
tokens.```
By default, Fluss includes a Paimon-backed tiering factory; Iceberg support is
coming soon. Once the `TieringSourceReader` writes a batch of records through
the `LakeWriter`, it emits the resulting write metadata downstream to the
**TieringCommitOperator**, which then commits those changes both in the
lakehouse and back to the Fluss cluster.