This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch branch-1.9 in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push: new 8ba7781150 [KYUUBI #6626] Fix operation never expired issue with periodical listOperations api calls 8ba7781150 is described below commit 8ba778115045db8ddada792e4cc3a74a579d8cec Author: Wang, Fei <fwan...@ebay.com> AuthorDate: Sun Aug 18 22:02:25 2024 -0700 [KYUUBI #6626] Fix operation never expired issue with periodical listOperations api calls # :mag: Description ## Issue References 🔗 This pull request fixes an issue where operations never expire. I found that some Kyuubi operations never expire. After investigation, the root cause is: 1. our Kyuubi console looks up the operations periodically 2. it retrieves the Kyuubi operation events 3. KyuubiOperationEvent::apply gets the operation status and, as a side effect, updates the operation's last access time, https://github.com/apache/kyuubi/pull/2452 4. so the operation never expires. ## Describe Your Solution 🔧 Just get the operation event without updating the operation's last access time. ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6626 from turboFei/operation_not_idle. 
Closes #6626 14e6898da [Wang, Fei] Get operation event Authored-by: Wang, Fei <fwan...@ebay.com> Signed-off-by: Wang, Fei <fwan...@ebay.com> (cherry picked from commit 6e5162e8a7a7c3d890aa2020b17306bba507f419) Signed-off-by: Wang, Fei <fwan...@ebay.com> --- .../kyuubi/events/KyuubiOperationEvent.scala | 31 +--------------------- .../apache/kyuubi/operation/KyuubiOperation.scala | 24 +++++++++++++++-- .../org/apache/kyuubi/server/api/ApiUtils.scala | 5 ++-- 3 files changed, 25 insertions(+), 35 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala index 2a103213e5..f67b088a50 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala @@ -18,8 +18,6 @@ package org.apache.kyuubi.events import org.apache.kyuubi.Utils -import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle} -import org.apache.kyuubi.session.KyuubiSession /** * A [[KyuubiOperationEvent]] used to tracker the lifecycle of an operation at server side. 
@@ -45,7 +43,7 @@ import org.apache.kyuubi.session.KyuubiSession * @param kyuubiInstance the parent session connection url * @param metrics the operation metrics */ -case class KyuubiOperationEvent private ( +case class KyuubiOperationEvent( statementId: String, remoteId: String, statement: String, @@ -67,30 +65,3 @@ case class KyuubiOperationEvent private ( override def partitions: Seq[(String, String)] = ("day", Utils.getDateFromTimestamp(createTime)) :: Nil } - -object KyuubiOperationEvent { - - /** - * Shorthand for instantiating a operation event with a [[KyuubiOperation]] instance - */ - def apply(operation: KyuubiOperation): KyuubiOperationEvent = { - val session = operation.getSession.asInstanceOf[KyuubiSession] - val status = operation.getStatus - new KyuubiOperationEvent( - operation.statementId, - Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull, - operation.statement, - operation.shouldRunAsync, - status.state.name(), - status.lastModified, - status.create, - status.start, - status.completed, - status.exception, - session.handle.identifier.toString, - session.user, - session.sessionType.toString, - session.connectionUrl, - operation.metrics) - } -} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index d0289abb99..a543bddb6c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -206,7 +206,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi protected def eventEnabled: Boolean = false - if (eventEnabled) EventBus.post(KyuubiOperationEvent(this)) + if (eventEnabled) EventBus.post(getOperationEvent) override def setState(newState: OperationState): Unit = { MetricsSystem.tracing { ms => @@ -217,6 +217,26 @@ abstract class 
KyuubiOperation(session: Session) extends AbstractOperation(sessi ms.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase)) } super.setState(newState) - if (eventEnabled) EventBus.post(KyuubiOperationEvent(this)) + if (eventEnabled) EventBus.post(getOperationEvent) + } + + def getOperationEvent: KyuubiOperationEvent = { + val kyuubiSession = session.asInstanceOf[KyuubiSession] + KyuubiOperationEvent( + statementId, + Option(remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull, + statement, + shouldRunAsync, + state.name(), + lastAccessTime, + createTime, + startTime, + completedTime, + Option(operationException), + kyuubiSession.handle.identifier.toString, + kyuubiSession.user, + kyuubiSession.sessionType.toString, + kyuubiSession.connectionUrl, + metrics) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala index 4944216087..50900627c0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConverters._ import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.client.api.v1.dto import org.apache.kyuubi.client.api.v1.dto.{OperationData, OperationProgress, ServerData, SessionData} -import org.apache.kyuubi.events.KyuubiOperationEvent import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.operation.KyuubiOperation import org.apache.kyuubi.session.KyuubiSession @@ -80,7 +79,7 @@ object ApiUtils extends Logging { } def operationEvent(operation: KyuubiOperation): dto.KyuubiOperationEvent = { - val opEvent = KyuubiOperationEvent(operation) + val opEvent = operation.getOperationEvent dto.KyuubiOperationEvent.builder() .statementId(opEvent.statementId) .remoteId(opEvent.remoteId) @@ -102,7 +101,7 @@ object ApiUtils extends Logging { } def 
operationData(operation: KyuubiOperation): OperationData = { - val opEvent = KyuubiOperationEvent(operation) + val opEvent = operation.getOperationEvent new OperationData( opEvent.statementId, opEvent.remoteId,