yihua commented on code in PR #13269:
URL: https://github.com/apache/hudi/pull/13269#discussion_r2076772858
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -940,46 +954,53 @@ public String startCommit(String actionType,
HoodieTableMetaClient metaClient) {
*/
public void startCommitWithTime(String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(instantTime, metaClient.getCommitActionType(),
metaClient);
+ startCommitWithTime(Option.of(instantTime),
metaClient.getCommitActionType(), metaClient);
}
/**
* Completes a new commit time for a write operation
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified
action.
*/
public void startCommitWithTime(String instantTime, String actionType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(instantTime, actionType, metaClient);
+ startCommitWithTime(Option.of(instantTime), actionType, metaClient);
}
/**
* Starts a new commit time for a write operation (insert/update/delete)
with specified action.
*/
- private void startCommitWithTime(String instantTime, String actionType,
HoodieTableMetaClient metaClient) {
+ private String startCommitWithTime(Option<String> providedInstantTime,
String actionType, HoodieTableMetaClient metaClient) {
if (needsUpgrade(metaClient)) {
// unclear what instant to use, since upgrade does have a given instant.
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient,
Option.empty()));
}
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites(metaClient));
- LOG.info("Generate a new instant time: {} action: {}", instantTime,
actionType);
- // check there are no inflight restore before starting a new commit.
- HoodieTimeline inflightRestoreTimeline =
metaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
- ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() == 0,
- () -> "Found pending restore in active timeline. Please complete the
restore fully before proceeding. As of now, "
- + "table could be in an inconsistent state. Pending restores: "
- +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
-
- if (config.getFailedWritesCleanPolicy().isLazy()) {
- this.heartbeatClient.start(instantTime);
- }
+ txnManager.beginTransaction(Option.empty(), Option.empty());
Review Comment:
Is the intention to remove the locking logic from `TimeGeneratorBase` in
the future? The `LockManager` constructor, which is invoked by
`TransactionManager`, contains logic similar to the `TimeGeneratorBase`
constructor. So locking through the transaction manager here behaves the
same as locking through the time generator.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -940,46 +954,53 @@ public String startCommit(String actionType,
HoodieTableMetaClient metaClient) {
*/
public void startCommitWithTime(String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(instantTime, metaClient.getCommitActionType(),
metaClient);
+ startCommitWithTime(Option.of(instantTime),
metaClient.getCommitActionType(), metaClient);
}
/**
* Completes a new commit time for a write operation
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified
action.
*/
public void startCommitWithTime(String instantTime, String actionType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(instantTime, actionType, metaClient);
+ startCommitWithTime(Option.of(instantTime), actionType, metaClient);
}
/**
* Starts a new commit time for a write operation (insert/update/delete)
with specified action.
*/
- private void startCommitWithTime(String instantTime, String actionType,
HoodieTableMetaClient metaClient) {
+ private String startCommitWithTime(Option<String> providedInstantTime,
String actionType, HoodieTableMetaClient metaClient) {
if (needsUpgrade(metaClient)) {
// unclear what instant to use, since upgrade does have a given instant.
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient,
Option.empty()));
}
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites(metaClient));
- LOG.info("Generate a new instant time: {} action: {}", instantTime,
actionType);
- // check there are no inflight restore before starting a new commit.
- HoodieTimeline inflightRestoreTimeline =
metaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
- ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() == 0,
- () -> "Found pending restore in active timeline. Please complete the
restore fully before proceeding. As of now, "
- + "table could be in an inconsistent state. Pending restores: "
- +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
-
- if (config.getFailedWritesCleanPolicy().isLazy()) {
- this.heartbeatClient.start(instantTime);
- }
+ txnManager.beginTransaction(Option.empty(), Option.empty());
+ String instantTime;
+ try {
+ instantTime = providedInstantTime.orElseGet(() ->
createNewInstantTime(false));
+ LOG.info("Generate a new instant time: {} action: {}", instantTime,
actionType);
+ // check there are no inflight restore before starting a new commit.
+ HoodieTimeline inflightRestoreTimeline =
metaClient.reloadActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
Review Comment:
The timeline reload now happens inside the locked section. Should the
transaction manager guard only `instantTime = providedInstantTime.orElseGet(() ->
createNewInstantTime(false));`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -940,46 +954,53 @@ public String startCommit(String actionType,
HoodieTableMetaClient metaClient) {
*/
public void startCommitWithTime(String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(instantTime, metaClient.getCommitActionType(),
metaClient);
+ startCommitWithTime(Option.of(instantTime),
metaClient.getCommitActionType(), metaClient);
}
/**
* Completes a new commit time for a write operation
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified
action.
*/
public void startCommitWithTime(String instantTime, String actionType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(instantTime, actionType, metaClient);
+ startCommitWithTime(Option.of(instantTime), actionType, metaClient);
}
/**
* Starts a new commit time for a write operation (insert/update/delete)
with specified action.
*/
- private void startCommitWithTime(String instantTime, String actionType,
HoodieTableMetaClient metaClient) {
+ private String startCommitWithTime(Option<String> providedInstantTime,
String actionType, HoodieTableMetaClient metaClient) {
if (needsUpgrade(metaClient)) {
// unclear what instant to use, since upgrade does have a given instant.
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient,
Option.empty()));
}
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites(metaClient));
- LOG.info("Generate a new instant time: {} action: {}", instantTime,
actionType);
- // check there are no inflight restore before starting a new commit.
- HoodieTimeline inflightRestoreTimeline =
metaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
- ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() == 0,
- () -> "Found pending restore in active timeline. Please complete the
restore fully before proceeding. As of now, "
- + "table could be in an inconsistent state. Pending restores: "
- +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
-
- if (config.getFailedWritesCleanPolicy().isLazy()) {
- this.heartbeatClient.start(instantTime);
- }
+ txnManager.beginTransaction(Option.empty(), Option.empty());
+ String instantTime;
+ try {
+ instantTime = providedInstantTime.orElseGet(() ->
createNewInstantTime(false));
+ LOG.info("Generate a new instant time: {} action: {}", instantTime,
actionType);
+ // check there are no inflight restore before starting a new commit.
+ HoodieTimeline inflightRestoreTimeline =
metaClient.reloadActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
+ ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() ==
0,
+ () -> "Found pending restore in active timeline. Please complete the
restore fully before proceeding. As of now, "
+ + "table could be in an inconsistent state. Pending restores: "
+ +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
+
+ if (config.getFailedWritesCleanPolicy().isLazy()) {
+ this.heartbeatClient.start(instantTime);
+ }
- if (ClusteringUtils.isClusteringOrReplaceCommitAction(actionType)) {
-
metaClient.getActiveTimeline().createRequestedCommitWithReplaceMetadata(instantTime,
actionType);
- } else {
-
metaClient.getActiveTimeline().createNewInstant(metaClient.createNewInstant(HoodieInstant.State.REQUESTED,
actionType,
- instantTime));
+ if (ClusteringUtils.isClusteringOrReplaceCommitAction(actionType)) {
+
metaClient.getActiveTimeline().createRequestedCommitWithReplaceMetadata(instantTime,
actionType);
+ } else {
+
metaClient.getActiveTimeline().createNewInstant(metaClient.createNewInstant(HoodieInstant.State.REQUESTED,
actionType, instantTime));
Review Comment:
The previous assumption was that it is OK for `t1.request` to be written to
storage after `t2.request` has been written, even though t1 is the earlier
instant. Under that assumption, no locking is needed here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]