[ https://issues.apache.org/jira/browse/SPARK-26683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-26683:
----------------------------------
    Fix Version/s:     (was: 3.0.0)

> Incorrect value of "internal.metrics.input.recordsRead" when reading from
> temp hive table backed by HDFS file
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26683
>                 URL: https://issues.apache.org/jira/browse/SPARK-26683
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: Amar Agrawal
>            Priority: Major
>         Attachments: asyncfactory.scala, input1.txt, input2.txt
>
> *Issue description*
> When a persisted DataFrame is used in two concurrent threads, we get an
> incorrect value of *internal.metrics.input.recordsRead* in the
> SparkListenerStageCompleted event.
>
> *Issue Details*
> The Spark code we have written reads from two source temp hive tables. The
> DataFrame read from the first temp table is persisted; the DataFrame read
> from the second is not. We then run two pipelines asynchronously. The first
> pipeline writes the persisted DataFrame to a hive target table. The second
> pipeline takes the UNION of the persisted DataFrame and the non-persisted
> DataFrame and writes the result to a separate hive table.
> Since the first DataFrame is persisted, we expect its recordsRead metric to
> be computed exactly once. Instead, we see an inflated value of the metric.
> Example: if the persisted DataFrame has 2 rows, the metric consistently
> reports 3 rows.
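>
> For reference, a minimal sketch of a listener that surfaces only this
> accumulator per completed stage (illustrative only; the class name
> RecordsReadCheck is hypothetical and is not the attached InfaListener):
>
> {code:java}
> import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
>
> // Illustrative sketch: print the suspect accumulator for every completed stage.
> class RecordsReadCheck extends SparkListener {
>   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
>     stageCompleted.stageInfo.accumulables.values
>       .filter(_.name.contains("internal.metrics.input.recordsRead"))
>       .foreach(acc => println(s"stage ${stageCompleted.stageInfo.stageId}: " +
>         s"recordsRead = ${acc.value.getOrElse("n/a")}"))
>   }
> }
>
> // Register in spark-shell:
> // spark.sparkContext.addSparkListener(new RecordsReadCheck)
> {code}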
>
> *Steps to reproduce Issue:*
> # Create directory /tmp/INFA_UNION1 and copy input1.txt to this directory.
> # Create directory /tmp/INFA_UNION2 and copy input2.txt to this directory.
> # Run the following in spark-shell:
>
> scala> :load asyncfactory.scala
> scala> :paste -raw
>
> {code:java}
> package org.apache.spark
>
> import org.apache.spark.scheduler._
> import org.apache.spark.util.JsonProtocol
> import org.json4s.jackson.JsonMethods._
>
> // Listener that dumps every stage submitted/completed event as JSON,
> // including the internal.metrics.* accumulators.
> class InfaListener(mode: String = "ACCUMULATOR") extends SparkListener {
>   def onEvent(event: SparkListenerEvent): Unit = {
>     val jv = JsonProtocol.sparkEventToJson(event)
>     println(compact(jv))
>   }
>   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { onEvent(stageCompleted) }
>   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { onEvent(stageSubmitted) }
> }
> {code}
>
> scala> :paste
> {code:java}
> // InfaDataFrame, itoDF, async, asBlock and stop are expected to be provided
> // by the attached asyncfactory.scala loaded above. The imports below for
> // DataFrame, lit and MEMORY_AND_DISK may already be pulled in by that file.
> import org.apache.spark.InfaListener
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.lit
> import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
>
> implicit def df2idf(d: DataFrame): InfaDataFrame = new InfaDataFrame(d)
> val sqlc = spark.sqlContext
> val sc = spark.sparkContext
> val lis = new InfaListener("TAG")
> sc.addSparkListener(lis)
> sqlc.sql("DROP TABLE IF EXISTS `default`.`read1`")
> sqlc.sql("CREATE TABLE `default`.`read1` (`col0` STRING) LOCATION '/tmp/INFA_UNION1'")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`read2`")
> sqlc.sql("CREATE TABLE `default`.`read2` (`col0` STRING) LOCATION '/tmp/INFA_UNION2'")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`write1`")
> sqlc.sql("CREATE TABLE `default`.`write1` (`col0` STRING)")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`write2`")
> sqlc.sql("CREATE TABLE `default`.`write2` (`col0` STRING)")
>
> // Persisted source DataFrame (read1) and the two async pipelines:
> // the first writes the UNION of read1 and read2 to write1,
> // the second writes the persisted DataFrame alone to write2.
> val v0 = sqlc.sql("SELECT `read1`.`col0` as a0 FROM `default`.`read1`").itoDF.persist(MEMORY_AND_DISK).where(lit(true))
> async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write1` SELECT tbl0.c0 as a0 FROM tbl0"), v0.unionAll(sqlc.sql("SELECT `read2`.`col0` as a0 FROM `default`.`read2`").itoDF).itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl0")))
> async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write2` SELECT tbl1.c0 as a0 FROM tbl1"), v0.itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl1")))
> stop
> {code}
>
> *NOTE* - The above code refers to 2 file directories, /tmp/INFA_UNION1 and
> /tmp/INFA_UNION2. We have attached the files which need to be copied to
> these locations after the directories are created.
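>
> The helpers async, asBlock and stop come from the attached asyncfactory.scala
> and are not reproduced in this issue body. For readers without the
> attachment, the gist of the two concurrent writes can be approximated with
> plain Scala Futures (a hypothetical simplification, not the attachment's
> code; it drops the itoDF renames, so the projected column stays a0, and it
> reuses v0 and sqlc from the snippet above):
>
> {code:java}
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
>
> // Hypothetical stand-in for async/asBlock/stop: run each write pipeline on
> // its own thread and wait for both before exiting.
> val write1 = Future {
>   v0.unionAll(sqlc.sql("SELECT `read2`.`col0` as a0 FROM `default`.`read2`"))
>     .createOrReplaceTempView("tbl0")
>   sqlc.sql("INSERT OVERWRITE TABLE `default`.`write1` SELECT tbl0.a0 FROM tbl0")
> }
> val write2 = Future {
>   v0.createOrReplaceTempView("tbl1")
>   sqlc.sql("INSERT OVERWRITE TABLE `default`.`write2` SELECT tbl1.a0 FROM tbl1")
> }
> Await.result(Future.sequence(Seq(write1, write2)), Duration.Inf)
> {code}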