This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b2fc18654c [VL] Refactor gluten-it to pass structured query information to runner (#10623)
b2fc18654c is described below

commit b2fc18654c3b9c204a3fc8168e479363d5899f7d
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Sep 5 15:08:58 2025 +0200

    [VL] Refactor gluten-it to pass structured query information to runner (#10623)
---
 .../gluten/integration/command/QueriesMixin.java   |  58 ++---------
 .../apache/gluten/integration/QueryRunner.scala    |  15 ++-
 .../org/apache/gluten/integration/QuerySet.scala   | 106 +++++++++++++++++++++
 .../org/apache/gluten/integration/Suite.scala      |   4 +-
 .../apache/gluten/integration/action/Actions.scala |   4 +-
 .../gluten/integration/action/Parameterized.scala  |  21 ++--
 .../apache/gluten/integration/action/Queries.scala |  20 ++--
 .../gluten/integration/action/QueriesCompare.scala |  34 +++----
 .../gluten/integration/action/SparkShell.scala     |   2 +-
 .../integration/clickbench/ClickBenchSuite.scala   |   8 +-
 .../apache/gluten/integration/ds/TpcdsSuite.scala  |  10 +-
 .../apache/gluten/integration/h/TpchSuite.scala    |   9 +-
 .../org/apache/spark/sql/SparkQueryRunner.scala    |  30 +-----
 13 files changed, 180 insertions(+), 141 deletions(-)

diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java
index 567aec4c89..8c47bc8bf0 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.integration.command;
 
+import org.apache.gluten.integration.QuerySet;
 import org.apache.gluten.integration.Suite;
 import org.apache.gluten.integration.action.Actions;
 import org.apache.gluten.integration.collections.JavaCollectionConverter;
@@ -24,7 +25,6 @@ import com.google.common.base.Preconditions;
 import picocli.CommandLine;
 
 import java.util.*;
-import java.util.stream.Collectors;
 
 public class QueriesMixin {
   @CommandLine.Option(
@@ -91,63 +91,25 @@ public class QueriesMixin {
   public Actions.QuerySelector queries() {
     return new Actions.QuerySelector() {
       @Override
-      public scala.collection.immutable.Seq<String> select(Suite suite) {
-        final List<String> all = select0(suite);
-        final Division div = Division.parse(shard);
-        final List<String> out = div(all, div);
-        System.out.println("About to run queries: " + out + "... ");
-        return JavaCollectionConverter.asScalaSeq(out);
-      }
-
-      private List<String> div(List<String> from, Division div) {
-        final int queryCount = from.size();
-        final int shardCount = div.shardCount;
-        final int least = queryCount / shardCount;
-        final int shardIdx = div.shard - 1;
-        final int shardStart = shardIdx * least;
-        final int numQueriesInShard;
-        if (shardIdx == shardCount - 1) {
-          final int remaining = queryCount - least * shardCount;
-          numQueriesInShard = least + remaining;
-        } else {
-          numQueriesInShard = least;
-        }
-        final List<String> out = new ArrayList<>();
-        for (int i = shardStart; i < shardStart + numQueriesInShard; i++) {
-          out.add(from.get(i));
-        }
-        return out;
-      }
-
-      private List<String> select0(Suite suite) {
+      public QuerySet select(Suite suite) {
         final String[] queryIds = queries;
         final String[] excludedQueryIds = excludedQueries;
         if (queryIds.length > 0 && excludedQueryIds.length > 0) {
           throw new IllegalArgumentException(
               "Should not specify queries and excluded queries at the same time");
         }
-        String[] all = suite.allQueryIds();
-        Set<String> allSet = new HashSet<>(Arrays.asList(all));
+        QuerySet querySet = suite.allQueries();
         if (queryIds.length > 0) {
-          for (String id : queryIds) {
-            if (!allSet.contains(id)) {
-              throw new IllegalArgumentException("Invalid query ID: " + id);
-            }
-          }
-          return Arrays.asList(queryIds);
+          querySet = querySet.filter(JavaCollectionConverter.asScalaSeq(Arrays.asList(queryIds)));
         }
         if (excludedQueryIds.length > 0) {
-          for (String id : excludedQueryIds) {
-            if (!allSet.contains(id)) {
-              throw new IllegalArgumentException("Invalid query ID to exclude: " + id);
-            }
-          }
-          Set<String> excludedSet = new HashSet<>(Arrays.asList(excludedQueryIds));
-          return Arrays.stream(all)
-              .filter(id -> !excludedSet.contains(id))
-              .collect(Collectors.toList());
+          querySet =
+              querySet.exclude(JavaCollectionConverter.asScalaSeq(Arrays.asList(excludedQueryIds)));
         }
-        return Arrays.asList(all);
+        final Division div = Division.parse(shard);
+        querySet = querySet.getShard(div.shard - 1, div.shardCount);
+        System.out.println("About to run queries: " + querySet.queryIds() + "... ");
+        return querySet;
       }
     };
   }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala
index 40fa01a04a..04685320a0 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils
 
 import java.io.File
 
-class QueryRunner(val queryResourceFolder: String, val source: String, val dataPath: String) {
+class QueryRunner(val source: String, val dataPath: String) {
   import QueryRunner._
 
   Preconditions.checkState(
@@ -40,28 +40,27 @@ class QueryRunner(val queryResourceFolder: String, val source: String, val dataP
   def runQuery(
       spark: SparkSession,
       desc: String,
-      caseId: String,
+      query: Query,
       explain: Boolean = false,
       sqlMetricMapper: MetricMapper = MetricMapper.dummy,
       executorMetrics: Seq[String] = Nil,
       randomKillTasks: Boolean = false): QueryResult = {
-    val path = "%s/%s.sql".format(queryResourceFolder, caseId)
     try {
       val r =
         SparkQueryRunner.runQuery(
           spark,
           desc,
-          path,
+          query,
           explain,
           sqlMetricMapper,
           executorMetrics,
           randomKillTasks)
-      println(s"Successfully ran query $caseId. Returned row count: ${r.rows.length}")
-      Success(caseId, r)
+      println(s"Successfully ran query ${query.id}. Returned row count: ${r.rows.length}")
+      Success(query.id, r)
     } catch {
       case e: Exception =>
-        println(s"Error running query $caseId. Error: ${ExceptionUtils.getStackTrace(e)}")
-        Failure(caseId, e)
+        println(s"Error running query ${query.id}. Error: ${ExceptionUtils.getStackTrace(e)}")
+        Failure(query.id, e)
     }
   }
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QuerySet.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QuerySet.scala
new file mode 100644
index 0000000000..12919e7448
--- /dev/null
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QuerySet.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.integration
+
+import java.io.ByteArrayOutputStream
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+
+/** A set of SQL queries. */
+case class Query(id: String, path: String, sql: String)
+
+case class QuerySet(queryIds: Seq[String], queryMap: Map[String, Query]) {
+  assert(queryIds.size == queryMap.size)
+  private val queryCount = queryIds.size
+  private val queryIdSet = queryIds.toSet
+  require(queryIdSet.size == queryCount, s"Duplicated query IDs found in the set: $queryIds")
+
+  val queries: Seq[Query] = queryIds.map(queryMap(_))
+
+  def filter(filteredQueryIds: Seq[String]): QuerySet = {
+    filteredQueryIds.foreach {
+      qid => require(queryIdSet.contains(qid), s"Query ID $qid is not found in the set: $queryIds")
+    }
+    val filteredQueryIdSet = filteredQueryIds.toSet
+    QuerySet(
+      filteredQueryIds,
+      queryMap.filter { case (qid, _) => filteredQueryIdSet.contains(qid) })
+  }
+
+  def exclude(excludedQueryIds: Seq[String]): QuerySet = {
+    excludedQueryIds.foreach {
+      qid => require(queryIdSet.contains(qid), s"Query ID $qid is not found in the set: $queryIds")
+    }
+
+    val excludedQueryIdSet = excludedQueryIds.toSet
+    val remainingQueryIds = queryIds.filter(!excludedQueryIdSet.contains(_))
+    filter(remainingQueryIds)
+  }
+
+  def getShard(shardId: Int, shardCount: Int): QuerySet = {
+    val least: Int = queryCount / shardCount
+    val shardStart: Int = shardId * least
+    var numQueriesInShard: Int = 0
+    if (shardId == shardCount - 1) {
+      val remaining: Int = queryCount - least * shardCount
+      numQueriesInShard = least + remaining
+    } else {
+      numQueriesInShard = least
+    }
+    val shardQueryIds = mutable.ArrayBuffer[String]()
+    for (i <- shardStart until shardStart + numQueriesInShard) {
+      shardQueryIds += queryIds(i)
+    }
+    filter(shardQueryIds.toSeq)
+  }
+
+  def getQuery(queryId: String): Query = {
+    queryMap(queryId)
+  }
+}
+
+object QuerySet {
+  private def resourceToString(resource: String): String = {
+    val inStream = QuerySet.getClass.getResourceAsStream(resource)
+    require(inStream != null, s"Resource not found: $resource")
+    val outStream = new ByteArrayOutputStream
+    try {
+      var reading = true
+      while (reading) {
+        inStream.read() match {
+          case -1 => reading = false
+          case c => outStream.write(c)
+        }
+      }
+      outStream.flush()
+    } finally {
+      inStream.close()
+    }
+    new String(outStream.toByteArray, StandardCharsets.UTF_8)
+  }
+
+  def readFromResource(folder: String, queryIds: Seq[String]): QuerySet = {
+    val queries = queryIds.map {
+      qid =>
+        val path = s"$folder/$qid.sql"
+        val sql = resourceToString(path)
+        qid -> Query(qid, path, sql)
+    }.toMap
+    QuerySet(queryIds, queries)
+  }
+}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index c1ca9b575c..2ea814df27 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -192,9 +192,7 @@ abstract class Suite(
 
   private[integration] def genPartitionedData(): Boolean
 
-  private[integration] def queryResource(): String
-
-  private[integration] def allQueryIds(): Array[String]
+  private[integration] def allQueries(): QuerySet
 
   private[integration] def desc(): String
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala
index 4977dda708..14a6b21eff 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.integration.action
 
-import org.apache.gluten.integration.Suite
+import org.apache.gluten.integration.{QuerySet, Suite}
 
 trait Action {
   def execute(suite: Suite): Boolean
@@ -24,6 +24,6 @@ trait Action {
 
 object Actions {
   trait QuerySelector {
-    def select(suite: Suite): Seq[String]
+    def select(suite: Suite): QuerySet
   }
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
index 422b185888..91a7391695 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.integration.action
 
-import org.apache.gluten.integration.{QueryRunner, Suite}
+import org.apache.gluten.integration.{Query, QueryRunner, Suite}
 import org.apache.gluten.integration.QueryRunner.QueryResult
 import org.apache.gluten.integration.action.Actions.QuerySelector
 import org.apache.gluten.integration.action.TableRender.Field
@@ -108,7 +108,7 @@ class Parameterized(
 
   override def execute(suite: Suite): Boolean = {
     val runner: QueryRunner =
-      new QueryRunner(suite.queryResource(), suite.dataSource(), suite.dataWritePath())
+      new QueryRunner(suite.dataSource(), suite.dataWritePath())
 
     val sessionSwitcher = suite.sessionSwitcher
     val testConf = suite.getTestConf()
@@ -125,7 +125,8 @@ class Parameterized(
         sessionSwitcher.registerSession(coordinate.toString, conf)
     }
 
-    val runQueryIds = queries.select(suite).map(TestResultLine.QueryId(_))
+    val querySet = queries.select(suite)
+    val runQueryIds = querySet.queryIds.map(TestResultLine.QueryId(_))
 
     val marks: Seq[TestResultLine.CoordMark] = coordinates.flatMap {
       entry =>
@@ -143,7 +144,7 @@ class Parameterized(
                   Parameterized.warmUp(
                     runner,
                     sessionSwitcher.spark(),
-                    queryId.id,
+                    querySet.getQuery(queryId.id),
                     coordinate,
                     suite.desc())
                 } finally {
@@ -164,7 +165,7 @@ class Parameterized(
                     Parameterized.runQuery(
                       runner,
                       sessionSwitcher.spark(),
-                      queryId.id,
+                      querySet.getQuery(queryId.id),
                       coordinate,
                       suite.desc(),
                       explain,
@@ -363,22 +364,22 @@ object Parameterized {
   private def runQuery(
       runner: QueryRunner,
       spark: SparkSession,
-      id: String,
+      query: Query,
       coordinate: Coordinate,
       desc: String,
       explain: Boolean,
       metrics: Seq[String]): TestResultLine.Coord = {
-    val testDesc = "Query %s [%s] %s".format(desc, id, coordinate)
-    val result = runner.runQuery(spark, testDesc, id, explain, executorMetrics = metrics)
+    val testDesc = "Query %s [%s] %s".format(desc, query.id, coordinate)
+    val result = runner.runQuery(spark, testDesc, query, explain, executorMetrics = metrics)
     TestResultLine.Coord(coordinate, result)
   }
 
   private def warmUp(
       runner: QueryRunner,
       session: SparkSession,
-      id: String,
+      query: Query,
       coordinate: Coordinate,
       desc: String): Unit = {
-    runQuery(runner, session, id, coordinate, desc, explain = false, Nil)
+    runQuery(runner, session, query, coordinate, desc, explain = false, Nil)
   }
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala
index baba2fc793..4cf308835e 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.integration.action
 
-import org.apache.gluten.integration.{QueryRunner, Suite, TableCreator}
+import org.apache.gluten.integration.{Query, QueryRunner, Suite, TableCreator}
 import org.apache.gluten.integration.QueryRunner.QueryResult
 import org.apache.gluten.integration.action.Actions.QuerySelector
 import org.apache.gluten.integration.action.TableRender.RowParser.FieldAppender.RowAppender
@@ -37,23 +37,23 @@ case class Queries(
   import Queries._
 
   override def execute(suite: Suite): Boolean = {
-    val runQueryIds = queries.select(suite)
+    val querySet = queries.select(suite)
     val runner: QueryRunner =
-      new QueryRunner(suite.queryResource(), suite.dataSource(), suite.dataWritePath())
+      new QueryRunner(suite.dataSource(), suite.dataWritePath())
     val sessionSwitcher = suite.sessionSwitcher
     sessionSwitcher.useSession("test", "Run Queries")
     runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
     val results = (0 until iterations).flatMap {
       iteration =>
         println(s"Running tests (iteration $iteration)...")
-        runQueryIds.map {
-          queryId =>
+        querySet.queries.map {
+          query =>
             try {
               Queries.runQuery(
                 runner,
                 suite.tableCreator(),
                 sessionSwitcher.spark(),
-                queryId,
+                query,
                 suite.desc(),
                 explain,
                 suite.getTestMetricMapper(),
@@ -172,18 +172,18 @@ object Queries {
       runner: QueryRunner,
       creator: TableCreator,
       session: SparkSession,
-      id: String,
+      query: Query,
       desc: String,
       explain: Boolean,
       metricMapper: MetricMapper,
       randomKillTasks: Boolean): TestResultLine = {
-    println(s"Running query: $id...")
-    val testDesc = "Query %s [%s]".format(desc, id)
+    println(s"Running query: ${query.id}...")
+    val testDesc = "Query %s [%s]".format(desc, query.id)
     val result =
       runner.runQuery(
         session,
         testDesc,
-        id,
+        query,
         explain = explain,
         sqlMetricMapper = metricMapper,
         randomKillTasks = randomKillTasks)
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala
index f8dabebd79..5f45d549b6 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.integration.action
 
-import org.apache.gluten.integration.{QueryRunner, Suite}
+import org.apache.gluten.integration.{Query, QueryRunner, Suite}
 import org.apache.gluten.integration.QueryRunner.QueryResult
 import org.apache.gluten.integration.action.Actions.QuerySelector
 import org.apache.gluten.integration.action.QueriesCompare.TestResultLine
@@ -35,23 +35,23 @@ case class QueriesCompare(
 
   override def execute(suite: Suite): Boolean = {
     val runner: QueryRunner =
-      new QueryRunner(suite.queryResource(), suite.dataSource(), suite.dataWritePath())
-    val runQueryIds = queries.select(suite)
+      new QueryRunner(suite.dataSource(), suite.dataWritePath())
+    val querySet = queries.select(suite)
     val sessionSwitcher = suite.sessionSwitcher
 
     sessionSwitcher.useSession("baseline", "Run Baseline Queries")
     runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
     val baselineResults = (0 until iterations).flatMap {
       iteration =>
-        runQueryIds.map {
-          queryId =>
-            println(s"Running baseline query $queryId (iteration $iteration)...")
+        querySet.queries.map {
+          query =>
+            println(s"Running baseline query ${query.id} (iteration $iteration)...")
             try {
               QueriesCompare.runBaselineQuery(
                 runner,
                 sessionSwitcher.spark(),
                 suite.desc(),
-                queryId,
+                query,
                 explain)
             } finally {
               if (noSessionReuse) {
@@ -66,15 +66,15 @@ case class QueriesCompare(
     runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
     val testResults = (0 until iterations).flatMap {
       iteration =>
-        runQueryIds.map {
-          queryId =>
-            println(s"Running test query $queryId (iteration $iteration)...")
+        querySet.queries.map {
+          query =>
+            println(s"Running test query ${query.id} (iteration $iteration)...")
             try {
               QueriesCompare.runTestQuery(
                 runner,
                 sessionSwitcher.spark(),
                 suite.desc(),
-                queryId,
+                query,
                 explain)
             } finally {
               if (noSessionReuse) {
@@ -225,10 +225,10 @@ object QueriesCompare {
       runner: QueryRunner,
       session: SparkSession,
       desc: String,
-      id: String,
+      query: Query,
       explain: Boolean): QueryResult = {
-    val testDesc = "Baseline %s [%s]".format(desc, id)
-    val result = runner.runQuery(session, testDesc, id, explain = explain)
+    val testDesc = "Baseline %s [%s]".format(desc, query.id)
+    val result = runner.runQuery(session, testDesc, query, explain = explain)
     result
   }
 
@@ -236,10 +236,10 @@ object QueriesCompare {
       runner: QueryRunner,
       session: SparkSession,
       desc: String,
-      id: String,
+      query: Query,
       explain: Boolean): QueryResult = {
-    val testDesc = "Query %s [%s]".format(desc, id)
-    val result = runner.runQuery(session, testDesc, id, explain = explain)
+    val testDesc = "Query %s [%s]".format(desc, query.id)
+    val result = runner.runQuery(session, testDesc, query, explain = explain)
     result
   }
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala
index 71f99f80a6..f920977eea 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala
@@ -24,7 +24,7 @@ case class SparkShell() extends Action {
   override def execute(suite: Suite): Boolean = {
     suite.sessionSwitcher.useSession("test", "Spark CLI")
     val runner: QueryRunner =
-      new QueryRunner(suite.queryResource(), suite.dataSource(), suite.dataWritePath())
+      new QueryRunner(suite.dataSource(), suite.dataWritePath())
     runner.createTables(suite.tableCreator(), suite.sessionSwitcher.spark())
     Main.sparkSession = suite.sessionSwitcher.spark()
     Main.sparkContext = suite.sessionSwitcher.spark().sparkContext
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
index ee32d47861..5e07211af3 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.integration.clickbench
 
-import org.apache.gluten.integration.{DataGen, Suite, TableCreator}
+import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableCreator}
 import org.apache.gluten.integration.action.Action
 import org.apache.gluten.integration.metrics.MetricMapper
 
@@ -87,9 +87,9 @@ class ClickBenchSuite(
     new ClickBenchDataGen(sessionSwitcher.spark(), dataWritePath())
   }
 
-  override private[integration] def queryResource(): String = "/clickbench-queries"
-
-  override private[integration] def allQueryIds(): Array[String] = ALL_QUERY_IDS
+  override private[integration] def allQueries(): QuerySet = {
+    QuerySet.readFromResource("/clickbench-queries", ClickBenchSuite.ALL_QUERY_IDS)
+  }
 
   override private[integration] def desc(): String = "ClickBench"
 
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 869041e268..3372856ba0 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -16,10 +16,8 @@
  */
 package org.apache.gluten.integration.ds
 
-import org.apache.gluten.integration.{DataGen, Suite, TableCreator}
+import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableCreator}
 import org.apache.gluten.integration.action.Action
-import org.apache.gluten.integration.ds.TpcdsSuite.{ALL_QUERY_IDS, HISTORY_WRITE_PATH, TPCDS_WRITE_RELATIVE_PATH}
-import org.apache.gluten.integration.h.TpchSuite.checkDataGenArgs
 import org.apache.gluten.integration.metrics.MetricMapper
 
 import org.apache.spark.SparkConf
@@ -99,12 +97,10 @@ class TpcdsSuite(
       genPartitionedData)
   }
 
-  override private[integration] def queryResource(): String = {
-    "/tpcds-queries"
+  override private[integration] def allQueries(): QuerySet = {
+    QuerySet.readFromResource("/tpcds-queries", TpcdsSuite.ALL_QUERY_IDS)
   }
 
-  override private[integration] def allQueryIds(): Array[String] = ALL_QUERY_IDS
-
   override private[integration] def desc(): String = "TPC-DS"
 
   override def tableCreator(): TableCreator = TableCreator.discoverSchema()
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index a381550b32..2894d359ac 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -16,9 +16,8 @@
  */
 package org.apache.gluten.integration.h
 
-import org.apache.gluten.integration.{DataGen, Suite, TableCreator}
+import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableCreator}
 import org.apache.gluten.integration.action.Action
-import org.apache.gluten.integration.h.TpchSuite.{HISTORY_WRITE_PATH, TPCH_WRITE_RELATIVE_PATH}
 import org.apache.gluten.integration.metrics.MetricMapper
 
 import org.apache.spark.SparkConf
@@ -91,12 +90,10 @@ class TpchSuite(
       typeModifiers())
   }
 
-  override private[integration] def queryResource(): String = {
-    "/tpch-queries"
+  override private[integration] def allQueries(): QuerySet = {
+    QuerySet.readFromResource("/tpch-queries", TpchSuite.ALL_QUERY_IDS)
   }
 
-  override private[integration] def allQueryIds(): Array[String] = TpchSuite.ALL_QUERY_IDS
-
   override private[integration] def desc(): String = "TPC-H"
 
   override def tableCreator(): TableCreator = TableCreator.discoverSchema()
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
index 09898b73bf..ed84746cb7 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql
 
+import org.apache.gluten.integration.Query
 import org.apache.gluten.integration.metrics.{MetricMapper, MetricTag, PlanMetric}
 
 import org.apache.spark.{SparkContext, Success, TaskKilled}
@@ -59,7 +60,7 @@ object SparkQueryRunner {
   def runQuery(
       spark: SparkSession,
       desc: String,
-      queryPath: String,
+      query: Query,
       explain: Boolean,
       metricMapper: MetricMapper,
       executorMetrics: Seq[String],
@@ -86,12 +87,11 @@ object SparkQueryRunner {
     }
     killTaskListener.foreach(sc.addSparkListener(_))
 
-    println(s"Executing SQL query from resource path $queryPath...")
+    println(s"Executing SQL query from resource path ${query.path}...")
     try {
       val tracker = new QueryPlanningTracker
-      val sql = resourceToString(queryPath)
       val prev = System.nanoTime()
-      val df = spark.sql(sql)
+      val df = spark.sql(query.sql)
       val rows = QueryPlanningTracker.withTracker(tracker) {
         df.collect()
       }
@@ -107,7 +107,7 @@ object SparkQueryRunner {
       val planMillis = sparkRulesMillis + otherRulesMillis
       val collectedExecutorMetrics =
         executorMetrics.map(name => (name, em.getMetricValue(name))).toMap
-      val collectedSQLMetrics = collectSQLMetrics(queryPath, metricMapper, df.queryExecution)
+      val collectedSQLMetrics = collectSQLMetrics(query.path, metricMapper, df.queryExecution)
       RunResult(
         rows,
         planMillis,
@@ -166,26 +166,6 @@ object SparkQueryRunner {
     }
     all.toSeq
   }
-
-  private def resourceToString(resource: String): String = {
-    val inStream = SparkQueryRunner.getClass.getResourceAsStream(resource)
-    Preconditions.checkNotNull(inStream)
-    val outStream = new ByteArrayOutputStream
-    try {
-      var reading = true
-      while (reading) {
-        inStream.read() match {
-          case -1 => reading = false
-          case c => outStream.write(c)
-        }
-      }
-      outStream.flush()
-    } finally {
-      inStream.close()
-    }
-    new String(outStream.toByteArray, StandardCharsets.UTF_8)
-  }
-
 }
 
 case class RunResult(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to