[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rick Moritz updated SPARK-20489:
--------------------------------
    Description:
Running the following code (in Zeppelin, but I assume spark-shell would behave the same), I get different results depending on whether I run in local[*] mode or yarn-client mode:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

val counter = 1 to 2
val size = 1 to 3
val sampleText = spark.createDataFrame(
    sc.parallelize(size)
      .map(Row(_)),
    StructType(Array(StructField("id", IntegerType, nullable = false)))
  )
  .withColumn("loadDTS", lit("2017-04-25T10:45:02.2"))

val rddList = counter.map(
  count => sampleText
    .withColumn("loadDTS2",
      date_format(date_add(col("loadDTS"), count), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
    .drop(col("loadDTS"))
    .withColumnRenamed("loadDTS2", "loadDTS")
    .coalesce(4)
    .rdd
)
val resultText = spark.createDataFrame(
  spark.sparkContext.union(rddList),
  sampleText.schema
)
val testGrouped = resultText.groupBy("id")
val timestamps = testGrouped.agg(
  max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as "timestamp"
)
val loadDateResult = resultText.join(timestamps, "id")
val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
filteredresult.count

The expected result, 3, is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If I increase size to 1 to 32000, I do get some results (depending on the size of counter), none of which make any sense. Up to the application of the last filter, everything looks okay at first glance, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormat instances, but I can't reproduce the problem in non-distributed mode. The generated execution plan is the same in each case, as expected.
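The suspected mechanism can be demonstrated outside Spark: java.text.SimpleDateFormat keeps mutable internal parse state, so a single instance shared across threads can silently return corrupted timestamps or throw mid-parse. Below is a minimal standalone sketch of that race (the object and method names are hypothetical, not taken from Spark's code base):

import java.text.SimpleDateFormat

object SdfRaceDemo {
  // Shared formatter: sharing one instance across threads is the bug being illustrated.
  val shared = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")

  def worker(input: String): Thread = new Thread(new Runnable {
    def run(): Unit = {
      // A fresh, thread-confined instance gives the reference value.
      val expected = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").parse(input).getTime
      for (_ <- 1 to 100000) {
        try {
          val got = shared.parse(input).getTime
          if (got != expected) println(s"corrupted parse: $input -> $got, expected $expected")
        } catch {
          // The race can also surface as exceptions thrown mid-parse.
          case e: Exception => println(s"parse failed: ${e.getClass.getSimpleName}")
        }
      }
    }
  })

  def main(args: Array[String]): Unit = {
    val threads = Seq("2017-04-26T10:45:02.200", "2017-04-27T10:45:02.200").map(worker)
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}

Run with two or more threads, this typically prints corrupted values or parse exceptions within a few iterations; the usual mitigation is one formatter per thread (e.g. a ThreadLocal).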
> Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20489
>                 URL: https://issues.apache.org/jira/browse/SPARK-20489
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core, SQL
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2
>         Environment: yarn-client mode in Zeppelin
>            Reporter: Rick Moritz
>            Priority: Critical