luoyuxia commented on code in PR #1289:
URL: https://github.com/apache/fluss/pull/1289#discussion_r2218124272
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -178,6 +179,10 @@ private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
tableDescriptor
.getCustomProperties()
.forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
+ // set commit callback class
Review Comment:
There is no need to set the commit callback class here; we can always set it
at commit time.
It would be strange to expose a commit callback option to users.
In the `getTable` method of `PaimonLakeCommitter`, you can add the option when
loading the table:
`paimonCatalog.getTable(toPaimon(tablePath)).copy(commit callback, xxxx)`
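To illustrate the idea, here is a minimal sketch of layering the callback option on a copy of the table rather than persisting it in the table schema. `TableHandle` and `CommitterTableLookup` are hypothetical stand-ins, not the Paimon or Fluss API, and the option key and callback class name are assumptions for illustration only. It assumes `copy` accepts a map of dynamic options.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a lake table handle; not the Paimon API.
final class TableHandle {
    private final Map<String, String> options;

    TableHandle(Map<String, String> options) {
        // Defensive copy so the stored options cannot be mutated from outside.
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }

    Map<String, String> options() {
        return options;
    }

    // Returns a new handle with the extra options layered on top;
    // this handle (what the catalog stores) is left unchanged.
    TableHandle copy(Map<String, String> dynamicOptions) {
        Map<String, String> merged = new HashMap<>(options);
        merged.putAll(dynamicOptions);
        return new TableHandle(merged);
    }
}

class CommitterTableLookup {
    // Assumed option key and callback class name, for illustration only.
    static final String CALLBACK_KEY = "commit.callbacks";
    static final String CALLBACK_CLASS = "com.example.SnapshotIdCallback";

    // Plays the role of getTable in the committer: the callback is added
    // only to the handle used for committing.
    static TableHandle tableForCommit(TableHandle catalogTable) {
        return catalogTable.copy(Collections.singletonMap(CALLBACK_KEY, CALLBACK_CLASS));
    }
}
```

With this shape the callback never appears among the user-visible table properties; only the committer's private copy carries it.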
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java:
##########
@@ -56,6 +56,7 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
private final FileStoreTable fileStoreTable;
private FileStoreCommit fileStoreCommit;
private final TablePath tablePath;
+ private static Long tempCommitSnapshotId;
Review Comment:
Don't use a plain `static` field here; otherwise all lake committers will
share this `tempCommitSnapshotId`. That is not thread-safe, even though our
commit operator itself is safe because it commits one at a time. I'd suggest
using a thread-local variable instead.
```
private static final ThreadLocal<Long> currentCommitSnapshotId =
        new ThreadLocal<>();
```
```
Long commitSnapshotId = currentCommitSnapshotId.get();
currentCommitSnapshotId.remove();
```
```
@Override
public void call(
        List<ManifestEntry> list, List<IndexManifestEntry> indexFiles, Snapshot snapshot) {
    currentCommitSnapshotId.set(snapshot.id());
}
```
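Putting the three snippets together, the hand-off could look roughly like the self-contained sketch below. `SnapshotIdHandOff`, `onCommit`, `commit`, and `peek` are hypothetical names that stand in for the committer and the commit callback; no Paimon or Fluss classes are used. It assumes the callback runs on the same thread that performs the commit, which is what makes a thread-local slot sufficient.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the committer; not the Fluss or Paimon API.
class SnapshotIdHandOff {
    // One slot per thread, so concurrent committers never see each other's id.
    private static final ThreadLocal<Long> currentCommitSnapshotId = new ThreadLocal<>();

    // Plays the role of the commit callback: invoked on the committing thread.
    static void onCommit(long snapshotId) {
        currentCommitSnapshotId.set(snapshotId);
    }

    // Simulates a commit that triggers the callback, then reads the id back.
    static Long commit(long snapshotId) {
        try {
            onCommit(snapshotId);
            return currentCommitSnapshotId.get();
        } finally {
            // Always clear the slot so a pooled thread does not leak a stale id.
            currentCommitSnapshotId.remove();
        }
    }

    // Exposes the current thread's slot, for checking that it was cleared.
    static Long peek() {
        return currentCommitSnapshotId.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<Long> first = () -> commit(1L);
        Callable<Long> second = () -> commit(2L);
        Future<Long> a = pool.submit(first);
        Future<Long> b = pool.submit(second);
        System.out.println(a.get() + " " + b.get()); // prints "1 2"
        pool.shutdown();
    }
}
```

Calling `remove()` in a `finally` block matters when threads are reused: without it, a later commit that fails before the callback fires could read the previous commit's snapshot id.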