Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
kbuci commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2062622033 > @kbuci Please include heartbeat for clustering commit as well. Also, treat clustering and logcompaction as removable plans so rollback for them can happen in the ingestion itself. Considering that how do we create heart beats? @suryaprasanna Do you recall why logcompaction execution/rollback is different than compaction, in the sense that unlike compaction execution - log compaction won't retry a failed/inflight log compact plan and will instead completely roll it back - Lazy clean rollback of failed writes is allowed to rollback log compact instants I assume this is because we want to avoid a "stuck" log compact plan for preventing compaction from being scheduled, but just wanted to confirm, since @nsivabalan had the same question as well. The reason I ask is that (as per my understanding) this behavior of log compact will make it more tricky for us to schedule log compact plans and reliably defer execution to an async job. This is since if clean's failedWritesRollback can rollback log compact instants, then it can rollback a log compact plan (.requested file) before it has the chance to be "picked up" and executed by an async job. We can handle this situation by adding heartbeating (the same way as compact) and updating failedWritesRollback to only try rolling back a log compact instant if it is inflight (and ignore it if it's just in .requested state), though that still has the following consequence we should keep in mind: - If the async job is disabled or delayed (due to a configuration or orchestration issue), the log compact plan (.requested file) will remain in the timeline @suryaprasanna @nsivabalan @danny0405 (tagging all commenters) I was wondering if you had any opinions/suggestions on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
kbuci commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1568070078 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: > Even on REQUESTED: since we are taking a lock and checking for heart beat client, wouldn't that ensure only one writer can proceed and the other writer will fail. > why would (3) not see heartbeat? we are taking a block and start emitting heart beats right. Oh in step (3) the job (B) will see a heartbeat. Just to clarify the scenario here only applies if we were to have jobs ignore heartbeat check if the compact plan is not inflight yet (in case that might have been the source of confusion here). If you meant why Job (C) in (7) doesn't see heartbeat, then the reason for that is that Job (A) created a heartbeat but Job (B) did not, so once (A) stops then the heartbeat is expired and (C) has no way to know that there is a job (B) working on this plan. At first glance it might seem like having Job (B) also emit a heartbeat (during Step (2)) would avoid this issue, but that would cause another issue: when Job (A) terminates it would clean up the heartbeat file. So since there is no semaphore-like or shared-exclusive lock-like functionality in heartbeating I'm not sure how or if we can have concurrent compact jobs share or ignore a heartbeat. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
kbuci commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1556157237 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: > 1.2) if the heartbeat does not expire, just can the execution of this run and log a wanning log there. Just to clarify, do you mean throwing an exception in this run? Since I'm not sure if we can make the current run a no-op if a concurrent heartbeat is detected, since a) I'm not sure what HoodieWriteMetadata value to return if we make this a no-op b) If we don't explicitly throw an exception and fail, then the caller will assume compaction happened succesfully or already happened. This wouldn't be a correct assumption, since the other concurrent writer (that is currently executing this compact plan) may either fail or take a long time to finish. > if the state if still REQUESTED, we can execute it direcly? Ah that's a good point, it might actually be safe for two jobs to execute a compact plan at same time as long as neither of them are doing a roll back. Despite that though, I don't think it's safe to skip acquiring+starting heartbeat even if the compact plan only has .requested , since the following (unlikely) scenario can happen still: 1. Table has a compact plan T.requested created in timeline 2. Job (A) calls compact on T. It starts a heartbeat of T and then starts executing T 3. Job (B) calls compact on T. Although it sees a heartbeat for T, since T has no T.inflight it starts executing T 4. Job (A) and/or Job (B) create a C.inflight 5. Job (A) fails. 6. Heartbeat that Job (A) created expires 7. Job (C) calls compact on T. It sees that there is a T.inflight and no heartbeat (because Job (B) did not start any heartbeat). Therefore, it starts executing T, and rolls back the existing C.inflight Edit: I renamed compact plan in above example from C -> T to make it easier to read. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
nsivabalan commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1566489716 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: 1. yeah. After reading Kishan's response, i feel we should fail the execution if compaction is being currently attempted by another concurrent writer. 2. Even on REQUESTED: since we are taking a lock and checking for heart beat client, wouldn't that ensure only one writer can proceed and the other writer will fail. ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); + try { +if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) { + throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job"); +} + } catch (IOException e) { +throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e); + } + this.heartbeatClient.start(compactionInstantTime); +} finally { + txnManager.endTransaction(txnManager.getCurrentTransactionOwner()); +} preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); -return tableServiceClient.compact(compactionInstantTime, shouldComplete); +HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete); +this.heartbeatClient.stop(compactionInstantTime, true); Review Comment: yeah. I see your point. probably every caller when calling stop, should remove it from map. looks like we never remove any entry from instantToHeartbeatMap as per master (except shutting down entire HeartbeatClient). @n3nash : any pointers on this regard? ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1137,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
danny0405 commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1556682595 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1137,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) Review Comment: we should only care about pending instant right? If the compaction already completed, just skip this run. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
danny0405 commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1556682151 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: yeah, even if the state is requested, we should check the heartbeat liveness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
danny0405 commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1554475930 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: When a conflict for the same compaction instant execution is detected, we can: 1. check the state of the instant, if it is in `INFLIGHT` state and 1.1) the heartbeat expires, we can just rollback the last execution and reattempt in this run; 1.2) if the heartbeat does not expire, just take the execution of this run and log a wanning log there. 2. if the state if still `REQUESTED`, we can execute it direcly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2043939357 ## CI report: * e1a6e4a24083dd8871a2fc3fbb289e1a6192593a Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23154) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2043855651 ## CI report: * c41af6435281865147967768419da5e4fb688f8b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23153) * e1a6e4a24083dd8871a2fc3fbb289e1a6192593a Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23154) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2043839847 ## CI report: * c41af6435281865147967768419da5e4fb688f8b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23153) * e1a6e4a24083dd8871a2fc3fbb289e1a6192593a UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2043740065 ## CI report: * c41af6435281865147967768419da5e4fb688f8b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23153) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2043606333 ## CI report: * c8e268903a19c7ecc5cd927fd8afa3332a1c3aea Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23133) * c41af6435281865147967768419da5e4fb688f8b Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23153) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
kbuci commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1556395911 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); + try { +if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) { + throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job"); +} + } catch (IOException e) { +throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e); + } + this.heartbeatClient.start(compactionInstantTime); +} finally { + txnManager.endTransaction(txnManager.getCurrentTransactionOwner()); +} preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); -return tableServiceClient.compact(compactionInstantTime, shouldComplete); +HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete); +this.heartbeatClient.stop(compactionInstantTime, true); Review Comment: I was looking into a UT failure in `org.apache.hudi.table.functional.TestHoodieSparkMergeOnReadTableInsertUpdateDelete#testRepeatedRollbackOfCompaction` where two compact executions of the same instant time are called back to back (my understanding is that this is supposed to verify that the second compact does a no-op and succeeds upon seeing that plan is already committed). I realized that with this change, the second compact call was failing due to calling `isHeartbeatExpired` and seeing an active heartbeat (from the first attempt) still running, despite the fact that here we are stopping the heartbeat after a successfully completing the compact. The reason that `isHeartbeatExpired` was unexpectedly `false` here is that 1. `isHeartbeatExpired` will return false if instant time is too recent, even if the heartbeat has been stopped (in the in-memory mapping) 2. When `org.apache.hudi.client.heartbeat.HoodieHeartbeatClient#stop(java.lang.String)` is called (by the first compact call in UT) the heartbeat file is deleted and the heartbeat in in-memory mapping is stopped (as expected). But this means that the heartbeat cannot be started again (even if (1) is resolved), since heartbeat API doesn't allow caller to start a heartbeat that is present in in-memory mapping and has heartbeatStopped flag set to true. In order to get around this issue, I added another API in heartbeat API similar to stop, except that it removes the desired heartbeat from the in-memory mapping (forcing any future compact call in the same job to re-read the heartbeat files from DFS and create a new heartbeat in the in-memory mapping ). Though not sure if there might be a better approach here. I assume this existing functionality isn't a bug, as it makes sense for commits that cannot be repeatedly re-executed (like ingestion COMMITs). I wonder if the reason that this is causing an issue here stems from the fact that for compact we need to potentially repeatedly restart stopped heartbeats, and the hearttbeat API might not have been intended for this use case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2043593538 ## CI report: * c8e268903a19c7ecc5cd927fd8afa3332a1c3aea Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23133) * c41af6435281865147967768419da5e4fb688f8b UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
kbuci commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1556395911 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); + try { +if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) { + throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job"); +} + } catch (IOException e) { +throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e); + } + this.heartbeatClient.start(compactionInstantTime); +} finally { + txnManager.endTransaction(txnManager.getCurrentTransactionOwner()); +} preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); -return tableServiceClient.compact(compactionInstantTime, shouldComplete); +HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete); +this.heartbeatClient.stop(compactionInstantTime, true); Review Comment: I was looking into a UT failure in `org.apache.hudi.table.functional.TestHoodieSparkMergeOnReadTableInsertUpdateDelete#testRepeatedRollbackOfCompaction` where two compact executions of the same instant time are called back to back (my understanding is that this is supposed to verify that the second compact does a no-op and succeeds upon seeing that plan is already committed). I realized that with this change, the second compact call was failing due to calling `isHeartbeatExpired` and seeing an active heartbeat (from the first attempt) still running, despite the fact that here we are stopping the heartbeat after a successfully completing the compact. The reason that `isHeartbeatExpired` was unexpectedly `false` here is that 1. `isHeartbeatExpired` will return false if instant time is too recent, even if the heartbeat has been stopped (in the in-memory mapping) 2. When `org.apache.hudi.client.heartbeat.HoodieHeartbeatClient#stop(java.lang.String)` is called (by the first compact call in UT) the heartbeat file is deleted and the heartbeat in in-memory mapping is stopped (as expected). But this means that the heartbeat cannot be started again (even if (1) is resolved), since heartbeat API doesn't allow caller to start a heartbeat that is present in in-memory mapping and has heartbeatStopped flag set to true. In order to get around this issue, I added another API in heartbeat API similar to stop, except that it removes the desired heartbeat from the in-memory mapping (forcing any future compact call in the same job to re-read the heartbeat files from DFS and create a new heartbeat in the in-memory mapping ). Though not sure if there might be a better approach here. I assume this existing functionality isn't a bug, as it makes sense for commits that cannot be repeatedly re-executed (like ingestion COMMITs), and I assume the issue here stems from the fact that for compact we need to potentially repeatedly restart stopped heartbeats -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
kbuci commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1556157237 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: > 1.2) if the heartbeat does not expire, just can the execution of this run and log a wanning log there. Just to clarify, do you mean throwing an exception in this run? Since I'm not sure if we can make the current run a no-op if a concurrent heartbeat is detected, since a) I'm not sure what HoodieWriteMetadata value to return if we make this a no-op b) If we don't explicitly throw an exception and fail, then the caller will assume compaction happened succesfully or already happened. This wouldn't be a correct assumption, since the other concurrent writer (that is currently executing this compact plan) may either fail or take a long time to finish. > if the state if still REQUESTED, we can execute it direcly? Ah that's a good point, it might actually be safe for two jobs to execute a compact plan at same time as long as neither of them are doing a roll back. Despite that though, I don't think it's safe to skip acquiring+starting heartbeat even if the compact plan only has .requested , since the following (unlikely) scenario can happen still: 1. Table has a compact plan C.requested created in timeline 2. Job (A) calls compact on C. It starts a heartbeat of C and then starts executing C 3. Job (B) calls compact on C. Although it sees a heartbeat for C, since C has no C.inflight it starts executing C 4. Job (A) and/or Job (B) create a C.inflight 5. Job (A) fails. 6. Heartbeat that Job (A) created expires 7. Job (C) calls compact on C. It sees that there is a C.inflight and no heartbeat (because Job (B) did not start any heartbeat). Therefore, it starts executing C, and rolls back the existing C.inflight ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: > 1.2) if the heartbeat does not expire, just can the execution of this run and log a wanning log there. Just to clarify, do you mean throwing an exception in this run? Since I'm not sure if we can make the current run a no-op if a concurrent heartbeat is detected, since a) I'm not sure what HoodieWriteMetadata value to return if we make this a no-op b) If we don't explicitly throw an exception and fail, then the caller will assume compaction happened succesfully or already happened. This wouldn't be a correct assumption, since the other concurrent writer (that is currently executing this compact plan) may either fail or take a long time
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040959029 ## CI report: * c8e268903a19c7ecc5cd927fd8afa3332a1c3aea Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23133) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040940759 ## CI report: * 54e443683a42e1a59f533a50f98abb4dc0583ea7 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23132) * c8e268903a19c7ecc5cd927fd8afa3332a1c3aea Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23133) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040929959 ## CI report: * 54e443683a42e1a59f533a50f98abb4dc0583ea7 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23132) * c8e268903a19c7ecc5cd927fd8afa3332a1c3aea UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
danny0405 commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1554475930 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { HoodieTable table = createTable(config, context.getHadoopConf().get()); +Option instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline() +.filterCompletedAndCompactionInstants() +.getInstants() +.stream() +.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime)) +.findFirst()); +try { + // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent + // compact job will abort if they attempt to execute compact before heartbeat expires + // Note that as long as all jobs for this table use this API for compact, then this alone should prevent + // compact rollbacks from running concurrently to compact commits. + txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner()); Review Comment: When a conflict for the same compaction instant execution is detected, we can: 1. check the state of the instant, if it is in `INFLIGHT` state and 1.1) the heartbeat expires, we can just rollback the last execution and reattempt in this run; 1.2) if the heartbeat does not expire, just can the execution of this run and log a wanning log there. 2. if the state if still `REQUESTED`, we can execute it direcly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040845295 ## CI report: * 54e443683a42e1a59f533a50f98abb4dc0583ea7 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23132) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040842997 ## CI report: * ddd6cf2655e073aacd3b48de126781abacd7e111 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23127) * 54e443683a42e1a59f533a50f98abb4dc0583ea7 UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040635432 ## CI report: * ddd6cf2655e073aacd3b48de126781abacd7e111 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23127) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040573327 ## CI report: * ddd6cf2655e073aacd3b48de126781abacd7e111 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23127) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
hudi-bot commented on PR #10965: URL: https://github.com/apache/hudi/pull/10965#issuecomment-2040564503 ## CI report: * ddd6cf2655e073aacd3b48de126781abacd7e111 UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [HUDI-7503] Compaction and LogCompaction executions should start a heartbeat on every attempt and block concurrent executions of same plan [hudi]
kbuci opened a new pull request, #10965: URL: https://github.com/apache/hudi/pull/10965 ### Change Logs Updated compact and logcompact to start a heartbeat (within a transaction) before attempting to execute a plan. If multiple writers attempt to execute same compact/logcompact plan at same time, only one of them will process and the rest will fail with an exception (upon seeing a heartbeat has already been started) and will abort. ### Impact Without this change, if multiple jobs are launched at the same that target executing the same compact/logcompact plan (due to a non-HUDI related user-side configuration/orchestration issue) then one job can execute the compact/logcompact plan and create an inflight/commit instant file while the other jobs can roll it back (and delete inflight instant files or data files). This can lead to timeline being in a corrupted state. ### Risk level (write none, low medium or high below) _If medium or high, explain what verification was done to mitigate the risks._ ### Documentation Update _Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none"._ - _The config description must be updated if new configs are added or the default value of the configs are changed_ - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make changes to the website._ ### Contributor's checklist - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute) - [ ] Change Logs and Impact were stated clearly - [ ] Adequate tests were added if applicable - [ ] CI passed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org