[ https://issues.apache.org/jira/browse/SPARK-24869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li updated SPARK-24869: ---------------------------- Description: {code} withTable("t") { withTempPath { path => var numTotalCachedHit = 0 val listener = new QueryExecutionListener { override def onFailure(f: String, qe: QueryExecution, e: Exception):Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { qe.withCachedData match { case c: SaveIntoDataSourceCommand if c.query.isInstanceOf[InMemoryRelation] => numTotalCachedHit += 1 case _ => println(qe.withCachedData) } } } spark.listenerManager.register(listener) val udf1 = udf({ (x: Int, y: Int) => x + y }) val df = spark.range(0, 3).toDF("a") .withColumn("b", udf1(col("a"), lit(10))) df.cache() df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties) assert(numTotalCachedHit == 1, "expected to be cached in jdbc") } } {code} was: {code} withTable("t") { withTempPath { path => var numTotalCachedHit = 0 val listener = new QueryExecutionListener { override def onFailure(f: String, qe: QueryExecution, e: Exception):Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { qe.withCachedData match { case c: SaveIntoDataSourceCommand if c.query.isInstanceOf[InMemoryRelation] => numTotalCachedHit += 1 case _ => println(qe.withCachedData) } } } spark.listenerManager.register(listener) val udf1 = udf({ (x: Int, y: Int) => x + y }) val df = spark.range(0, 3).toDF("a") .withColumn("b", udf1(col("a"), lit(10))) df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties) assert(numTotalCachedHit == 1, "expected to be cached in jdbc") } } {code} > SaveIntoDataSourceCommand's input Dataset does not use Cached Data > ------------------------------------------------------------------ > > Key: SPARK-24869 > URL: https://issues.apache.org/jira/browse/SPARK-24869 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.3.1 > Reporter: Xiao Li > Priority: Major > > {code} > withTable("t") { > withTempPath { path => > var numTotalCachedHit = 0 > val listener = new QueryExecutionListener { > override def onFailure(f: String, qe: QueryExecution, e: > Exception):Unit = {} > override def onSuccess(funcName: String, qe: QueryExecution, > duration: Long): Unit = { > qe.withCachedData match { > case c: SaveIntoDataSourceCommand > if c.query.isInstanceOf[InMemoryRelation] => > numTotalCachedHit += 1 > case _ => > println(qe.withCachedData) > } > } > } > spark.listenerManager.register(listener) > val udf1 = udf({ (x: Int, y: Int) => x + y }) > val df = spark.range(0, 3).toDF("a") > .withColumn("b", udf1(col("a"), lit(10))) > df.cache() > df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", > properties) > assert(numTotalCachedHit == 1, "expected to be cached in jdbc") > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org