[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002762#comment-16002762 ]

Rick Moritz commented on SPARK-20489:
-------------------------------------

Okay, I had a look at time zones, and it appears my driver was in a different time zone than my executors. Having put all machines into UTC, I now get the expected result. It helped when I realized that {{unix_timestamp}} interprets date strings in the local time zone rather than always in UTC (thanks Jacek, https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-Expression-UnixTimestamp.html).

I wonder what could be done to avoid these rather bizarre failure situations. Especially in big data, where clusters could reasonably span several time zones, and drivers (in client mode, for example) may well sit "outside" the actual cluster, relying on local time zone settings seems quite fragile to me.

As an initial measure, I wonder how much effort it would take to make sure that, when dates are used, any discrepancy in time zone settings results in an exception being thrown. I would consider that a reasonable assertion, without too much overhead.

> Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20489
>                 URL: https://issues.apache.org/jira/browse/SPARK-20489
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2
>         Environment: yarn-client mode in Zeppelin, Cloudera Spark2-distribution
>            Reporter: Rick Moritz
>            Priority: Critical
>
> Running the following code (in Zeppelin or spark-shell), I get different results depending on whether I am using local[*] mode or yarn-client mode:
> {code:title=test case|borderStyle=solid}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import spark.implicits._
>
> val counter = 1 to 2
> val size = 1 to 3
> val sampleText = spark.createDataFrame(
>     sc.parallelize(size)
>       .map(Row(_)),
>     StructType(Array(StructField("id", IntegerType, nullable=false))
>     )
>   )
>   .withColumn("loadDTS", lit("2017-04-25T10:45:02.2"))
>
> val rddList = counter.map(
>   count => sampleText
>     .withColumn("loadDTS2",
>       date_format(date_add(col("loadDTS"), count), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
>     .drop(col("loadDTS"))
>     .withColumnRenamed("loadDTS2", "loadDTS")
>     .coalesce(4)
>     .rdd
> )
> val resultText = spark.createDataFrame(
>   spark.sparkContext.union(rddList),
>   sampleText.schema
> )
> val testGrouped = resultText.groupBy("id")
> val timestamps = testGrouped.agg(
>   max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as "timestamp"
> )
> val loadDateResult = resultText.join(timestamps, "id")
> val filteredresult = loadDateResult.filter(
>   $"timestamp" === unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
> filteredresult.count
> {code}
> The expected result, *3*, is what I obtain in local mode, but as soon as I run fully distributed, I get *0*. If I increase size to {{1 to 32000}}, I do get some results (depending on the size of counter), none of which make any sense.
> Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in non-distributed mode. The generated execution plan is the same in each case, as expected.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
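The fail-fast check proposed in Rick's comment above could look roughly like the following minimal Scala sketch. {{TimeZoneGuard}} and {{requireConsistentTimeZones}} are hypothetical names, not Spark API, and the sketch assumes the default time-zone IDs of the driver and the executors have already been collected as strings. Zones are compared with {{java.util.TimeZone.hasSameRules}}, which checks offset and daylight-saving rules, so two IDs that differ only in name are not reported as a mismatch.

```scala
import java.util.TimeZone

// Hypothetical helper (not part of Spark): fail fast if any executor's
// default time zone behaves differently from the driver's.
object TimeZoneGuard {
  def requireConsistentTimeZones(driverTzId: String, executorTzIds: Iterable[String]): Unit = {
    val driverTz = TimeZone.getTimeZone(driverTzId)
    // Compare offset and DST rules rather than raw IDs.
    // Caveat: TimeZone.getTimeZone silently falls back to GMT for unknown IDs.
    val mismatched = executorTzIds.toSeq.distinct
      .filterNot(id => TimeZone.getTimeZone(id).hasSameRules(driverTz))
    if (mismatched.nonEmpty) {
      throw new IllegalStateException(
        s"Driver time zone $driverTzId differs from executor time zone(s): " +
          mismatched.sorted.mkString(", "))
    }
  }

  def main(args: Array[String]): Unit = {
    // Passes: every executor reports the driver's zone.
    requireConsistentTimeZones("UTC", Seq("UTC", "UTC"))
    // Fails: one executor runs with a different offset.
    val result = scala.util.Try(requireConsistentTimeZones("UTC", Seq("UTC", "America/Chicago")))
    println(result.isFailure)
  }
}
```

In a real job, the executor zones would still have to be gathered, for instance with a small job such as {{sc.parallelize(1 to 100, 100).map(_ => java.util.TimeZone.getDefault.getID).distinct().collect()}}. That only samples the executors that happen to run those tasks, so it is a heuristic rather than a guarantee.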
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998228#comment-15998228 ]

Rick Moritz commented on SPARK-20489:
-------------------------------------

If someone could try and replicate my observations, I think that would be a great bit of help - the above code should run as-is.
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988400#comment-15988400 ]

Rick Moritz commented on SPARK-20489:
-------------------------------------

The time zone explanation would make sense if there were consistently no results, but with more data there are *some* results, more or less determined by the number of distinct timestamps in the data. This leads me to believe that [~frosner] may be on to something. Also note that the timestamp/date matching happens within the same cluster: at most the master machine had its time zone changed, and the executors are all in the same TZ.

Anyway, here is the output:

+---+-----------------------+----------+
|id |loadDTS                |timestamp |
+---+-----------------------+----------+
|1  |2017-04-27T00:00:00.000|1493269200|
|1  |2017-04-26T00:00:00.000|1493269200|
|3  |2017-04-26T00:00:00.000|1493269200|
|3  |2017-04-27T00:00:00.000|1493269200|
|2  |2017-04-27T00:00:00.000|1493269200|
|2  |2017-04-26T00:00:00.000|1493269200|
+---+-----------------------+----------+
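The {{timestamp}} values in the output above can be decoded with plain JDK classes. The minimal Scala sketch below ({{TimestampOffsetCheck}} and {{epochSeconds}} are illustrative names) parses the same zone-less string with an explicit zone instead of the JVM default. Read as UTC, {{2017-04-27T00:00:00.000}} is 1493251200; read at a fixed UTC-5 offset it is 1493269200, the value shown in the table. The thread does not name the zone the cluster used; UTC-5 is inferred only from this arithmetic.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object TimestampOffsetCheck {
  // Parse a zone-less timestamp string with an explicit time zone,
  // instead of relying on the JVM default as a bare SimpleDateFormat would.
  def epochSeconds(s: String, zoneId: String): Long = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
    fmt.setTimeZone(TimeZone.getTimeZone(zoneId))
    fmt.parse(s).getTime / 1000L
  }

  def main(args: Array[String]): Unit = {
    val s = "2017-04-27T00:00:00.000"
    val asUtc = epochSeconds(s, "UTC")
    val asUtcMinus5 = epochSeconds(s, "GMT-05:00")
    // Same string, two different instants: the difference is the zone offset.
    println(s"UTC: $asUtc, GMT-05:00: $asUtcMinus5, difference: ${asUtcMinus5 - asUtc} s")
  }
}
```

Running {{main}} prints {{UTC: 1493251200, GMT-05:00: 1493269200, difference: 18000 s}}. The same string maps to instants five hours apart, which would be enough to make an equality filter on {{unix_timestamp}} values miss if the two sides were computed under different default zones.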
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988350#comment-15988350 ]

Frank Rosner commented on SPARK-20489:
--------------------------------------

In https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function called from multiple threads (https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591)? If so, the results are completely non-deterministic and possibly wrong: https://twitter.com/FRosnerd/status/856994959425236992

The reason is that {{SimpleDateFormat}} is mutable and not synchronized, so it is not thread-safe. Could that be related?
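If a shared {{SimpleDateFormat}} is indeed the problem, a common workaround is to give each thread its own instance. The minimal Scala sketch below is not Spark's code; {{SafeParsing}} and {{toEpochSeconds}} are illustrative names. It keeps one formatter per thread in a {{java.lang.ThreadLocal}}, pins it to UTC (an assumption made so the result does not depend on the JVM default zone), and parses concurrently from a thread pool.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object SafeParsing {
  // SimpleDateFormat keeps mutable internal state (its Calendar), so instead of
  // sharing one instance, each thread lazily gets its own copy.
  private val formatter: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] {
    override protected def initialValue(): SimpleDateFormat = {
      val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
      fmt.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: inputs are UTC
      fmt
    }
  }

  def toEpochSeconds(s: String): Long = formatter.get().parse(s).getTime / 1000L

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(8)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val inputs = Vector("2017-04-26T00:00:00.000", "2017-04-27T00:00:00.000")
    // Parse each of the two inputs 1000 times, concurrently.
    val futures = (0 until 2000).map(i => Future(toEpochSeconds(inputs(i % 2))))
    val results = Await.result(Future.sequence(futures), 30.seconds)
    pool.shutdown()
    // With per-thread formatters, only the two correct values should appear.
    println(results.distinct.sorted)
  }
}
```

Synchronizing on a single shared instance would also be correct but serializes all parsing; an immutable {{java.time.format.DateTimeFormatter}} is another option that needs no per-thread copies.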
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15987848#comment-15987848 ]

Shixiong Zhu commented on SPARK-20489:
--------------------------------------

Could you show the results of `loadDateResult.show(false)`? My hunch is it's a time zone issue.