wForget commented on code in PR #6793:
URL: https://github.com/apache/kyuubi/pull/6793#discussion_r1830748074
##########
extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala:
##########
@@ -99,23 +102,49 @@ trait KyuubiSparkSQLExtensionTest extends QueryTest
withListener(sql(sqlString))(callback)
}
- def withListener(df: => DataFrame)(callback: DataWritingCommand => Unit):
Unit = {
+ def withListener(df: => DataFrame)(
+ callback: DataWritingCommand => Unit,
+ failIfNotCallback: Boolean = true): Unit = {
+ val writes = Collections.synchronizedList(new
java.util.ArrayList[DataWritingCommand]())
+
val listener = new QueryExecutionListener {
override def onFailure(f: String, qe: QueryExecution, e: Exception):
Unit = {}
override def onSuccess(funcName: String, qe: QueryExecution, duration:
Long): Unit = {
- qe.executedPlan match {
- case write: DataWritingCommandExec => callback(write.cmd)
- case _ =>
+ def collectWrite(plan: SparkPlan): Unit = {
+ plan match {
+ case write: DataWritingCommandExec =>
+ writes.add(write.cmd)
+ case a: AdaptiveSparkPlanExec => collectWrite(a.executedPlan)
+ case _ =>
+ }
}
+ collectWrite(qe.executedPlan)
}
}
+ // Make sure the listener is registered after all previous events have
been processed
+ sparkContext.listenerBus.waitUntilEmpty()
spark.listenerManager.register(listener)
try {
df.collect()
sparkContext.listenerBus.waitUntilEmpty()
} finally {
spark.listenerManager.unregister(listener)
}
+ if (failIfNotCallback && writes.isEmpty) {
+ fail("No write command found")
+ }
+ writes.forEach(callback(_))
+ }
+
+ def collectRebalancePartitions(plan: LogicalPlan): Seq[RebalancePartitions]
= {
+ def collect(p: LogicalPlan): Seq[RebalancePartitions] = {
+ p.flatMap {
+ case r: RebalancePartitions => Seq(r)
+ case s: LogicalQueryStage => collect(s.logicalPlan)
Review Comment:
I guess it's because the logical plan is obtained from `write.cmd`:
https://github.com/apache/kyuubi/blob/b4838b40e6a9074697918ccecd3dfe71cc52442d/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala#L108
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]