[jira] [Comment Edited] (SPARK-20489) Different results in local mode and yarn mode when working with dates (silent corruption due to system timezone setting)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010304#comment-16010304 ] Rick Moritz edited comment on SPARK-20489 at 5/15/17 10:40 AM: --- I think what happens is the following: {{val timestamps = testGrouped.agg(max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp")}} travels across the driver and therefore the result is tainted by the bad system/default time zone, and won't be compatible with what the final filter calculates on each executor. was (Author: rpcmoritz): I think what happens is the following: {{val timestamps = testGrouped.agg(max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp")}} travels across the driver and therefore the result is tainted, and won't be compatible with what the final filter calculates on each executor. > Different results in local mode and yarn mode when working with dates (silent > corruption due to system timezone setting) > > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
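The driver-vs-executor discrepancy described in this comment can be reproduced outside Spark, since {{unix_timestamp}} in Spark 2.x parses through a {{SimpleDateFormat}} that falls back on the JVM default time zone. The sketch below is illustrative only: it assumes the intended pattern is {{yyyy-MM-dd'T'HH:mm:ss.SSS}} and uses arbitrary zone IDs to stand in for the driver's and executors' settings.
{code:title=illustration of the time-zone dependence|borderStyle=solid}
import java.text.SimpleDateFormat
import java.util.TimeZone

// Parse the same literal under two different time zones and compare the
// resulting epoch seconds -- the same divergence occurs when driver and
// executors run with different system time zones.
val literal = "2017-04-26T00:00:00.000"
val pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS"

def epochSeconds(zoneId: String): Long = {
  val sdf = new SimpleDateFormat(pattern)
  sdf.setTimeZone(TimeZone.getTimeZone(zoneId))
  sdf.parse(literal).getTime / 1000
}

println(epochSeconds("UTC"))          // 1493164800
println(epochSeconds("Europe/Paris")) // 1493157600, two hours earlier
{code}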
[jira] [Comment Edited] (SPARK-20489) Different results in local mode and yarn mode when working with dates (silent corruption due to system timezone setting)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010304#comment-16010304 ] Rick Moritz edited comment on SPARK-20489 at 5/15/17 10:39 AM: --- I think what happens is the following: {{val timestamps = testGrouped.agg(max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp")}} travels across the driver and therefore the result is tainted, and won't be compatible with what the final filter calculates on each executor. was (Author: rpcmoritz): I think what happens is the following: ```val timestamps = testGrouped.agg(max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp")``` travels across the driver and therefore the result is tainted, and won't be compatible with what the final filter calculates on each executor. > Different results in local mode and yarn mode when working with dates (silent > corruption due to system timezone setting) > > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (silent corruption due to system timezone setting)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010304#comment-16010304 ] Rick Moritz commented on SPARK-20489: - I think what happens is the following: ```val timestamps = testGrouped.agg(max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp")``` travels across the driver and therefore the result is tainted, and won't be compatible with what the final filter calculates on each executor. > Different results in local mode and yarn mode when working with dates (silent > corruption due to system timezone setting) > > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20489) Different results in local mode and yarn mode when working with dates (silent corruption due to system timezone setting)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20489: Summary: Different results in local mode and yarn mode when working with dates (silent corruption due to system timezone setting) (was: Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)) > Different results in local mode and yarn mode when working with dates (silent > corruption due to system timezone setting) > > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002762#comment-16002762 ] Rick Moritz commented on SPARK-20489: - Okay, I had a look at timezones, and it appears my Driver was in a different timezone than my executors. Having put all machines into UTC, I now get the expected result. It helped when I realized that unix timestamps aren't always in UTC, for some reason (thanks Jacek, https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-Expression-UnixTimestamp.html). I wonder what could be done to avoid these quite bizarre error situations. Especially in big data, where clusters could in theory reasonably span timezones, and where drivers (in client mode, for example) may sit well "outside" the actual cluster, relying on the local timezone settings strikes me as quite fragile. As an initial measure, I wonder how much effort it would take to make sure that, whenever dates are used, any discrepancy in timezone settings results in an exception being thrown. I would consider that a reasonable assertion, with not too much overhead. > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
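One way to approximate the assertion proposed in the comment above, at the application level rather than inside Spark itself, is a quick consistency check between the driver's default time zone and those seen on the executors. This is only a sketch for an interactive session (spark-shell or Zeppelin) where {{sc}} is in scope; it is not existing Spark behaviour, and the number of probe tasks is arbitrary.
{code:title=sketch of an application-level time-zone assertion|borderStyle=solid}
import java.util.TimeZone

// Collect the default time zone observed inside executor tasks and compare
// it with the driver's; fail fast instead of silently mis-parsing dates.
val driverTz = TimeZone.getDefault.getID
val executorTzs = sc
  .parallelize(1 to 100, sc.defaultParallelism)
  .map(_ => TimeZone.getDefault.getID)
  .distinct()
  .collect()

require(executorTzs.forall(_ == driverTz),
  s"Time-zone mismatch: driver=$driverTz, executors=${executorTzs.mkString(", ")}")
{code}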
[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998228#comment-15998228 ] Rick Moritz commented on SPARK-20489: - If someone could try and replicate my observations, I think that would be a great bit of help - the above code should run as-is. > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988400#comment-15988400 ] Rick Moritz edited comment on SPARK-20489 at 4/28/17 8:18 AM: -- The timezone thing would make sense, if there were consistently no results - but with more data, there are *some* results - more or less determined by the number of different timestamps in the data . This leads me to believe that [~frosner] may be on to something. Also note, that the timestamp/date matching is made in the same cluster, and at most the master machine had it's timezone adapted, the executors are all in the same TZ. Anyway, here is the output: {noformat} +---+---+--+ |id |loadDTS|timestamp | +---+---+--+ |1 |2017-04-27T00:00:00.000|1493269200| |1 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-26T00:00:00.000|1493269200| +---+---+--+ {noformat} was (Author: rpcmoritz): The timezone thing would make sense, if there were consistently no results - but with more data, there are *some* results - more or less determined by the number of different timestamps in the data . This leads me to believe that [~frosner] may be on to something. Also note, that the timestamp/date matching is made in the same cluster, and at most the master machine had it's timezone adapted, the executors are all in the same TZ. Anyway, here is the output: {{ +---+---+--+ |id |loadDTS|timestamp | +---+---+--+ |1 |2017-04-27T00:00:00.000|1493269200| |1 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-26T00:00:00.000|1493269200| +---+---+--+ }} > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) 
> - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988400#comment-15988400 ] Rick Moritz edited comment on SPARK-20489 at 4/28/17 8:17 AM: -- The timezone thing would make sense, if there were consistently no results - but with more data, there are *some* results - more or less determined by the number of different timestamps in the data . This leads me to believe that [~frosner] may be on to something. Also note, that the timestamp/date matching is made in the same cluster, and at most the master machine had it's timezone adapted, the executors are all in the same TZ. Anyway, here is the output: {{ +---+---+--+ |id |loadDTS|timestamp | +---+---+--+ |1 |2017-04-27T00:00:00.000|1493269200| |1 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-26T00:00:00.000|1493269200| +---+---+--+ }} was (Author: rpcmoritz): The timezone thing would make sense, if there were consistently no results - but with more data, there are *some* results - more or less determined by the number of different timestamps in the data . This leads me to believe that [~frosner] may be on to something. Also note, that the timestamp/date matching is made in the same cluster, and at most the master machine had it's timezone adapted, the executors are all in the same TZ. Anyway, here is the output: +---+---+--+ |id |loadDTS|timestamp | +---+---+--+ |1 |2017-04-27T00:00:00.000|1493269200| |1 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-26T00:00:00.000|1493269200| +---+---+--+ > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) 
> - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988400#comment-15988400 ] Rick Moritz commented on SPARK-20489: - The timezone thing would make sense, if there were consistently no results - but with more data, there are *some* results - more or less determined by the number of different timestamps in the data . This leads me to believe that [~frosner] may be on to something. Also note, that the timestamp/date matching is made in the same cluster, and at most the master machine had it's timezone adapted, the executors are all in the same TZ. Anyway, here is the output: +---+---+--+ |id |loadDTS|timestamp | +---+---+--+ |1 |2017-04-27T00:00:00.000|1493269200| |1 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-26T00:00:00.000|1493269200| |3 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-27T00:00:00.000|1493269200| |2 |2017-04-26T00:00:00.000|1493269200| +---+---+--+ > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > {code} > The expected result, *3* is what I obtain in local mode, but as soon as I run > fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get > some results (depending on the size of counter) - none of which makes any > sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
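For reference, decoding the single aggregated value {{1493269200}} from the output above makes the mismatch concrete: it corresponds to midnight of 2017-04-27 only in a UTC-5 zone, so an executor whose zone differs from the one that produced the aggregate parses "2017-04-27T00:00:00.000" to a different epoch and the equality filter matches nothing. The zone IDs below are purely illustrative, not the reporter's actual configuration.
{code:title=decoding the aggregated epoch under different zones|borderStyle=solid}
import java.time.{Instant, ZoneId}

// Render the same epoch second in a few zones; only one of them reads as
// local midnight on 2017-04-27.
val epoch = 1493269200L
Seq("UTC", "America/Chicago", "Europe/Paris").foreach { zone =>
  println(f"$zone%-16s ${Instant.ofEpochSecond(epoch).atZone(ZoneId.of(zone))}")
}
// UTC              2017-04-27T05:00Z[UTC]
// America/Chicago  2017-04-27T00:00-05:00[America/Chicago]
// Europe/Paris     2017-04-27T07:00+02:00[Europe/Paris]
{code}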
[jira] [Updated] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20489: Description: Running the following code (in Zeppelin, or spark-shell), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: {code:title=test case|borderStyle=solid} import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count {code} The expected result, *3* is what I obtain in local mode, but as soon as I run fully distributed, I get *0*. If Increase size to {{1 to 32000}}, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. was: Running the following code (in Zeppelin, or spark-shell), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: {code:title=test case|borderStyle=solid} import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count {code} The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. 
The generated execution plan is the same in each case, as expected. > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core, SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( >
[jira] [Updated] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20489: Description: Running the following code (in Zeppelin, or spark-shell), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: {code:title=test case|borderStyle=solid} import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count {code} The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. was: Running the following code (in Zeppelin, or spark-shell), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. 
> Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core, SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > {code:title=test case|borderStyle=solid} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), >
[jira] [Updated] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20489: Environment: yarn-client mode in Zeppelin, Cloudera Spark2-distribution (was: yarn-client mode in Zeppelin) > Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core, SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin, Cloudera > Spark2-distribution >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) > .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) > > val rddList = counter.map( > count => sampleText > .withColumn("loadDTS2", > date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) > .drop(col("loadDTS")) > .withColumnRenamed("loadDTS2","loadDTS") > .coalesce(4) > .rdd > ) > val resultText = spark.createDataFrame( > spark.sparkContext.union(rddList), > sampleText.schema > ) > val testGrouped = resultText.groupBy("id") > val timestamps = testGrouped.agg( > max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as > "timestamp" > ) > val loadDateResult = resultText.join(timestamps, "id") > val filteredresult = loadDateResult.filter($"timestamp" === > unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) > filteredresult.count > The expected result, 3 is what I obtain in local mode, but as soon as I run > fully distributed, I get 0. If Increase size to 1 to 32000, I do get some > results (depending on the size of counter) - none of which makes any sense. > Up to the application of the last filter, at first glance everything looks > okay, but then something goes wrong. Potentially this is due to lingering > re-use of SimpleDateFormats, but I can't get it to happen in a > non-distributed mode. The generated execution plan is the same in each case, > as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20489: Description: Running the following code (in Zeppelin, or spark-shell), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. was: Running the following code (in Zeppelin, but I assume spark-shell would be the same), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. 
> Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core, SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, or spark-shell), I get different > results, depending on whether I am using local[*] -mode or yarn-client mode: > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) >
[jira] [Updated] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20489: Description: Running the following code (in Zeppelin, but I assume spark-shell would be the same), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. was: Running the following code (in Zeppelin, but I assume spark-shell would be the same), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: import org.apache.spark.sql.Row import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. 
> Different results in local mode and yarn mode when working with dates (race > condition with SimpleDateFormat?) > - > > Key: SPARK-20489 > URL: https://issues.apache.org/jira/browse/SPARK-20489 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core, SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2 > Environment: yarn-client mode in Zeppelin >Reporter: Rick Moritz >Priority: Critical > > Running the following code (in Zeppelin, but I assume spark-shell would be > the same), I get different results, depending on whether I am using local[*] > -mode or yarn-client mode: > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > import spark.implicits._ > val counter = 1 to 2 > val size = 1 to 3 > val sampleText = spark.createDataFrame( > sc.parallelize(size) > .map(Row(_)), > StructType(Array(StructField("id", IntegerType, nullable=false)) > ) > ) >
[jira] [Created] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
Rick Moritz created SPARK-20489: --- Summary: Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?) Key: SPARK-20489 URL: https://issues.apache.org/jira/browse/SPARK-20489 Project: Spark Issue Type: Bug Components: Shuffle, Spark Core, SQL Affects Versions: 2.0.2, 2.0.1, 2.0.0 Environment: yarn-client mode in Zeppelin Reporter: Rick Moritz Priority: Critical Running the following code (in Zeppelin, but I assume spark-shell would be the same), I get different results, depending on whether I am using local[*] -mode or yarn-client mode: import org.apache.spark.sql.Row import spark.implicits._ val counter = 1 to 2 val size = 1 to 3 val sampleText = spark.createDataFrame( sc.parallelize(size) .map(Row(_)), StructType(Array(StructField("id", IntegerType, nullable=false)) ) ) .withColumn("loadDTS",lit("2017-04-25T10:45:02.2")) val rddList = counter.map( count => sampleText .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"-MM-dd'T'HH:mm:ss.SSS")) .drop(col("loadDTS")) .withColumnRenamed("loadDTS2","loadDTS") .coalesce(4) .rdd ) val resultText = spark.createDataFrame( spark.sparkContext.union(rddList), sampleText.schema ) val testGrouped = resultText.groupBy("id") val timestamps = testGrouped.agg( max(unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) as "timestamp" ) val loadDateResult = resultText.join(timestamps, "id") val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "-MM-dd'T'HH:mm:ss.SSS")) filteredresult.count The expected result, 3 is what I obtain in local mode, but as soon as I run fully distributed, I get 0. If Increase size to 1 to 32000, I do get some results (depending on the size of counter) - none of which makes any sense. Up to the application of the last filter, at first glance everything looks okay, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormats, but I can't get it to happen in a non-distributed mode. The generated execution plan is the same in each case, as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote
[ https://issues.apache.org/jira/browse/SPARK-20155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20155: Description: According to: https://tools.ietf.org/html/rfc4180#section-2 7. If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote. For example: "aaa","b""bb","ccc" This currently works as is, but the following does not: "aaa","b""b,b","ccc" while "aaa","b\"b,b","ccc" does get parsed. I assume this happens because quotes are currently being parsed in pairs, and that somehow ends up unquoting the delimiter. Edit: So future readers don't have to dive into the comments: A workaround (as of Spark 2.0) is to explicitly declare the escape character to be a double quote: (read.csv.option("escape","\"")) I argue that this should be the default setting (or at least the default setting should be compatible with the RFC). Related work on how to properly escape an escape character which ambiguously escapes a quote would be a good vector to pass this change. was: According to : https://tools.ietf.org/html/rfc4180#section-2 7. If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote. For example: "aaa","b""bb","ccc" This currently works as is, but the following does not: "aaa","b""b,b","ccc" while "aaa","b\"b,b","ccc" does get parsed. I assume, this happens because quotes are currently being parsed in pairs, and that somehow ends up unquoting delimiter. Edit: So future readers don't have to dive into the comments: A workaround (as of Spark 2.0) is to explicitely declare the escape character to be a double quote: (read.csv.option("escape","\"")) > CSV-files with quoted quotes can't be parsed, if delimiter follows quoted > quote > --- > > Key: SPARK-20155 > URL: https://issues.apache.org/jira/browse/SPARK-20155 > Project: Spark > Issue Type: Bug > Components: Input/Output, SQL >Affects Versions: 2.0.0 >Reporter: Rick Moritz > > According to: > https://tools.ietf.org/html/rfc4180#section-2 > 7. If double-quotes are used to enclose fields, then a double-quote >appearing inside a field must be escaped by preceding it with >another double quote. For example: >"aaa","b""bb","ccc" > This currently works as is, but the following does not: > "aaa","b""b,b","ccc" > while "aaa","b\"b,b","ccc" does get parsed. > I assume this happens because quotes are currently being parsed in pairs, > and that somehow ends up unquoting the delimiter. > Edit: So future readers don't have to dive into the comments: A workaround > (as of Spark 2.0) is to explicitly declare the escape character to be a > double quote: (read.csv.option("escape","\"")) > I argue that this should be the default setting (or at least the default > setting should be compatible with the RFC). Related work on how to properly > escape an escape character which ambiguously escapes a quote would be a good > vector to pass this change. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
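A minimal sketch of the workaround recorded in the description above, assuming a Spark 2.x session and a placeholder input path: declaring the double quote as the escape character lets RFC 4180-style fields such as {{"b""b,b"}} parse without splitting on the embedded comma.
{code:title=sketch of the escape-character workaround|borderStyle=solid}
// "quote" already defaults to the double quote; "escape" defaults to the
// backslash, so it is set explicitly here. The path is a placeholder.
val df = spark.read
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("/path/to/file.csv")

df.show(truncate = false)
{code}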
[jira] [Updated] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote
[ https://issues.apache.org/jira/browse/SPARK-20155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20155: Description: According to: https://tools.ietf.org/html/rfc4180#section-2 7. If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote. For example: "aaa","b""bb","ccc" This currently works as is, but the following does not: "aaa","b""b,b","ccc" while "aaa","b\"b,b","ccc" does get parsed. I assume this happens because quotes are currently being parsed in pairs, and that somehow ends up unquoting the delimiter. Edit: So future readers don't have to dive into the comments: A workaround (as of Spark 2.0) is to explicitly declare the escape character to be a double quote: (read.csv.option("escape","\"")) was: According to: https://tools.ietf.org/html/rfc4180#section-2 7. If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote. For example: "aaa","b""bb","ccc" This currently works as is, but the following does not: "aaa","b""b,b","ccc" while "aaa","b\"b,b","ccc" does get parsed. I assume this happens because quotes are currently being parsed in pairs, and that somehow ends up unquoting the delimiter. > CSV-files with quoted quotes can't be parsed, if delimiter follows quoted > quote > --- > > Key: SPARK-20155 > URL: https://issues.apache.org/jira/browse/SPARK-20155 > Project: Spark > Issue Type: Bug > Components: Input/Output, SQL >Affects Versions: 2.0.0 >Reporter: Rick Moritz > > According to: > https://tools.ietf.org/html/rfc4180#section-2 > 7. If double-quotes are used to enclose fields, then a double-quote >appearing inside a field must be escaped by preceding it with >another double quote. For example: >"aaa","b""bb","ccc" > This currently works as is, but the following does not: > "aaa","b""b,b","ccc" > while "aaa","b\"b,b","ccc" does get parsed. > I assume this happens because quotes are currently being parsed in pairs, > and that somehow ends up unquoting the delimiter. > Edit: So future readers don't have to dive into the comments: A workaround > (as of Spark 2.0) is to explicitly declare the escape character to be a > double quote: (read.csv.option("escape","\"")) -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote
[ https://issues.apache.org/jira/browse/SPARK-20155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981185#comment-15981185 ] Rick Moritz commented on SPARK-20155: - Good info, thanks. I've added a link. > CSV-files with quoted quotes can't be parsed, if delimiter follows quoted > quote > --- > > Key: SPARK-20155 > URL: https://issues.apache.org/jira/browse/SPARK-20155 > Project: Spark > Issue Type: Bug > Components: Input/Output, SQL >Affects Versions: 2.0.0 >Reporter: Rick Moritz > > According to: > https://tools.ietf.org/html/rfc4180#section-2 > 7. If double-quotes are used to enclose fields, then a double-quote >appearing inside a field must be escaped by preceding it with >another double quote. For example: >"aaa","b""bb","ccc" > This currently works as is, but the following does not: > "aaa","b""b,b","ccc" > while "aaa","b\"b,b","ccc" does get parsed. > I assume this happens because quotes are currently being parsed in pairs, > and that somehow ends up unquoting the delimiter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote
[ https://issues.apache.org/jira/browse/SPARK-20155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981068#comment-15981068 ] Rick Moritz commented on SPARK-20155: - Why shouldn't we change the default escape character, when there is a CSV RFC which only ever mentions one escape character - and it tricked both you and me? Sure, it may break an existing API, but that API already looks broken, given that shell-style escapes aren't part of the CSV RFC. Granted, the RFC is not a formal standard, but by diverging from it Spark is making CSV more fractured than it needs to be. The option of reading broken CSV files is fine by me, but the default ought to be as close to the RFC as possible. > CSV-files with quoted quotes can't be parsed, if delimiter follows quoted > quote > --- > > Key: SPARK-20155 > URL: https://issues.apache.org/jira/browse/SPARK-20155 > Project: Spark > Issue Type: Bug > Components: Input/Output, SQL >Affects Versions: 2.0.0 >Reporter: Rick Moritz > > According to: > https://tools.ietf.org/html/rfc4180#section-2 > 7. If double-quotes are used to enclose fields, then a double-quote >appearing inside a field must be escaped by preceding it with >another double quote. For example: >"aaa","b""bb","ccc" > This currently works as is, but the following does not: > "aaa","b""b,b","ccc" > while "aaa","b\"b,b","ccc" does get parsed. > I assume this happens because quotes are currently being parsed in pairs, > and that somehow ends up unquoting the delimiter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote
[ https://issues.apache.org/jira/browse/SPARK-20155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20155: Component/s: SQL > CSV-files with quoted quotes can't be parsed, if delimiter follows quoted > quote > --- > > Key: SPARK-20155 > URL: https://issues.apache.org/jira/browse/SPARK-20155 > Project: Spark > Issue Type: Bug > Components: Input/Output, SQL >Affects Versions: 2.0.0 >Reporter: Rick Moritz > > According to: > https://tools.ietf.org/html/rfc4180#section-2 > 7. If double-quotes are used to enclose fields, then a double-quote >appearing inside a field must be escaped by preceding it with >another double quote. For example: >"aaa","b""bb","ccc" > This currently works as is, but the following does not: > "aaa","b""b,b","ccc" > while "aaa","b\"b,b","ccc" does get parsed. > I assume this happens because quotes are currently being parsed in pairs, > and that somehow ends up unquoting the delimiter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote
[ https://issues.apache.org/jira/browse/SPARK-20155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20155: Summary: CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote (was: CSV-files with quoted quotes can't be parsed, if delimiter followes quoted quote) > CSV-files with quoted quotes can't be parsed, if delimiter follows quoted > quote > --- > > Key: SPARK-20155 > URL: https://issues.apache.org/jira/browse/SPARK-20155 > Project: Spark > Issue Type: Bug > Components: Input/Output >Affects Versions: 2.0.0 >Reporter: Rick Moritz > > According to: > https://tools.ietf.org/html/rfc4180#section-2 > 7. If double-quotes are used to enclose fields, then a double-quote >appearing inside a field must be escaped by preceding it with >another double quote. For example: >"aaa","b""bb","ccc" > This currently works as is, but the following does not: > "aaa","b""b,b","ccc" > while "aaa","b\"b,b","ccc" does get parsed. > I assume this happens because quotes are currently being parsed in pairs, > and that somehow ends up unquoting the delimiter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter followes quoted quote
Rick Moritz created SPARK-20155: --- Summary: CSV-files with quoted quotes can't be parsed, if delimiter followes quoted quote Key: SPARK-20155 URL: https://issues.apache.org/jira/browse/SPARK-20155 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 2.0.0 Reporter: Rick Moritz According to: https://tools.ietf.org/html/rfc4180#section-2 7. If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote. For example: "aaa","b""bb","ccc" This currently works as is, but the following does not: "aaa","b""b,b","ccc" while "aaa","b\"b,b","ccc" does get parsed. I assume this happens because quotes are currently being parsed in pairs, and that somehow ends up unquoting the delimiter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7264) SparkR API for parallel functions
[ https://issues.apache.org/jira/browse/SPARK-7264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14647268#comment-14647268 ] Rick Moritz commented on SPARK-7264: I've also added a bit of commentary. SparkR API for parallel functions - Key: SPARK-7264 URL: https://issues.apache.org/jira/browse/SPARK-7264 Project: Spark Issue Type: New Feature Components: SparkR Reporter: Shivaram Venkataraman This is a JIRA to discuss design proposals for enabling parallel R computation in SparkR without exposing the entire RDD API. The rationale for this is that the RDD API has a number of low level functions and we would like to expose a more light-weight API that is both friendly to R users and easy to maintain. http://goo.gl/GLHKZI has a first cut design doc. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6816) Add SparkConf API to configure SparkR
[ https://issues.apache.org/jira/browse/SPARK-6816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14588271#comment-14588271 ] Rick Moritz commented on SPARK-6816: Apparently this work-around is no longer needed for spark-1.4.0, which invokes a shell script (instead of going directly to java, as sparkR-pkg did) and fetches the required environment parameters. With spark-defaults being respected and SPARK_MEM available for memory options, there probably isn't a whole lot that still needs to be passed via -D to the shell script. Add SparkConf API to configure SparkR - Key: SPARK-6816 URL: https://issues.apache.org/jira/browse/SPARK-6816 Project: Spark Issue Type: New Feature Components: SparkR Reporter: Shivaram Venkataraman Priority: Minor Right now the only way to configure SparkR is to pass in arguments to sparkR.init. The goal is to add an API similar to SparkConf on Scala/Python to make configuration easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-8380) SparkR mis-counts
[ https://issues.apache.org/jira/browse/SPARK-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz closed SPARK-8380. -- Resolution: Invalid I got my columns mixed up, late in the evening after a frustrating day with SparkR's documentation. With the correct columns, the counts are equal in both expression types and via both platforms. SparkR mis-counts - Key: SPARK-8380 URL: https://issues.apache.org/jira/browse/SPARK-8380 Project: Spark Issue Type: Bug Components: SparkR Affects Versions: 1.4.0 Reporter: Rick Moritz On my dataset of ~9 million rows x 30 columns, queried via Hive, I can perform count operations on the entirety of the dataset and get the correct value, as double-checked against the same code in Scala. When I start to add conditions or even do a simple partial ascending histogram, I get discrepancies. In particular, there are missing values in SparkR, and massively so: a top-6 count of a certain feature in my dataset results in numbers an order of magnitude smaller than I get via Scala. The following logic, which I consider equivalent, is the basis for this report: counts <- summarize(groupBy(df, df$col_name), count = n(tdf$col_name)) head(arrange(counts, desc(counts$count))) versus: val table = sql("SELECT col_name, count(col_name) as value from df group by col_name order by value desc") The first, in particular, is taken directly from the SparkR programming guide. Since summarize isn't documented from what I can see, I'd hope it does what the programming guide indicates. In that case this would be a pretty serious logic bug (no errors are thrown). Otherwise, there's the possibility of a lack of documentation and a badly worded example in the guide being behind my misperception of SparkR's functionality. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8380) SparkR mis-counts
Rick Moritz created SPARK-8380: -- Summary: SparkR mis-counts Key: SPARK-8380 URL: https://issues.apache.org/jira/browse/SPARK-8380 Project: Spark Issue Type: Bug Components: SparkR Affects Versions: 1.4.0 Reporter: Rick Moritz On my dataset of ~9 million rows x 30 columns, queried via Hive, I can perform count operations on the entirety of the dataset and get the correct value, as double-checked against the same code in Scala. When I start to add conditions or even do a simple partial ascending histogram, I get discrepancies. In particular, there are missing values in SparkR, and massively so: a top-6 count of a certain feature in my dataset results in numbers an order of magnitude smaller than I get via Scala. The following logic, which I consider equivalent, is the basis for this report: counts <- summarize(groupBy(df, df$col_name), count = n(tdf$col_name)) head(arrange(counts, desc(counts$count))) versus: val table = sql("SELECT col_name, count(col_name) as value from df group by col_name order by value desc") The first, in particular, is taken directly from the SparkR programming guide. Since summarize isn't documented from what I can see, I'd hope it does what the programming guide indicates. In that case this would be a pretty serious logic bug (no errors are thrown). Otherwise, there's the possibility of a lack of documentation and a badly worded example in the guide being behind my misperception of SparkR's functionality. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
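For this kind of cross-check, the Scala side can also be expressed with the DataFrame API instead of a SQL string. The following is a hedged sketch, assuming the data is available as a table named "df" (as in the SQL variant above) on a Spark 1.4-era sqlContext:
{code}
// Scala analogue of the SparkR summarize/arrange pipeline: count rows per
// col_name value and show the six largest groups, for comparison with SparkR.
import org.apache.spark.sql.functions.{col, count, desc}

val counts = sqlContext.table("df")
  .groupBy("col_name")
  .agg(count(col("col_name")).as("value"))
  .orderBy(desc("value"))

counts.show(6)
{code}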
[jira] [Commented] (SPARK-8380) SparkR mis-counts
[ https://issues.apache.org/jira/browse/SPARK-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14586262#comment-14586262 ] Rick Moritz commented on SPARK-8380: I will attempt to reproduce this with an alternate dataset asap, but getting large-volume datasets into this cluster is difficult. SparkR mis-counts - Key: SPARK-8380 URL: https://issues.apache.org/jira/browse/SPARK-8380 Project: Spark Issue Type: Bug Components: SparkR Affects Versions: 1.4.0 Reporter: Rick Moritz On my dataset of ~9 million rows x 30 columns, queried via Hive, I can perform count operations on the entirety of the dataset and get the correct value, as double-checked against the same code in Scala. When I start to add conditions or even do a simple partial ascending histogram, I get discrepancies. In particular, there are missing values in SparkR, and massively so: a top-6 count of a certain feature in my dataset results in numbers an order of magnitude smaller than I get via Scala. The following logic, which I consider equivalent, is the basis for this report: counts <- summarize(groupBy(df, df$col_name), count = n(tdf$col_name)) head(arrange(counts, desc(counts$count))) versus: val table = sql("SELECT col_name, count(col_name) as value from df group by col_name order by value desc") The first, in particular, is taken directly from the SparkR programming guide. Since summarize isn't documented from what I can see, I'd hope it does what the programming guide indicates. In that case this would be a pretty serious logic bug (no errors are thrown). Otherwise, there's the possibility of a lack of documentation and a badly worded example in the guide being behind my misperception of SparkR's functionality. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6511) Publish hadoop provided build with instructions for different distros
[ https://issues.apache.org/jira/browse/SPARK-6511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14582150#comment-14582150 ] Rick Moritz commented on SPARK-6511: Thanks, that was timely (especially since this issue does not show up nearly as far up the results list for my search as the eventual commit). A note: My HDP hadoop-2.6.0 does not support the --config option, so care has to be taken when hard-wiring such a feature into the build script. Publish hadoop provided build with instructions for different distros --- Key: SPARK-6511 URL: https://issues.apache.org/jira/browse/SPARK-6511 Project: Spark Issue Type: Improvement Components: Build Reporter: Patrick Wendell Assignee: Patrick Wendell Fix For: 1.4.0 Currently we publish a series of binaries with different Hadoop client jars. This mostly works, but some users have reported compatibility issues with different distributions. One improvement moving forward might be to publish a binary build that simply asks you to set HADOOP_HOME to pick up the Hadoop client location. That way it would work across multiple distributions, even if they have subtle incompatibilities with upstream Hadoop. I think a first step for this would be to produce such a build for the community and see how well it works. One potential issue is that our fancy excludes and dependency re-writing won't work with the simpler "append Hadoop's classpath to Spark" approach. Also, how we deal with the Hive dependency is unclear, i.e. should we continue to bundle Spark's Hive (which has some fixes for dependency conflicts) or do we allow for linking against vanilla Hive at runtime. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-6816) Add SparkConf API to configure SparkR
[ https://issues.apache.org/jira/browse/SPARK-6816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14567187#comment-14567187 ] Rick Moritz edited comment on SPARK-6816 at 6/2/15 8:55 AM: One current drawback with SparkR's configuration option is the inability to set driver VM options. These are crucial when attempting to run SparkR on a Hortonworks HDP, as both driver and application master need to be aware of the hdp.version variable in order to resolve the classpath. While it is possible to pass this variable to the executors, there's no way to pass this option to the driver, excepting the following exploit/work-around: the SPARK_MEM variable can be abused to pass the required parameters to the driver's VM by using string concatenation. Setting the variable to (e.g.) "512m -Dhdp.version=NNN" appends the -D option to the -X option which is currently read from this environment variable. Adding a secondary variable to the System.env which gets parsed for JVM options would be far more obvious and less hacky, as would adding a separate environment list for the driver, extending what's currently available for executors. I'm adding this as a comment to this issue, since I believe it is sufficiently closely related not to warrant a separate issue. was (Author: rpcmoritz): One current drawback with SparkR's configuration option is the inability to set driver VM options. These are crucial when attempting to run SparkR on a Hortonworks HDP, as both driver and application master need to be aware of the hdp.version variable in order to resolve the classpath. While it is possible to pass this variable to the executors, there's no way to pass this option to the driver, excepting the following exploit/work-around: the SPARK_MEM variable can be abused to pass the required parameters to the driver's VM by using string concatenation. Setting the variable to (e.g.) "512m -Dhdp.version=NNN" appends the -D option to the -X option which is currently read from this environment variable. Adding a secondary variable to the System.env which gets parsed for JVM options would be far more obvious and less hacky, as would adding a separate environment list for the driver, extending what's currently available for executors. I'm adding this as a comment to this issue, since I believe it is sufficiently closely related not to warrant a separate issue. Add SparkConf API to configure SparkR - Key: SPARK-6816 URL: https://issues.apache.org/jira/browse/SPARK-6816 Project: Spark Issue Type: New Feature Components: SparkR Reporter: Shivaram Venkataraman Priority: Minor Right now the only way to configure SparkR is to pass in arguments to sparkR.init. The goal is to add an API similar to SparkConf on Scala/Python to make configuration easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6816) Add SparkConf API to configure SparkR
[ https://issues.apache.org/jira/browse/SPARK-6816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14568807#comment-14568807 ] Rick Moritz commented on SPARK-6816: [~shivaram], I am integrating SparkR into an RStudio server (I believe this to be a rather common use case), so using bin/SparkR won't work in this case, as far as I can tell. Thanks for the suggestion nonetheless. Add SparkConf API to configure SparkR - Key: SPARK-6816 URL: https://issues.apache.org/jira/browse/SPARK-6816 Project: Spark Issue Type: New Feature Components: SparkR Reporter: Shivaram Venkataraman Priority: Minor Right now the only way to configure SparkR is to pass in arguments to sparkR.init. The goal is to add an API similar to SparkConf on Scala/Python to make configuration easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6816) Add SparkConf API to configure SparkR
[ https://issues.apache.org/jira/browse/SPARK-6816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14567187#comment-14567187 ] Rick Moritz commented on SPARK-6816: One current drawback with SparkR's configuration option is the inability to set driver VM options. These are crucial when attempting to run SparkR on a Hortonworks HDP, as both driver and application master need to be aware of the hdp.version variable in order to resolve the classpath. While it is possible to pass this variable to the executors, there's no way to pass this option to the driver, excepting the following exploit/work-around: the SPARK_MEM variable can be abused to pass the required parameters to the driver's VM by using string concatenation. Setting the variable to (e.g.) "512m -Dhdp.version=NNN" appends the -D option to the -X option which is currently read from this environment variable. Adding a secondary variable to the System.env which gets parsed for JVM options would be far more obvious and less hacky, as would adding a separate environment list for the driver, extending what's currently available for executors. I'm adding this as a comment to this issue, since I believe it is sufficiently closely related not to warrant a separate issue. Add SparkConf API to configure SparkR - Key: SPARK-6816 URL: https://issues.apache.org/jira/browse/SPARK-6816 Project: Spark Issue Type: New Feature Components: SparkR Reporter: Shivaram Venkataraman Priority: Minor Right now the only way to configure SparkR is to pass in arguments to sparkR.init. The goal is to add an API similar to SparkConf on Scala/Python to make configuration easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org