[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988350#comment-15988350 ]
Frank Rosner commented on SPARK-20489: -------------------------------------- In https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function being called multi-threaded: https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591? If so, the results are completely non-deterministic and possibly wrong: https://twitter.com/FRosnerd/status/856994959425236992 Reason is that {{SimpleDateFormat}} is not mutable and not synchronized. Could that be related? > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > ------------------------------------------------------------------------------------------------------------- > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution > Reporter: Rick Moritz > Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"yyyy-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org