This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 75fcad5ae [VL] Improve checkNativeWrite logic in VeloxParquetWriteForHiveSuite (#5496)
75fcad5ae is described below
commit 75fcad5aeedff6dce77d4759aed1d22332b2a72d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Apr 25 16:14:10 2024 +0800
[VL] Improve checkNativeWrite logic in VeloxParquetWriteForHiveSuite (#5496)
---
.../execution/VeloxParquetWriteForHiveSuite.scala | 83 +++++++++-------------
1 file changed, 35 insertions(+), 48 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index bb338d530..9597e3110 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -16,17 +16,17 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.sql.shims.SparkShimLoader
-
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{GlutenQueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.util.QueryExecutionListener
class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
private var _spark: SparkSession = null
@@ -79,24 +79,30 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
.set("spark.gluten.sql.native.writer.enabled", "true")
}
- private def checkNativeWrite(sqlStr: String, native: Boolean): Unit = {
- val testAppender = new LogAppender("native write tracker")
- withLogAppender(testAppender) {
- spark.sql(sqlStr)
+ private def checkNativeWrite(sqlStr: String, checkNative: Boolean): Unit = {
+ var nativeUsed = false
+ val queryListener = new QueryExecutionListener {
+ override def onFailure(f: String, qe: QueryExecution, e: Exception):
Unit = {}
+ override def onSuccess(funcName: String, qe: QueryExecution, duration:
Long): Unit = {
+ if (!nativeUsed) {
+ nativeUsed = if (isSparkVersionGE("3.4")) {
+
qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined
+ } else {
+ qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
+ }
+ }
+ }
}
- assert(
- testAppender.loggingEvents.exists(
- _.getMessage.toString.contains("Use Gluten parquet write for hive"))
== native)
- }
-
- private def checkNativeStaticPartitionWrite(sqlStr: String, native:
Boolean): Unit = {
- val testAppender = new LogAppender("native write tracker")
- withLogAppender(testAppender) {
+ try {
+ spark.listenerManager.register(queryListener)
spark.sql(sqlStr)
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ if (checkNative) {
+ assert(nativeUsed)
+ }
+ } finally {
+ spark.listenerManager.unregister(queryListener)
}
- assert(
- testAppender.loggingEvents.exists(
- _.getMessage.toString.contains("Use Gluten partition write for hive"))
== native)
}
test("test hive static partition write table") {
@@ -105,21 +111,10 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
"CREATE TABLE t (c int, d long, e long)" +
" STORED AS PARQUET partitioned by (c, d)")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
- if (
- SparkShimLoader.getSparkVersion.startsWith("3.4") ||
- SparkShimLoader.getSparkVersion.startsWith("3.5")
- ) {
- checkNativeStaticPartitionWrite(
- "INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
- " SELECT 3 as e",
- native = false)
- } else {
- checkNativeStaticPartitionWrite(
- "INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
- " SELECT 3 as e",
- native = true)
- }
-
+ checkNativeWrite(
+ "INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
+ " SELECT 3 as e",
+ checkNative = true)
}
checkAnswer(spark.table("t"), Row(3, 1, 2))
}
@@ -131,10 +126,10 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
"CREATE TABLE t (c int, d long, e long)" +
" STORED AS PARQUET partitioned by (c, d)")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
- checkNativeStaticPartitionWrite(
+ checkNativeWrite(
"INSERT OVERWRITE TABLE t partition(c=1, d)" +
" SELECT 3 as e, 2 as e",
- native = false)
+ checkNative = false)
}
checkAnswer(spark.table("t"), Row(3, 1, 2))
}
@@ -144,15 +139,11 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
withTable("t") {
spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
- if (
- SparkShimLoader.getSparkVersion.startsWith("3.4") ||
- SparkShimLoader.getSparkVersion.startsWith("3.5")
- ) {
- checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", native =
false)
+ if (isSparkVersionGE("3.4")) {
+ checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c",
checkNative = false)
} else {
- checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", native =
true)
+ checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c",
checkNative = true)
}
-
}
checkAnswer(spark.table("t"), Row(1))
}
@@ -163,25 +154,21 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
f =>
// compatible with Spark3.3 and later
withSQLConf("spark.sql.hive.convertMetastoreInsertDir" -> "false") {
- if (
- SparkShimLoader.getSparkVersion.startsWith("3.4") ||
- SparkShimLoader.getSparkVersion.startsWith("3.5")
- ) {
+ if (isSparkVersionGE("3.4")) {
checkNativeWrite(
s"""
|INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS
PARQUET SELECT 1 as c
|""".stripMargin,
- native = false
+ checkNative = false
)
} else {
checkNativeWrite(
s"""
|INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS
PARQUET SELECT 1 as c
|""".stripMargin,
- native = true
+ checkNative = true
)
}
-
checkAnswer(spark.read.parquet(f.getCanonicalPath), Row(1))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]