[ 
https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26974:
---------------------------------
    Priority: Major  (was: Blocker)

> Invalid data in grouped cached dataset, formed by joining a large cached 
> dataset with a small dataset
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26974
>                 URL: https://issues.apache.org/jira/browse/SPARK-26974
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API, Spark Core, SQL
>    Affects Versions: 2.2.0
>            Reporter: Utkarsh Sharma
>            Priority: Major
>              Labels: caching, data-corruption, data-integrity
>
> The initial datasets are derived from Hive tables using the spark.table() 
> function.
> Dataset descriptions:
> *+Sales+* dataset (close to 10 billion rows) with the following columns (and 
> sample rows):
> ||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
> |1|1|20|
> |1|2|30|
> |2|1|40|
>  
> +*Customer*+ Dataset (close to 50000 rows) with the following columns (and 
> sample rows):
> ||CustomerId (bigint)||CustomerGrpNbr (smallint)||
> |1|1|
> |2|2|
> |3|1|
>  
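> For reference, the two sample tables above could be recreated as toy DataFrames 
> in spark-shell (illustrative data and names only; the real 'sales' table has 
> close to 10 billion rows, so this tiny sample is not expected to trigger the 
> issue by itself):
> {code:java}
> // Hypothetical toy versions of the two tables, built from the sample rows above.
> import spark.implicits._
> val salesSample = Seq((1L, 1L, 20L), (1L, 2L, 30L), (2L, 1L, 40L))
>   .toDF("ItemId", "CustomerId", "qty_sold")
> val customerSample = Seq((1L, 1.toShort), (2L, 2.toShort), (3L, 1.toShort))
>   .toDF("CustomerId", "CustomerGrpNbr")
> {code}
> 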
> I am doing the following steps:
>  # Caching the 'sales' dataset with close to 10 billion rows.
>  # Doing an inner join of 'sales' with the 'customer' dataset.
>  # Doing a group by on the resultant dataset, based on the CustomerGrpNbr column, to get sum(qty_sold) and stddev(qty_sold) values within the customer groups.
>  # Caching the resultant grouped dataset.
>  # Doing a .count() on the grouped dataset.
> The count in step 5 is supposed to return only 20, because 
> customer.select("CustomerGrpNbr").distinct().count() returns 20 values. 
> However, step 5 returns a value of around 65,000 instead.
> Following are the commands I am running in spark-shell:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, "CustomerId")
>   .groupBy("CustomerGrpNbr")
>   .agg(sum("qty_sold"), stddev("qty_sold"))
> sales.cache()
> finalDf.cache()
> finalDf.count() // returns around 65k and the count keeps varying on each run
> customer.select("CustomerGrpNbr").distinct().count() // returns 20
> {code}
> I have been able to replicate the same behavior using the Java API as well. 
> This anomalous behavior disappears, however, when I remove the caching 
> statements; i.e., if I run the following in spark-shell, it works as expected:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, "CustomerId")
>   .groupBy("CustomerGrpNbr")
>   .agg(sum("qty_sold"), stddev("qty_sold"))
> finalDf.count() // returns 20
> customer.select("CustomerGrpNbr").distinct().count() // returns 20
> {code}
> The Hive tables from which the datasets are built do not change during this 
> entire process, so why does the caching cause this problem?
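> For anyone without access to the underlying Hive tables, the same pipeline can 
> also be sketched end-to-end against the toy salesSample/customerSample 
> DataFrames above (names are illustrative; at this tiny scale the problem may 
> well not surface, since the report involves roughly 10 billion cached rows):
> {code:java}
> import org.apache.spark.sql.functions.{sum, stddev}
> salesSample.cache()
> val groupedSample = salesSample.join(customerSample, "CustomerId")
>   .groupBy("CustomerGrpNbr")
>   .agg(sum("qty_sold"), stddev("qty_sold"))
> groupedSample.cache()
> groupedSample.count() // expected: one row per distinct CustomerGrpNbr in the join (2 here)
> customerSample.select("CustomerGrpNbr").distinct().count() // 2
> {code}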



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
