This is an automated email from the ASF dual-hosted git repository. bowenliang pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push: new 0d7329850 [KYUUBI #5853] Ensure closing the operation log for batch submission in fast-failed case 0d7329850 is described below commit 0d73298508f03c5f5278aed1ae6901b57ac2fdb7 Author: Bowen Liang <liangbo...@gf.com.cn> AuthorDate: Thu Dec 14 20:24:48 2023 +0800 [KYUUBI #5853] Ensure closing the operation log for batch submission in fast-failed case # :mag: Description ## Issue References 🔗 This pull request fixes comment in PR [#5733](https://github.com/apache/kyuubi/pull/5733#issuecomment-1855055891) ## Describe Your Solution 🔧 To ensure the operation log closed even in fast-failed case. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklists ## 📝 Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [x] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## 📝 Committer Pre-Merge Checklist - [ ] Pull request title is okay. - [ ] No license issues. - [ ] Milestone correctly set? - [ ] Test coverage is ok - [ ] Assignees are selected. - [ ] Minimum number of approvals - [ ] No changes are requested **Be nice. 
Be informative.** Closes #5853 from bowenliang123/close-oplog. Closes #5853 34192fb6b [Bowen Liang] import 4fe004613 [Bowen Liang] withClosingOperationLog Authored-by: Bowen Liang <liangbo...@gf.com.cn> Signed-off-by: liangbowen <liangbo...@gf.com.cn> (cherry picked from commit a0fdead6d257be54127d84f526918e4fd52a18ce) Signed-off-by: liangbowen <liangbo...@gf.com.cn> --- .../org/apache/kyuubi/operation/AbstractOperation.scala | 16 ++++++++++++++++ .../org/apache/kyuubi/operation/BatchJobSubmission.scala | 11 ++--------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 0a185b942..fe33e3036 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.operation +import java.io.IOException import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} import java.util.concurrent.locks.ReentrantLock @@ -247,4 +248,19 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin ok.setInfoMessages(hints.asJava) ok } + + /** + * Close the OperationLog, after running the block + */ + def withClosingOperationLog[T](f: => T): T = { + try { + f + } finally { + try { + getOperationLog.foreach(_.close()) + } catch { + case e: IOException => error(e.getMessage, e) + } + } + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index ba11dcb27..3bb17e1aa 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -17,7 +17,6 @@ package 
org.apache.kyuubi.operation -import java.io.IOException import java.nio.file.{Files, Paths} import java.util.Locale import java.util.concurrent.TimeUnit @@ -336,7 +335,7 @@ class BatchJobSubmission( } } - override def close(): Unit = withLockRequired { + override def close(): Unit = withLockRequired(withClosingOperationLog { if (!isClosedOrCanceled) { MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType))) @@ -373,13 +372,7 @@ class BatchJobSubmission( } } } - - try { - getOperationLog.foreach(_.close()) - } catch { - case e: IOException => error(e.getMessage, e) - } - } + }) override def cancel(): Unit = { throw new IllegalStateException("Use close instead.")