[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-05-09 Thread Rick Moritz (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002762#comment-16002762
 ] 

Rick Moritz commented on SPARK-20489:
-

Okay, I had a look at timezones, and it appears my driver was in a different 
timezone than my executors. Having put all machines into UTC, I now get the 
expected result.
It helped when I realized that unix_timestamp does not always interpret its 
input as UTC; the string is parsed in the JVM's local timezone (thanks Jacek, 
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-Expression-UnixTimestamp.html).
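
To make the effect concrete, here is a minimal sketch in plain Scala, without Spark, that parses the same timestamp string under two timezones. The helper name `epochSeconds` and the zone `GMT-05:00` are assumptions chosen for illustration; they are not taken from the cluster in question.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Hypothetical helper: parse a timestamp string to epoch seconds,
// interpreting the wall-clock time in the given zone.
def epochSeconds(text: String, zoneId: String): Long = {
  val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
  format.setTimeZone(TimeZone.getTimeZone(zoneId))
  format.parse(text).getTime / 1000L
}

val text = "2017-04-27T00:00:00.000"
val inUtc = epochSeconds(text, "UTC")           // 1493251200
val behindUtc = epochSeconds(text, "GMT-05:00") // 1493269200
println(s"UTC: $inUtc, GMT-05:00: $behindUtc, difference: ${behindUtc - inUtc} s")
```

The same string yields values 18000 seconds apart, so a comparison between a value computed on the driver and one computed on an executor can fail even though both sides see identical input. One possible way to pin the zone is to pass `-Duser.timezone=UTC` to both JVMs, for instance through `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions`; whether that suits a given deployment needs to be verified.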

I wonder what could be done to avoid these quite bizarre error situations. 
In big data especially, clusters could in theory reasonably span several 
timezones, and drivers in particular (in client mode, for example) could be 
well "outside" the actual cluster, so relying on the local timezone settings 
appears to me to be quite fragile.

As an initial measure, I wonder how much effort it would take to make sure 
that, whenever dates are used, any discrepancy in timezone settings results in 
an exception being thrown. I would consider that a reasonable assertion, with 
not too much overhead.
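
A rough sketch of what such an assertion could look like follows. The helper `assertSameTimeZone` is hypothetical and not part of Spark. The check itself is plain Scala; how the executor zones are gathered is shown only as a comment and is an assumption about how one might do it.

```scala
import java.util.TimeZone

// Hypothetical helper: fail fast when any executor reports a zone ID
// that differs from the driver's.
def assertSameTimeZone(driverZone: String, executorZones: Seq[String]): Unit = {
  val mismatched = executorZones.distinct.filter(_ != driverZone)
  if (mismatched.nonEmpty) {
    throw new IllegalStateException(
      s"Timezone mismatch: driver=$driverZone, executors=${mismatched.mkString(", ")}")
  }
}

val driverZone = TimeZone.getDefault.getID
// On a cluster, the executor zones could be collected with something like:
//   sc.parallelize(1 to 100, 100)
//     .map(_ => java.util.TimeZone.getDefault.getID).distinct.collect()
// Here the driver zone stands in, so the sketch runs without Spark.
val executorZones = Seq(driverZone)
assertSameTimeZone(driverZone, executorZones)
```

Comparing zone IDs is deliberately crude: two IDs such as `UTC` and `Etc/UTC` describe the same offsets but would still be reported as a mismatch. Comparing offsets at the relevant instants would be more lenient, at the cost of more code.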

> Different results in local mode and yarn mode when working with dates (race 
> condition with SimpleDateFormat?)
> -
>
> Key: SPARK-20489
> URL: https://issues.apache.org/jira/browse/SPARK-20489
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.0.1, 2.0.2
> Environment: yarn-client mode in Zeppelin, Cloudera Spark2-distribution
> Reporter: Rick Moritz
> Priority: Critical
>
> Running the following code (in Zeppelin or spark-shell), I get different 
> results depending on whether I am using local[*] mode or yarn-client mode:
> {code:title=test case|borderStyle=solid}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import spark.implicits._
>
> val counter = 1 to 2
> val size = 1 to 3
> // Three rows (id = 1..3), each carrying the same load timestamp string.
> val sampleText = spark.createDataFrame(
>   sc.parallelize(size).map(Row(_)),
>   StructType(Array(StructField("id", IntegerType, nullable = false)))
> ).withColumn("loadDTS", lit("2017-04-25T10:45:02.2"))
>
> // For each offset, shift the date by `count` days and format it back to a string.
> val rddList = counter.map(
>   count => sampleText
>     .withColumn("loadDTS2",
>       date_format(date_add(col("loadDTS"), count), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
>     .drop(col("loadDTS"))
>     .withColumnRenamed("loadDTS2", "loadDTS")
>     .coalesce(4)
>     .rdd
> )
> val resultText = spark.createDataFrame(
>   spark.sparkContext.union(rddList),
>   sampleText.schema
> )
> // Latest timestamp per id, in seconds since the epoch.
> val testGrouped = resultText.groupBy("id")
> val timestamps = testGrouped.agg(
>   max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as "timestamp"
> )
> val loadDateResult = resultText.join(timestamps, "id")
> // Keep only the rows whose own timestamp equals the per-id maximum.
> val filteredresult = loadDateResult.filter($"timestamp" ===
>   unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
> filteredresult.count
> {code}
> The expected result, *3*, is what I obtain in local mode, but as soon as I run 
> fully distributed, I get *0*. If I increase size to {{1 to 32000}}, I do get 
> some results (depending on the size of counter), none of which make any 
> sense.
> Up to the application of the last filter, everything looks okay at first 
> glance, but then something goes wrong. Potentially this is due to lingering 
> re-use of SimpleDateFormats, but I can't get it to happen in 
> non-distributed mode. The generated execution plan is the same in each case, 
> as expected.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-05-05 Thread Rick Moritz (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998228#comment-15998228
 ] 

Rick Moritz commented on SPARK-20489:
-

If someone could try to replicate my observations, I think that would be a 
great help. The above code should run as-is.




[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-04-28 Thread Rick Moritz (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988400#comment-15988400
 ] 

Rick Moritz commented on SPARK-20489:
-

The timezone explanation would make sense if there were consistently no 
results, but with more data there are *some* results, more or less determined 
by the number of different timestamps in the data. This leads me to believe 
that [~frosner] may be on to something.

Also note that the timestamp/date matching is done within the same cluster, 
and at most the master machine had its timezone adapted; the executors are all 
in the same TZ.

Anyway, here is the output:
+---+-----------------------+----------+
|id |loadDTS                |timestamp |
+---+-----------------------+----------+
|1  |2017-04-27T00:00:00.000|1493269200|
|1  |2017-04-26T00:00:00.000|1493269200|
|3  |2017-04-26T00:00:00.000|1493269200|
|3  |2017-04-27T00:00:00.000|1493269200|
|2  |2017-04-27T00:00:00.000|1493269200|
|2  |2017-04-26T00:00:00.000|1493269200|
+---+-----------------------+----------+
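
As a sanity check on the output above, the following sketch decodes the reported epoch value with `java.time` (which the test case itself does not use) and compares it with the largest loadDTS string read as UTC. It assumes nothing beyond the two values printed in the table.

```scala
import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}

// The largest loadDTS value and the epoch seconds reported in the table.
val maxLoadDts = LocalDateTime.parse("2017-04-27T00:00:00.000")
val reported = Instant.ofEpochSecond(1493269200L)

// The same wall-clock time, read as if it were UTC.
val asUtc = maxLoadDts.toInstant(ZoneOffset.UTC)
val offset = Duration.between(asUtc, reported)

println(s"reported instant: $reported")            // 2017-04-27T05:00:00Z
println(s"offset from UTC reading: ${offset.toHours} h") // 5 h
```

The reported value is five hours later than midnight UTC, which is what one would expect if the string had been parsed as local midnight in a zone five hours behind UTC. That is consistent with a timezone effect somewhere in the pipeline, though by itself it does not show which machine applied which zone.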




[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-04-28 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988350#comment-15988350
 ] 

Frank Rosner commented on SPARK-20489:
--

In 
https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569
 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function being called from 
multiple threads? See 
https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591

If so, the results are completely non-deterministic and possibly wrong: 
https://twitter.com/FRosnerd/status/856994959425236992

The reason is that {{SimpleDateFormat}} is mutable and not synchronized. Could 
that be related?
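
For reference, a common way to use {{SimpleDateFormat}} safely from several threads is to give each thread its own instance. The sketch below does that with a `ThreadLocal`. The names `SafeParsing`, `parseSeconds` and `parseConcurrently` are hypothetical, and this is not a claim about how Spark handles (or should handle) the formatter.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone
import java.util.concurrent.{Callable, Executors}

object SafeParsing {
  // One formatter per thread, so the formatter's mutable state is never shared.
  private val threadLocalFormat = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat = {
      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
      format.setTimeZone(TimeZone.getTimeZone("UTC")) // pinned for reproducibility
      format
    }
  }

  def parseSeconds(text: String): Long =
    threadLocalFormat.get().parse(text).getTime / 1000L

  // Parse the same string `tasks` times on a pool of `threads` worker threads.
  def parseConcurrently(text: String, tasks: Int, threads: Int): Seq[Long] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to tasks).map { _ =>
        pool.submit(new Callable[Long] {
          override def call(): Long = parseSeconds(text)
        })
      }
      futures.map(_.get())
    } finally {
      pool.shutdown()
    }
  }
}

val results = SafeParsing.parseConcurrently("2017-04-27T00:00:00.000", 1000, 8)
println(results.distinct) // every task yields the same value
```

With a single shared instance instead of the `ThreadLocal`, the same loop may intermittently produce wrong values or throw, depending on timing. On Java 8 and later, `java.time.format.DateTimeFormatter` is immutable and thread-safe, and avoids the problem without per-thread state.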




[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-04-27 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15987848#comment-15987848
 ] 

Shixiong Zhu commented on SPARK-20489:
--

Could you show the results of `loadDateResult.show(false)`? My hunch is it's a 
time zone issue.
