This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7605900db4 [KYUUBI #7176] Cleanup metadata with batch size and interval
7605900db4 is described below
commit 7605900db40e343569997f232af81dafc30ad79f
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Sep 15 10:10:49 2025 -0700
[KYUUBI #7176] Cleanup metadata with batch size and interval
### Why are the changes needed?
Clean up expired metadata in batches, with a configurable batch size and an
interval between batches, to avoid holding the delete lock for a long time
when the amount of metadata is huge.
### How was this patch tested?
GA (GitHub Actions CI).
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7176 from turboFei/clean_batch.
Closes #7176
b4d6a264c [Wang, Fei] fix ut
61786af3b [Wang, Fei] Merge branch 'master' into clean_batch
11170756f [Cheng Pan] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
a5ffc9da9 [Wang, Fei] log for max loops
b301b527b [Wang, Fei] remove unused method
d172a61da [Wang, Fei] address all comments
e4da02369 [Wang, Fei] Save
c94d25451 [Wang, Fei] saev
852e624c8 [Wang, Fei] save
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
docs/configuration/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 18 ++++++++++
.../kyuubi/server/metadata/MetadataManager.scala | 40 ++++++++++++++++++++--
.../kyuubi/server/metadata/MetadataStore.scala | 6 ++--
.../server/metadata/jdbc/JDBCMetadataStore.scala | 17 +++++----
.../server/metadata/jdbc/JdbcDatabaseDialect.scala | 14 +++++++-
.../server/metadata/MetadataManagerSuite.scala | 18 +++++++++-
.../metadata/jdbc/JDBCMetadataStoreSuite.scala | 4 +--
8 files changed, 104 insertions(+), 14 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 83485a5217..ab1966cde6 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -382,6 +382,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| Key |
Default |
Meaning
[...]
|-------------------------------------------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| kyuubi.metadata.cleaner.batch.size | 1000
| The batch size for cleaning expired metadata.
This is used to avoid holding the delete lock for a long time when there are
too many expired metadata to be cleaned.
[...]
| kyuubi.metadata.cleaner.enabled | true
| Whether to clean the metadata periodically. If
it is enabled, Kyuubi will clean the metadata that is in the terminate state
with max age limitation.
[...]
| kyuubi.metadata.cleaner.interval | PT30M
| The interval to check and clean expired
metadata.
[...]
| kyuubi.metadata.max.age | PT72H
| The maximum age of metadata, the metadata
exceeding the age will be cleaned.
[...]
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 06c8e7a9d5..6586498655 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2077,6 +2077,24 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMinutes(30).toMillis)
+ val METADATA_CLEANER_BATCH_SIZE: ConfigEntry[Int] =
+ buildConf("kyuubi.metadata.cleaner.batch.size")
+ .serverOnly
+ .doc("The batch size for cleaning expired metadata. " +
+ "This is used to avoid holding the delete lock for a long time " +
+ "when there are too many expired metadata to be cleaned.")
+ .version("1.11.0")
+ .intConf
+ .createWithDefault(1000)
+
+ val METADATA_CLEANER_BATCH_INTERVAL: ConfigEntry[Long] =
+ buildConf("kyuubi.metadata.cleaner.batch.interval")
+ .serverOnly
+ .internal
+ .doc("The interval to wait before next batch of cleaning expired
metadata.")
+ .timeConf
+ .createWithDefault(Duration.ofSeconds(3).toMillis)
+
val METADATA_RECOVERY_THREADS: ConfigEntry[Int] =
buildConf("kyuubi.metadata.recovery.threads")
.serverOnly
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 522ac32dd6..c5182979ee 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
+import com.google.common.annotations.VisibleForTesting
+
import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.client.api.v1.dto.Batch
import org.apache.kyuubi.config.KyuubiConf
@@ -236,10 +238,11 @@ class MetadataManager extends
AbstractService("MetadataManager") {
private def startMetadataCleaner(): Unit = {
val stateMaxAge = conf.get(METADATA_MAX_AGE)
val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL)
+ val batchSize = conf.get(KyuubiConf.METADATA_CLEANER_BATCH_SIZE)
+ val batchInterval = conf.get(KyuubiConf.METADATA_CLEANER_BATCH_INTERVAL)
val cleanerTask: Runnable = () => {
try {
-
withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(stateMaxAge))
-
withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(stateMaxAge))
+ cleanupMetadata(stateMaxAge, batchSize, batchInterval)
} catch {
case e: Throwable => error("Error cleaning up the metadata by age", e)
}
@@ -253,6 +256,39 @@ class MetadataManager extends
AbstractService("MetadataManager") {
TimeUnit.MILLISECONDS)
}
+ @VisibleForTesting
+ private[metadata] def cleanupMetadata(maxAge: Long, batchSize: Int,
batchInterval: Long): Unit = {
+ var needToCleanMetadata = true
+ var needToCleanKubernetesInfo = true
+ var cleanupLoop = 0
+
+ val MAX_CLEANUP_LOOPS = 100 // a guard in case it runs into an infinite
loop
+ while ((needToCleanMetadata || needToCleanKubernetesInfo) && cleanupLoop <
MAX_CLEANUP_LOOPS) {
+ cleanupLoop += 1
+ if (needToCleanMetadata) {
+ needToCleanMetadata =
+ withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(
+ maxAge,
+ batchSize)) >= batchSize
+ }
+ if (needToCleanKubernetesInfo) {
+ needToCleanKubernetesInfo =
+
withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(
+ maxAge,
+ batchSize)) >= batchSize
+ }
+ if (needToCleanMetadata || needToCleanKubernetesInfo) {
+ if (cleanupLoop < MAX_CLEANUP_LOOPS) {
+ info(s"Sleep $batchInterval ms before next batch of metadata
cleaning.")
+ Thread.sleep(batchInterval)
+ } else {
+ warn(s"Metadata cleaning reaches the maximum loop
$MAX_CLEANUP_LOOPS, " +
+ s"will continue in the next round.")
+ }
+ }
+ }
+ }
+
def addMetadataRetryRequest(request: MetadataRequest): Unit = {
val maxRequestsAsyncRetryRefs: Int =
conf.get(KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_QUEUE_SIZE)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
index 46a5505afe..492b286e3b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
@@ -102,8 +102,9 @@ trait MetadataStore extends Closeable {
/**
* Check and cleanup the terminated batches information with maxAge
limitation.
* @param maxAge the batch state info maximum age.
+ * @param limit the maximum number of metadata to be cleaned up.
*/
- def cleanupMetadataByAge(maxAge: Long): Unit
+ def cleanupMetadataByAge(maxAge: Long, limit: Int): Int
/**
* Cleanup kubernetes engine info by identifier.
@@ -113,6 +114,7 @@ trait MetadataStore extends Closeable {
/**
* Check and cleanup the kubernetes engine info with maxAge limitation.
* @param maxAge the kubernetes engine info maximum age.
+ * @param limit the maximum number of kubernetes engine info to be cleaned
up.
*/
- def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit
+ def cleanupKubernetesEngineInfoByAge(maxAge: Long, limit: Int): Int
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index f3c0b2d7de..62395fd644 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -409,13 +409,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
}
}
- override def cleanupMetadataByAge(maxAge: Long): Unit = {
+ override def cleanupMetadataByAge(maxAge: Long, limit: Int): Int = {
val minEndTime = System.currentTimeMillis() - maxAge
val query =
- s"DELETE FROM $METADATA_TABLE WHERE end_time > 0 AND end_time < ? AND
create_time < ?"
+ s"DELETE FROM $METADATA_TABLE WHERE end_time > 0 AND end_time < ? AND
create_time < ?" +
+ s" ${dialect.deleteFromLimitClause(limit)}"
JdbcUtils.withConnection { connection =>
withUpdateCount(connection, query, minEndTime, minEndTime) { count =>
- info(s"Cleaned up $count records older than $maxAge ms from
$METADATA_TABLE.")
+ info(s"Cleaned up $count records older than $maxAge ms from
$METADATA_TABLE limit:$limit.")
+ count
}
}
}
@@ -462,12 +464,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
}
}
- override def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit = {
+ override def cleanupKubernetesEngineInfoByAge(maxAge: Long, limit: Int): Int
= {
val minUpdateTime = System.currentTimeMillis() - maxAge
- val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time
< ?"
+ val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time
< ?" +
+ s" ${dialect.deleteFromLimitClause(limit)}"
JdbcUtils.withConnection { connection =>
withUpdateCount(connection, query, minUpdateTime) { count =>
- info(s"Cleaned up $count records older than $maxAge ms from
$KUBERNETES_ENGINE_INFO_TABLE.")
+ info(s"Cleaned up $count records older than $maxAge ms from
$KUBERNETES_ENGINE_INFO_TABLE" +
+ s" limit $limit.")
+ count
}
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
index 24408e1613..9d400a6f16 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
@@ -17,8 +17,11 @@
package org.apache.kyuubi.server.metadata.jdbc
+import org.apache.kyuubi.Logging
+
trait JdbcDatabaseDialect {
def limitClause(limit: Int, offset: Int): String
+ def deleteFromLimitClause(limit: Int): String
def insertOrReplace(
table: String,
cols: Seq[String],
@@ -26,11 +29,16 @@ trait JdbcDatabaseDialect {
keyVal: String): String
}
-class GenericDatabaseDialect extends JdbcDatabaseDialect {
+class GenericDatabaseDialect extends JdbcDatabaseDialect with Logging {
override def limitClause(limit: Int, offset: Int): String = {
s"LIMIT $limit OFFSET $offset"
}
+ override def deleteFromLimitClause(limit: Int): String = {
+ warn("Generic dialect does not support LIMIT in DELETE statements")
+ ""
+ }
+
override def insertOrReplace(
table: String,
cols: Seq[String],
@@ -71,6 +79,10 @@ class MySQLDatabaseDialect extends GenericDatabaseDialect {
|${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")}
|""".stripMargin
}
+
+ override def deleteFromLimitClause(limit: Int): String = {
+ s"LIMIT $limit"
+ }
}
class PostgreSQLDatabaseDialect extends GenericDatabaseDialect {
override def insertOrReplace(
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index fe7fa58685..de2e651f44 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -212,8 +212,24 @@ class MetadataManagerSuite extends KyuubiFunSuite {
f(metadataManager)
} finally {
metadataManager.getBatches(MetadataFilter(), 0, Int.MaxValue).foreach {
batch =>
- metadataManager.cleanupMetadataById(batch.getId)
+ // close the batch if not ended
+ if (batch.getEndTime == 0) {
+ metadataManager.updateMetadata(
+ Metadata(
+ identifier = batch.getId,
+ state = OperationState.CLOSED.toString,
+ endTime = System.currentTimeMillis()),
+ false)
+ }
}
+
+ metadataManager.cleanupMetadata(Int.MinValue, 1, 0)
+
+ // ensure all metadata are cleaned up
+ eventually(timeout(3.seconds), interval(200.milliseconds)) {
+ assert(metadataManager.getBatches(MetadataFilter(), 0,
Int.MaxValue).isEmpty)
+ }
+
// ensure no metadata request leak
eventually(timeout(5.seconds), interval(200.milliseconds)) {
assert(MetricsSystem.counterValue(METADATA_REQUEST_OPENED).getOrElse(0L) === 0)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
index 77f4ca57d5..713739d5f5 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
@@ -45,7 +45,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
batch =>
jdbcMetadataStore.cleanupMetadataByIdentifier(batch.identifier)
}
- jdbcMetadataStore.cleanupKubernetesEngineInfoByAge(0)
+ jdbcMetadataStore.cleanupKubernetesEngineInfoByAge(0, Int.MaxValue)
jdbcMetadataStore.close()
}
@@ -242,7 +242,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
Int.MaxValue).isEmpty)
eventually(Timeout(3.seconds)) {
- jdbcMetadataStore.cleanupMetadataByAge(1000)
+ jdbcMetadataStore.cleanupMetadataByAge(1000, Int.MaxValue)
assert(jdbcMetadataStore.getMetadata(batchId) == null)
}
}