[ 
https://issues.apache.org/jira/browse/SPARK-26683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26683:
----------------------------------
    Fix Version/s:     (was: 3.0.0)

> Incorrect value of "internal.metrics.input.recordsRead" when reading from 
> temp hive table backed by HDFS file
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26683
>                 URL: https://issues.apache.org/jira/browse/SPARK-26683
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: Amar Agrawal
>            Priority: Major
>         Attachments: asyncfactory.scala, input1.txt, input2.txt
>
>
> *Issue description*
> In summary: when a persisted DataFrame is used in two concurrent threads, 
> the SparkListenerStageCompleted event reports a wrong value for 
> *internal.metrics.input.recordsRead*.
>  
> *Issue Details* 
> The Spark code I have written reads 2 source temp Hive tables. When the first 
> temp table is read, its DataFrame is persisted. The source DataFrame of the 
> other temp table is not persisted. We then run 2 pipelines asynchronously. 
> In the 1st pipeline, the persisted DataFrame is written to a Hive target 
> table. In the 2nd pipeline, we perform a UNION of the persisted DataFrame 
> with the non-persisted DataFrame and write the result to a separate Hive 
> table.
> Since the first DataFrame is persisted, we expect its recordsRead metric to 
> be counted exactly once. Instead, the metric reports a higher value.
> Example: if the persisted DataFrame has 2 rows, the metric consistently 
> reports 3 rows.
>  
> *Steps to reproduce Issue:*
>  # Create directory /tmp/INFA_UNION1 and copy input1.txt to this directory.
>  # Create directory /tmp/INFA_UNION2 and copy input2.txt to this directory.
>  # Run the following in spark-shell:
> scala> :load asyncfactory.scala
> scala> :paste -raw
>  
> {code:java}
> package org.apache.spark
> import org.apache.spark.scheduler._
> import org.apache.spark.util.JsonProtocol
> import org.json4s.jackson.JsonMethods._
> class InfaListener(mode: String = "ACCUMULATOR") extends org.apache.spark.scheduler.SparkListener {
>   // Print each received event as compact JSON.
>   def onEvent(event: SparkListenerEvent): Unit = {
>     val jv = JsonProtocol.sparkEventToJson(event)
>     println(compact(jv))
>   }
>   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
>     onEvent(stageCompleted)
>   }
>   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
>     onEvent(stageSubmitted)
>   }
> }
> {code}
>  
> scala> :paste
> {code:java}
> import org.apache.spark.InfaListener
> implicit def df2idf(d:DataFrame):InfaDataFrame = new InfaDataFrame(d);
> val sqlc = spark.sqlContext
> val sc = spark.sparkContext
> val lis = new InfaListener("TAG")
> sc.addSparkListener(lis)
> sqlc.sql("DROP TABLE IF EXISTS `default`.`read1`")
> sqlc.sql("CREATE TABLE `default`.`read1` (`col0` STRING) LOCATION '/tmp/INFA_UNION1'")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`read2`")
> sqlc.sql("CREATE TABLE `default`.`read2` (`col0` STRING) LOCATION '/tmp/INFA_UNION2'")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`write1`")
> sqlc.sql("CREATE TABLE `default`.`write1` (`col0` STRING)")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`write2`")
> sqlc.sql("CREATE TABLE `default`.`write2` (`col0` STRING)")
> val v0 = sqlc.sql("SELECT `read1`.`col0` as a0 FROM `default`.`read1`").itoDF.persist(MEMORY_AND_DISK).where(lit(true));
> async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write1` SELECT tbl0.c0 as a0 FROM tbl0"),
>   v0.unionAll(sqlc.sql("SELECT `read2`.`col0` as a0 FROM `default`.`read2`").itoDF).itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl0")));
> async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write2` SELECT tbl1.c0 as a0 FROM tbl1"),
>   v0.itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl1")));
> stop;
> {code}
> *NOTE* - The above code refers to 2 file directories /tmp/INFA_UNION1 and 
> /tmp/INFA_UNION2. We have attached the files which need to be copied to the 
> above locations after these directories are created.
>  
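
The scenario described above (one persisted DataFrame shared by two concurrently running pipelines) can also be sketched without the helpers from the attached asyncfactory.scala. The following is a minimal sketch, not the reporter's code: it assumes a spark-shell session in :paste mode where the `read1`, `read2`, `write1` and `write2` tables from the steps above already exist, and it uses plain Scala Futures as a stand-in for the `async`/`asBlock` helpers. The names `persisted`, `other`, `p1` and `p2` are illustrative only.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.storage.StorageLevel
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Print only the metric in question for each completed stage.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // taskMetrics may be absent, so guard against null.
    Option(info.taskMetrics).foreach { m =>
      println(s"stage ${info.stageId}: recordsRead=${m.inputMetrics.recordsRead}")
    }
  }
})

// Persisted source, shared by both pipelines.
val persisted = spark.table("default.read1").persist(StorageLevel.MEMORY_AND_DISK)
// Non-persisted source, used only in the UNION pipeline.
val other = spark.table("default.read2")

// Pipeline 1: write the persisted DataFrame to its target table.
val p1 = Future { persisted.write.mode("overwrite").insertInto("default.write1") }
// Pipeline 2: UNION persisted and non-persisted data, write to a second table.
val p2 = Future { persisted.union(other).write.mode("overwrite").insertInto("default.write2") }

// Wait for both pipelines to finish (the timeout is an arbitrary choice).
Await.result(Future.sequence(Seq(p1, p2)), 10.minutes)
```

Comparing the printed recordsRead values per stage against the row counts of input1.txt and input2.txt should show whether the rows of the persisted table are counted more than once. Whether this simplified variant triggers the same behavior as the original reproduction has not been verified here.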



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

