[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Moritz updated SPARK-20489:
--------------------------------
    Description: 
Running the following code (in Zeppelin, or spark-shell), I get different 
results depending on whether I am using local[*] mode or yarn-client mode:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._  // needed for lit, col, date_add, date_format, unix_timestamp, max
import spark.implicits._

val counter = 1 to 2
val size = 1 to 3
val sampleText = spark.createDataFrame(
        sc.parallelize(size).map(Row(_)),
        StructType(Array(StructField("id", IntegerType, nullable = false)))
    )
    .withColumn("loadDTS", lit("2017-04-25T10:45:02.2"))
    
val rddList = counter.map(count =>
    sampleText
        .withColumn("loadDTS2",
            date_format(date_add(col("loadDTS"), count), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
        .drop(col("loadDTS"))
        .withColumnRenamed("loadDTS2", "loadDTS")
        .coalesce(4)
        .rdd
)
val resultText = spark.createDataFrame(
    spark.sparkContext.union(rddList),
    sampleText.schema
)
val testGrouped = resultText.groupBy("id")
val timestamps = testGrouped.agg(
    max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as "timestamp"
)
val loadDateResult = resultText.join(timestamps, "id")
val filteredresult = loadDateResult.filter(
    $"timestamp" === unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
filteredresult.count

The expected result, 3, is what I obtain in local mode, but as soon as I run 
fully distributed, I get 0. If I increase size to 1 to 32000, I do get some 
results (depending on the size of counter), none of which make any sense.
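
To see where the two modes diverge, a diagnostic along these lines can be 
appended before the count (not part of the original repro; the column name 
"parsed" is only illustrative):

    loadDateResult
        .withColumn("parsed",
            unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
        .select($"id", $"loadDTS", $"timestamp", $"parsed")
        .show(false)

In local mode, "parsed" should equal "timestamp" on every row; in yarn-client 
mode, any row where the two disagree for the same loadDTS string is a row the 
filter silently drops.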

Up to the application of the last filter, everything looks okay at first 
glance, but then something goes wrong. Potentially this is due to lingering 
re-use of SimpleDateFormat instances, but I can't get it to happen in 
non-distributed mode. The generated execution plan is the same in each case, 
as expected.
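
For context on that suspicion: java.text.SimpleDateFormat is documented as 
not thread-safe, because parse() mutates a shared internal Calendar. A 
minimal standalone sketch (plain Scala, independent of Spark) that usually 
exhibits the corruption when run in a shell:

    import java.text.SimpleDateFormat
    import java.util.concurrent.Executors

    // One formatter instance shared by eight threads: parse results are
    // frequently wrong, or throw NumberFormatException, because concurrent
    // calls corrupt the formatter's internal state.
    val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
    val pool = Executors.newFixedThreadPool(8)
    (1 to 1000).foreach { _ =>
        pool.submit(new Runnable {
            def run(): Unit =
                try println(fmt.parse("2017-04-25T10:45:02.200").getTime)
                catch { case e: Exception => println("parse failed: " + e) }
        })
    }
    pool.shutdown()

If something in the unix_timestamp/date_format code path shares one such 
instance across task threads, that would match the symptom: stable results 
when the work happens to run in a single thread, garbage once it is 
genuinely parallel.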


  was:
Running the following code (in Zeppelin, but I assume spark-shell would be the 
same), I get different results depending on whether I am using local[*] mode 
or yarn-client mode:

[the remainder of the previous description is identical to the updated 
description above]



> Different results in local mode and yarn mode when working with dates (race 
> condition with SimpleDateFormat?)
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20489
>                 URL: https://issues.apache.org/jira/browse/SPARK-20489
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core, SQL
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2
>         Environment: yarn-client mode in Zeppelin
>            Reporter: Rick Moritz
>            Priority: Critical
>



