[ https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-26974:
---------------------------------
    Priority: Major  (was: Blocker)

> Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26974
>                 URL: https://issues.apache.org/jira/browse/SPARK-26974
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API, Spark Core, SQL
>    Affects Versions: 2.2.0
>            Reporter: Utkarsh Sharma
>            Priority: Major
>              Labels: caching, data-corruption, data-integrity
>
> The initial datasets are derived from hive tables using the spark.table() function.
>
> Dataset descriptions:
>
> *+Sales+* dataset (close to 10 billion rows) with the following columns (and sample rows):
> ||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
> |1|1|20|
> |1|2|30|
> |2|1|40|
>
> +*Customer*+ dataset (close to 50,000 rows) with the following columns (and sample rows):
> ||CustomerId (bigint)||CustomerGrpNbr (smallint)||
> |1|1|
> |2|2|
> |3|1|
>
> I am doing the following steps:
> # Caching the 'sales' dataset (close to 10 billion rows).
> # Doing an inner join of 'sales' with the 'customer' dataset.
> # Doing a group by on the resulting dataset, on the CustomerGrpNbr column, to get sum(qty_sold) and stddev(qty_sold) values per customer group.
> # Caching the resulting grouped dataset.
> # Doing a .count() on the grouped dataset.
>
> The count in step 5 is supposed to return only 20, because customer.select("CustomerGrpNbr").distinct().count() returns 20. However, step 5 returns a value of around 65,000.
>
> Following are the commands I am running in spark-shell:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
> sales.cache()
> finalDf.cache()
> finalDf.count() // returns around 65k rows, and the count keeps varying on each run
> customer.select("CustomerGrpNbr").distinct().count() // returns 20{code}
> I have been able to replicate the same behavior using the Java API as well. This anomalous behavior disappears, however, when I remove the caching statements, i.e. if I run the following in spark-shell, it works as expected:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
> finalDf.count() // returns 20
> customer.select("CustomerGrpNbr").distinct().count() // returns 20
> {code}
> The tables in hive from which the datasets are built do not change during this entire process. So why does the caching cause this problem?


--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
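Below is a minimal, self-contained spark-shell sketch of the five steps described in the report. The original hive tables (sales_table, customer_table) are not available here, so the synthetic stand-in data, row counts, and schemas are assumptions loosely based on the description; it only illustrates the sequence of operations and is not claimed to reproduce the corruption.

{code:scala}
// Sketch of the reported steps with synthetic stand-in data (assumed schemas),
// intended to be pasted into spark-shell, where `spark` is already defined.
import org.apache.spark.sql.functions.{col, stddev, sum}

// Stand-in for the 'sales' table: ItemId, CustomerId, qty_sold (all bigint).
val sales = spark.range(0L, 1000000L).select(
  (col("id") % 1000).as("ItemId"),
  (col("id") % 50000).as("CustomerId"),
  (col("id") % 100).as("qty_sold"))

// Stand-in for the 'customer' table: 50,000 customers spread over 20 groups.
val customer = spark.range(0L, 50000L).select(
  col("id").as("CustomerId"),
  (col("id") % 20).cast("smallint").as("CustomerGrpNbr"))

// Steps 1-5: cache the large dataset, join, group/aggregate, cache, count.
sales.cache()
val finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.cache()

// Both counts should be 20 (one row per customer group); the report says the
// first one returned roughly 65,000 and varied per run when caching was on.
println(finalDf.count())
println(customer.select("CustomerGrpNbr").distinct().count())
{code}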