[ https://issues.apache.org/jira/browse/SPARK-28125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-28125:
---------------------------------
    Labels:   (was: correctness data-corruption data-integrity data-loss)

> dataframes created by randomSplit have overlapping rows
> -------------------------------------------------------
>
>                 Key: SPARK-28125
>                 URL: https://issues.apache.org/jira/browse/SPARK-28125
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, MLlib, PySpark, Spark Core
>    Affects Versions: 2.4.3
>         Environment: Run with Databricks Runtime 5.3 ML (includes Apache Spark 2.4.0, Scala 2.11)
>
> More details on the environment:
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html]
>
> The Python package versions:
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html#python-libraries]
>            Reporter: Zachary
>            Priority: Blocker
>
> It appears that randomSplit on a DataFrame creates a separate execution plan for each of the resulting DataFrames, or at least that is the impression given by a few StackOverflow pages on it:
> [https://stackoverflow.com/questions/38379522/how-does-spark-keep-track-of-the-splits-in-randomsplit/38380023#38380023]
> [https://stackoverflow.com/questions/32933143/how-does-sparks-rdd-randomsplit-actually-split-the-rdd/32933366]
>
> Because each split re-evaluates its plan independently, it is easy to end up with DataFrames returned by randomSplit that have overlapping rows. If people rely on it to split a dataset into training and test sets, they can easily end up with the same rows in both, which is a serious problem when running model evaluation.
>
> I know that if you call .cache() on the DataFrame before calling .randomSplit, then you can be assured that the returned frames are disjoint, but this workaround is not obvious. I did not know about this issue and ended up creating improper data sets when doing model training and evaluation. Something should be adjusted in .randomSplit so that, under all circumstances, the returned DataFrames have disjoint rows.
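>
> For reference, a minimal sketch of the cache() workaround in isolation (illustrative only; it assumes a live SparkSession named spark, and the column names are made up):
>
> {code:python}
> from pyspark.sql.functions import rand
>
> # A non-deterministic column, similar in spirit to the UDF in the script below.
> df = spark.range(100000).withColumn("r", rand())
>
> # cache() + count() materializes the rows once, so both splits sample
> # from the same data instead of re-evaluating the random column.
> df = df.cache()
> df.count()
>
> train, test = df.randomSplit([0.85, 0.15], seed=42)
> {code}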
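>
> An alternative sketch (not from this report): derive the split from a seeded rand() column over a deterministic source, so that the assignment is stable across re-evaluations as long as the input partitioning does not change:
>
> {code:python}
> from pyspark.sql.functions import rand
>
> df = spark.range(100000)                    # deterministic source
> df = df.withColumn("split", rand(seed=42))  # seeded per partition
>
> test = df.where("split < 0.15").drop("split")
> train = df.where("split >= 0.15").drop("split")
> {code}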
>
> Here is a PySpark script I wrote that reproduces the issue; it includes the commented-out .cache() line that works around it:
>
> {code:python}
> import numpy as np
> from pyspark.sql import Row
> from pyspark.sql.functions import udf
> from pyspark.sql.types import IntegerType
>
> N = 100000
> ratio1 = 0.85
> ratio2 = 0.15
>
> # Non-deterministic UDF: produces a fresh random value on every evaluation,
> # which is what makes the two splits disagree when the plan is re-run.
> gen_rand = udf(lambda x: int(np.random.random()*50000 + 2), IntegerType())
>
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> dfA = dfA.select("ID2").distinct()
>
> dfA_els = dfA.rdd.map(lambda x: x['ID2']).collect()
> print("This confirms that if you look at the parent DataFrame, the ID2 col has unique values")
> print("Num rows parent DF: {}".format(len(dfA_els)))
> print("num unique ID2 vals: {}".format(len(set(dfA_els))))
>
> #dfA = dfA.cache()  # Uncommenting this line fixes the issue
> df1, df2 = dfA.randomSplit([ratio2, ratio1])
>
> df1_ids = set(df1.rdd.map(lambda x: x['ID2']).distinct().collect())
> df2_ids = set(df2.rdd.map(lambda x: x['ID2']).distinct().collect())
> num_inter = len(df1_ids.intersection(df2_ids))
> print()
> print("Number of common IDs between the two splits: {}".format(num_inter))
> print("(should be zero if randomSplit is working as expected)")
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org