This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 68d75269d [Gluten-5256][CH]optimizing table after spark restart bug (#5258)
68d75269d is described below

commit 68d75269d8af6819a1f48da9939ef36b44ffb141
Author: Hongbin Ma <mahong...@apache.org>
AuthorDate: Wed Apr 3 13:27:17 2024 +0800

    [Gluten-5256][CH]optimizing table after spark restart bug (#5258)
    
    [Gluten-5256][CH]optimizing table after spark restart bug
---
 .../delta/commands/OptimizeTableCommandBase.scala  |   1 +
 .../sql/delta/commands/OptimizeTableCommand.scala  |   2 +
 .../commands/OptimizeTableCommandOverwrites.scala  | 323 ---------------------
 .../spark/sql/delta/commands/CommandUtils.scala    |  43 +++
 .../commands/OptimizeTableCommandOverwrites.scala  |   7 +-
 .../GlutenClickHouseTableAfterRestart.scala        | 178 ++++++++++--
 6 files changed, 210 insertions(+), 344 deletions(-)

diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala
index bbfa2ecee..2d6ac48f5 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala
@@ -126,6 +126,7 @@ case class OptimizeTableCommand(
   override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    CommandUtils.ensureClickHouseTableV2(tableId, sparkSession)
     val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId, "OPTIMIZE")
 
     val partitionColumns = deltaLog.snapshot.metadata.partitionColumns
diff --git a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 57427957d..346943671 100644
--- a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -126,6 +126,8 @@ case class OptimizeTableCommand(
   override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    CommandUtils.ensureClickHouseTableV2(tableId, sparkSession)
+
     val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId, "OPTIMIZE", options)
 
     val txn = deltaLog.startTransaction()
diff --git a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
deleted file mode 100644
index 498b7ff4f..000000000
--- a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.delta.commands
-
-import org.apache.gluten.expression.ConverterUtils
-
-import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, OptimisticTransaction}
-import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
-import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.CHDatasourceJniWrapper
-import org.apache.spark.sql.execution.datasources.v1.CHMergeTreeWriterInjects
-import org.apache.spark.sql.execution.datasources.v1.clickhouse._
-import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.{AddFileTags, AddMergeTreeParts}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}
-
-import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
-import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import java.util.{Date, UUID}
-
-import scala.collection.mutable.ArrayBuffer
-
-object OptimizeTableCommandOverwrites extends Logging {
-
-  case class TaskDescription(
-      path: String,
-      database: String,
-      tableName: String,
-      orderByKeyOption: Option[Seq[String]],
-      lowCardKeyOption: Option[Seq[String]],
-      minmaxIndexKeyOption: Option[Seq[String]],
-      bfIndexKeyOption: Option[Seq[String]],
-      setIndexKeyOption: Option[Seq[String]],
-      primaryKeyOption: Option[Seq[String]],
-      partitionColumns: Seq[String],
-      partList: Seq[String],
-      tableSchema: StructType,
-      clickhouseTableConfigs: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration,
-      jobIdInstant: Long,
-      partitionDir: Option[String],
-      bucketDir: Option[String]
-  )
-
-  private def executeTask(
-      description: TaskDescription,
-      sparkStageId: Int,
-      sparkPartitionId: Int,
-      sparkAttemptNumber: Int
-  ): MergeTreeWriteTaskResult = {
-
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
-    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
-    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
-
-    // Set up the attempt context required to use in the output committer.
-    val taskAttemptContext: TaskAttemptContext = {
-      // Set up the configuration object
-      val hadoopConf = description.serializableHadoopConf.value
-      hadoopConf.set("mapreduce.job.id", jobId.toString)
-      hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
-      hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
-      hadoopConf.setBoolean("mapreduce.task.ismap", true)
-      hadoopConf.setInt("mapreduce.task.partition", 0)
-
-      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
-    }
-
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-
-        val uuid = UUID.randomUUID.toString
-
-        val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
-          description.path,
-          description.database,
-          description.tableName,
-          description.orderByKeyOption,
-          description.lowCardKeyOption,
-          description.minmaxIndexKeyOption,
-          description.bfIndexKeyOption,
-          description.setIndexKeyOption,
-          description.primaryKeyOption,
-          description.partitionColumns,
-          description.partList,
-          ConverterUtils.convertNamedStructJson(description.tableSchema),
-          description.clickhouseTableConfigs,
-          description.tableSchema.toAttributes
-        )
-
-        val datasourceJniWrapper = new CHDatasourceJniWrapper()
-        val returnedMetrics =
-          datasourceJniWrapper.nativeMergeMTParts(
-            planWithSplitInfo.plan,
-            planWithSplitInfo.splitInfo,
-            uuid,
-            taskId.getId.toString,
-            description.partitionDir.getOrElse(""),
-            description.bucketDir.getOrElse("")
-          )
-        if (returnedMetrics != null && returnedMetrics.nonEmpty) {
-          val addFiles = AddFileTags.partsMetricsToAddFile(
-            description.database,
-            description.tableName,
-            description.path,
-            returnedMetrics,
-            Seq(Utils.localHostName()))
-
-          val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
-            // committer.commitTask(taskAttemptContext)
-            new TaskCommitMessage(addFiles.toSeq)
-          }
-
-//          val summary = MergeTreeExecutedWriteSummary(
-//            updatedPartitions = updatedPartitions.toSet,
-//            stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
-          MergeTreeWriteTaskResult(taskCommitMessage, null)
-        } else {
-          throw new IllegalStateException()
-        }
-      })(
-        catchBlock = {
-          // If there is an error, abort the task
-          logError(s"Job $jobId aborted.")
-        },
-        finallyBlock = {})
-    } catch {
-      case e: FetchFailedException =>
-        throw e
-      case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
-        // If any output file to write already exists, it does not make sense to re-run this task.
-        // We throw the exception and let Executor throw ExceptionFailure to abort the job.
-        throw new TaskOutputFileAlreadyExistException(f)
-      case t: Throwable =>
-        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
-    }
-
-  }
-
-  def runOptimizeBinJobClickhouse(
-      txn: OptimisticTransaction,
-      partitionValues: Map[String, String],
-      bucketNum: String,
-      bin: Seq[AddFile],
-      maxFileSize: Long): Seq[FileAction] = {
-    val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
-
-    val sparkSession = SparkSession.getActiveSession.get
-
-    val rddWithNonEmptyPartitions =
-      sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
-
-    val jobIdInstant = new Date().getTime
-    val ret = new Array[MergeTreeWriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
-
-    val serializableHadoopConf = new SerializableConfiguration(
-      sparkSession.sessionState.newHadoopConfWithOptions(
-        txn.metadata.configuration ++ txn.deltaLog.options))
-
-    val partitionDir = if (tableV2.partitionColumns.isEmpty) {
-      None
-    } else {
-      Some(tableV2.partitionColumns.map(c => c + "=" + partitionValues(c)).mkString("/"))
-    }
-
-    val bucketDir = if (tableV2.bucketOption.isEmpty) {
-      None
-    } else {
-      Some(bucketNum)
-    }
-
-    val description = TaskDescription.apply(
-      txn.deltaLog.dataPath.toString,
-      tableV2.dataBaseName,
-      tableV2.tableName,
-      tableV2.orderByKeyOption,
-      tableV2.lowCardKeyOption,
-      tableV2.minmaxIndexKeyOption,
-      tableV2.bfIndexKeyOption,
-      tableV2.setIndexKeyOption,
-      tableV2.primaryKeyOption,
-      tableV2.partitionColumns,
-      bin.map(_.asInstanceOf[AddMergeTreeParts].name),
-      tableV2.schema(),
-      tableV2.clickhouseTableConfigs,
-      serializableHadoopConf,
-      jobIdInstant,
-      partitionDir,
-      bucketDir
-    )
-    sparkSession.sparkContext.runJob(
-      rddWithNonEmptyPartitions,
-      (taskContext: TaskContext, _: Iterator[InternalRow]) => {
-        executeTask(
-          description,
-          taskContext.stageId(),
-          taskContext.partitionId(),
-          taskContext.taskAttemptId().toInt & Integer.MAX_VALUE
-        )
-      },
-      rddWithNonEmptyPartitions.partitions.indices,
-      (index, res: MergeTreeWriteTaskResult) => {
-        ret(index) = res
-      }
-    )
-
-    val addFiles = ret
-      .flatMap(_.commitMsg.obj.asInstanceOf[Seq[AddFile]])
-      .toSeq
-
-    val removeFiles =
-      bin.map(f => f.removeWithTimestamp(new SystemClock().getTimeMillis(), 
dataChange = false))
-    addFiles ++ removeFiles
-
-  }
-
-  private def isDeltaTable(spark: SparkSession, tableName: TableIdentifier): Boolean = {
-    val catalog = spark.sessionState.catalog
-    val tableIsNotTemporaryTable = !catalog.isTempView(tableName)
-    val tableExists = {
-      (tableName.database.isEmpty || catalog.databaseExists(tableName.database.get)) &&
-      catalog.tableExists(tableName)
-    }
-    tableIsNotTemporaryTable && tableExists && catalog
-      .getTableMetadata(tableName)
-      .provider
-      .get
-      .toLowerCase()
-      .equals("clickhouse")
-  }
-
-  def getDeltaLogClickhouse(
-      spark: SparkSession,
-      path: Option[String],
-      tableIdentifier: Option[TableIdentifier],
-      operationName: String,
-      hadoopConf: Map[String, String] = Map.empty): DeltaLog = {
-    val tablePath =
-      if (tableIdentifier.nonEmpty && isDeltaTable(spark, tableIdentifier.get)) {
-        val sessionCatalog = spark.sessionState.catalog
-        lazy val metadata = sessionCatalog.getTableMetadata(tableIdentifier.get)
-        new Path(metadata.location)
-      } else {
-        throw new UnsupportedOperationException("OPTIMIZE is ony supported for clickhouse tables")
-      }
-
-    val startTime = Some(System.currentTimeMillis)
-    val deltaLog = DeltaLog.forTable(spark, tablePath, hadoopConf)
-    if (deltaLog.update(checkIfUpdatedSinceTs = startTime).version < 0) {
-      throw DeltaErrors.notADeltaTableException(
-        operationName,
-        DeltaTableIdentifier(path, tableIdentifier))
-    }
-    deltaLog
-  }
-
-  def groupFilesIntoBinsClickhouse(
-      partitionsToCompact: Seq[((String, Map[String, String]), Seq[AddFile])],
-      maxTargetFileSize: Long): Seq[((String, Map[String, String]), Seq[AddFile])] = {
-    partitionsToCompact.flatMap {
-      case (partition, files) =>
-        val bins = new ArrayBuffer[Seq[AddFile]]()
-
-        val currentBin = new ArrayBuffer[AddFile]()
-        var currentBinSize = 0L
-
-        files.sortBy(_.size).foreach {
-          file =>
-            // Generally, a bin is a group of existing files, whose total size does not exceed the
-            // desired maxFileSize. They will be coalesced into a single output file.
-            // However, if isMultiDimClustering = true, all files in a partition will be read by the
-            // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize
-            // will be produced. See below.
-
-            // isMultiDimClustering is always false for Gluten Clickhouse for now
-            if (file.size + currentBinSize > maxTargetFileSize /*&& !isMultiDimClustering */ ) {
-              bins += currentBin.toVector
-              currentBin.clear()
-              currentBin += file
-              currentBinSize = file.size
-            } else {
-              currentBin += file
-              currentBinSize += file.size
-            }
-        }
-
-        if (currentBin.nonEmpty) {
-          bins += currentBin.toVector
-        }
-
-        bins
-          .map(b => (partition, b))
-          // select bins that have at least two files or in case of multi-dim clustering
-          // select all bins
-          .filter(_._2.size > 1 /*|| isMultiDimClustering*/ )
-    }
-  }
-}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala
new file mode 100644
index 000000000..262c37eff
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.commands
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.connector.catalog.Identifier
+
+object CommandUtils {
+  // Ensure ClickHouseTableV2 table exists
+  def ensureClickHouseTableV2(
+      tableId: Option[TableIdentifier],
+      sparkSession: SparkSession): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    if (tableId.isEmpty) {
+      throw new UnsupportedOperationException("Current command requires table identifier.")
+    }
+    // If the user reaches this function without previously triggering loadTable
+    // (which creates the ClickhouseTableV2), we have to load the table manually.
+    // Notice: the multi-catalog case is not well considered yet!
+    sparkSession.sessionState.catalogManager.currentCatalog.asTableCatalog.loadTable(
+      Identifier.of(
+        Array(
+          tableId.get.database.getOrElse(
+            sparkSession.sessionState.catalogManager.currentNamespace.head)),
+        tableId.get.table)
+    )
+  }
+}
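
The new helper only forces the V2 catalog's loadTable path so that a ClickHouseTableV2 instance exists again after a fresh SparkSession. A minimal sketch of that same lookup, written as a standalone method for illustration (loadAsV2 is a made-up name, and the sketch assumes it lives under an org.apache.spark.sql.* package, as CommandUtils does, so that sessionState and CatalogV2Implicits are accessible):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.connector.catalog.Identifier
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

    // Illustrative only: resolve a TableIdentifier against the current catalog,
    // falling back to the current namespace when no database is given, and let
    // loadTable materialize the ClickHouseTableV2 entry in the session catalog.
    def loadAsV2(spark: SparkSession, tableId: TableIdentifier): Unit = {
      val namespace = tableId.database
        .getOrElse(spark.sessionState.catalogManager.currentNamespace.head)
      spark.sessionState.catalogManager.currentCatalog.asTableCatalog
        .loadTable(Identifier.of(Array(namespace), tableId.table))
    }
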
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
similarity index 98%
rename from backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index 498b7ff4f..5aeafbf81 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -294,11 +294,12 @@ object OptimizeTableCommandOverwrites extends Logging {
             // Generally, a bin is a group of existing files, whose total size does not exceed the
             // desired maxFileSize. They will be coalesced into a single output file.
             // However, if isMultiDimClustering = true, all files in a partition will be read by the
-            // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize
+            // same job, the data will be range-partitioned and
+            // numFiles = totalFileSize / maxFileSize
             // will be produced. See below.

             // isMultiDimClustering is always false for Gluten Clickhouse for now
-            if (file.size + currentBinSize > maxTargetFileSize /*&& !isMultiDimClustering */ ) {
+            if (file.size + currentBinSize > maxTargetFileSize /* && !isMultiDimClustering */ ) {
               bins += currentBin.toVector
               currentBin.clear()
               currentBin += file
@@ -317,7 +318,7 @@ object OptimizeTableCommandOverwrites extends Logging {
           .map(b => (partition, b))
           // select bins that have at least two files or in case of multi-dim clustering
           // select all bins
-          .filter(_._2.size > 1 /*|| isMultiDimClustering*/ )
+          .filter(_._2.size > 1 /* || isMultiDimClustering */ )
     }
   }
 }
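
The comment block touched above describes the bin-packing used by groupFilesIntoBinsClickhouse: files are sorted by size and accumulated into the current bin until adding the next file would push it past maxTargetFileSize, and only bins with at least two files are kept for compaction (isMultiDimClustering is always false here). A simplified, self-contained sketch of that grouping, operating on plain file sizes instead of AddFile actions (groupIntoBins is an illustrative name, not from this patch):

    import scala.collection.mutable.ArrayBuffer

    // Simplified illustration of the grouping in groupFilesIntoBinsClickhouse:
    // sort by size, then start a new bin whenever the next file would overflow it.
    def groupIntoBins(fileSizes: Seq[Long], maxTargetFileSize: Long): Seq[Seq[Long]] = {
      val bins = new ArrayBuffer[Seq[Long]]()
      val currentBin = new ArrayBuffer[Long]()
      var currentBinSize = 0L
      fileSizes.sorted.foreach { size =>
        if (size + currentBinSize > maxTargetFileSize) {
          if (currentBin.nonEmpty) bins += currentBin.toVector
          currentBin.clear()
          currentBin += size
          currentBinSize = size
        } else {
          currentBin += size
          currentBinSize += size
        }
      }
      if (currentBin.nonEmpty) bins += currentBin.toVector
      // only bins with at least two files are worth compacting
      bins.filter(_.size > 1).toSeq
    }

    // e.g. groupIntoBins(Seq(10L, 20L, 30L, 80L), maxTargetFileSize = 60L)
    //      returns Seq(Vector(10, 20, 30)); the 80-byte file is left alone and filtered out.
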
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index c6abbd444..e751c2fda 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -30,6 +30,7 @@ import java.io.File
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
 
+// This suite is to make sure clickhouse commands work well even after a spark restart
 class GlutenClickHouseTableAfterRestart
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
@@ -73,7 +74,9 @@ class GlutenClickHouseTableAfterRestart
 
   override protected def initializeSession(): Unit = {
     if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
+      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_" + current_db_num
+      current_db_num += 1
+
       _hiveSpark = SparkSession
         .builder()
         .config(sparkConf)
@@ -108,6 +111,8 @@ class GlutenClickHouseTableAfterRestart
     }
   }
 
+  var current_db_num: Int = 0
+
   test("test mergetree after restart") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree;
@@ -182,13 +187,157 @@ class GlutenClickHouseTableAfterRestart
       assert(stats2.missCount() - oldMissingCount2 == 0)
     }
 
+    val oldMissingCount1 = ClickhouseSnapshot.deltaScanCache.stats().missCount()
+    val oldMissingCount2 = ClickhouseSnapshot.addFileToAddMTPCache.stats().missCount()
+
+    restartSpark()
+
+    runTPCHQueryBySQL(1, sqlStr)(_ => {})
+
+    // after restart, additionally check stats of delta scan cache
+    val stats1 = ClickhouseSnapshot.deltaScanCache.stats()
+    assert(stats1.missCount() - oldMissingCount1 == 1)
+    val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats()
+    assert(stats2.missCount() - oldMissingCount2 == 6)
+
+  }
+
+  test("test optimize after restart") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS table_restart_optimize;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS table_restart_optimize (id bigint,  name string)
+                 |USING clickhouse
+                 |LOCATION '$basePath/table_restart_optimize'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table table_restart_optimize values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+    // second file
+    spark.sql(s"""
+                 | insert into table table_restart_optimize values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+
+    restartSpark()
+
+    spark.sql("optimize table_restart_optimize")
+    assert(spark.sql("select count(*) from 
table_restart_optimize").collect().apply(0).get(0) == 4)
+  }
+
+  test("test vacuum after restart") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS table_restart_vacuum;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS table_restart_vacuum (id bigint,  name string)
+                 |USING clickhouse
+                 |LOCATION '$basePath/table_restart_vacuum'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table table_restart_vacuum values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+    // second file
+    spark.sql(s"""
+                 | insert into table table_restart_vacuum values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+
+    spark.sql("optimize table_restart_vacuum")
+
+    restartSpark()
+
+    spark.sql("set spark.gluten.enabled=false")
+    spark.sql("vacuum table_restart_vacuum")
+    spark.sql("set spark.gluten.enabled=true")
+
+    assert(spark.sql("select count(*) from 
table_restart_vacuum").collect().apply(0).get(0) == 4)
+  }
+
+  test("test update after restart") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS table_restart_update;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS table_restart_update (id bigint,  name string)
+                 |USING clickhouse
+                 |LOCATION '$basePath/table_restart_update'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table table_restart_update values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+    // second file
+    spark.sql(s"""
+                 | insert into table table_restart_update values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+
+    restartSpark()
+
+    spark.sql("update table_restart_update set name = 'tom' where id = 1")
+
+    assert(spark.sql("select count(*) from 
table_restart_update").collect().apply(0).get(0) == 4)
+  }
+
+  test("test delete after restart") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS table_restart_delete;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS table_restart_delete (id bigint,  name string)
+                 |USING clickhouse
+                 |LOCATION '$basePath/table_restart_delete'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table table_restart_delete values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+    // second file
+    spark.sql(s"""
+                 | insert into table table_restart_delete values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+
+    restartSpark()
+
+    spark.sql("delete from table_restart_delete where where id = 1")
+
+    assert(spark.sql("select count(*) from 
table_restart_delete").collect().apply(0).get(0) == 2)
+  }
+
+  test("test drop after restart") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS table_restart_drop;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS table_restart_drop (id bigint,  name string)
+                 |USING clickhouse
+                 |LOCATION '$basePath/table_restart_drop'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table table_restart_drop values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+    // second file
+    spark.sql(s"""
+                 | insert into table table_restart_drop values (1,"tom"), (2, "jim")
+                 |""".stripMargin)
+
+    restartSpark()
+
+    spark.sql("drop table table_restart_drop")
+  }
+
+  private def restartSpark(): Unit = {
     // now restart
     ClickHouseTableV2.clearCache()
     ClickhouseSnapshot.clearAllFileStatusCache()
 
-    val oldMissingCount1 = ClickhouseSnapshot.deltaScanCache.stats().missCount()
-    val oldMissingCount2 = ClickhouseSnapshot.addFileToAddMTPCache.stats().missCount()
-
     val session = getActiveSession.orElse(getDefaultSession)
     if (session.isDefined) {
       session.get.stop()
@@ -196,29 +345,22 @@ class GlutenClickHouseTableAfterRestart
       SparkSession.clearDefaultSession()
     }
 
-    val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
+    val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_"
     // use metastore_db2 to avoid issue: "Another instance of Derby may have already booted the database"
-    val destDir = new File(hiveMetaStoreDB + "2")
+    val destDir = new File(hiveMetaStoreDB + current_db_num)
     destDir.mkdirs()
-    FileUtils.copyDirectory(new File(hiveMetaStoreDB), destDir)
+    FileUtils.copyDirectory(new File(hiveMetaStoreDB + (current_db_num - 1)), destDir)
     _hiveSpark = null
     _hiveSpark = SparkSession
       .builder()
       .config(sparkConf)
       .enableHiveSupport()
-      .config("javax.jdo.option.ConnectionURL", 
s"jdbc:derby:;databaseName=${hiveMetaStoreDB}2")
+      .config(
+        "javax.jdo.option.ConnectionURL",
+        s"jdbc:derby:;databaseName=$hiveMetaStoreDB$current_db_num")
       .master("local[2]")
       .getOrCreate()
-
-    runTPCHQueryBySQL(1, sqlStr)(_ => {})
-
-    // after restart, additionally check stats of delta scan cache
-    val stats1 = ClickhouseSnapshot.deltaScanCache.stats()
-    assert(stats1.missCount() - oldMissingCount1 == 1)
-    val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats()
-    assert(stats2.missCount() - oldMissingCount2 == 6)
-
+    current_db_num += 1
   }
-
 }
 // scalastyle:off line.size.limit
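
A note on the test change above: instead of reusing a single Derby metastore directory, each restart now copies metastore_db_{n-1} to metastore_db_{n} and points the new session at the fresh copy, which sidesteps Derby's "Another instance of Derby may have already booted the database" lock. A rough sketch of that rotation, with the suite-specific pieces passed in as parameters (restartWithRotatedMetastore is an illustrative name, not part of the patch):

    import java.io.File

    import org.apache.commons.io.FileUtils
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Sketch only: copy the previous Derby metastore into a fresh directory and
    // point the new SparkSession at it, so the old instance's lock never gets in the way.
    def restartWithRotatedMetastore(
        metaStorePathAbsolute: String,
        sparkConf: SparkConf,
        currentDbNum: Int): SparkSession = {
      val base = metaStorePathAbsolute + "/metastore_db_"
      val destDir = new File(base + currentDbNum)
      destDir.mkdirs()
      FileUtils.copyDirectory(new File(base + (currentDbNum - 1)), destDir)
      SparkSession
        .builder()
        .config(sparkConf)
        .enableHiveSupport()
        .config("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$base$currentDbNum")
        .master("local[2]")
        .getOrCreate()
    }
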


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org
