[jira] [Updated] (SPARK-32376) Make unionByName null-filling behavior work with struct columns

2020-08-18 Thread Mukul Murthy (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukul Murthy updated SPARK-32376:
-
Attachment: tests.scala

> Make unionByName null-filling behavior work with struct columns
> ---
>
> Key: SPARK-32376
> URL: https://issues.apache.org/jira/browse/SPARK-32376
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Mukul Murthy
>Priority: Major
> Attachments: tests.scala
>
>
> https://issues.apache.org/jira/browse/SPARK-29358 added support for 
> unionByName to work when the two datasets didn't necessarily have the same 
> schema, but it does not work with nested columns like structs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32376) Make unionByName null-filling behavior work with struct columns

2020-08-18 Thread Mukul Murthy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179450#comment-17179450
 ] 

Mukul Murthy commented on SPARK-32376:
--

I'm sorry, I only saw the mail for this now. I'm attaching the file with our tests; 
feel free to use them as a reference or drop them in directly. They were authored 
by Rahul Govind from my team (rgovind3 on GitHub).







[jira] [Created] (SPARK-32376) Make unionByName null-filling behavior work with struct columns

2020-07-20 Thread Mukul Murthy (Jira)
Mukul Murthy created SPARK-32376:


 Summary: Make unionByName null-filling behavior work with struct 
columns
 Key: SPARK-32376
 URL: https://issues.apache.org/jira/browse/SPARK-32376
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Mukul Murthy


https://issues.apache.org/jira/browse/SPARK-29358 added support for unionByName 
to work when the two datasets didn't necessarily have the same schema, but it 
does not work with nested columns like structs.
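The null-filling behavior requested here, recursing into struct columns, can be sketched in plain Python, with dicts standing in for rows and nested structs. This is an illustration of the semantics only; the helper names are invented and none of this is Spark code. (In later Spark versions, `unionByName` takes an `allowMissingColumns` flag for the top-level case.)

```python
def merge_schema(a, b):
    """Union two schemas; a schema maps field -> nested schema (dict) or None."""
    out = dict(a)
    for k, v in b.items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = merge_schema(out[k], v)  # recurse into struct fields
        elif k not in out:
            out[k] = v
    return out

def conform(row, schema):
    """Fill a row out to the merged schema, null-filling missing nested fields."""
    out = {}
    for k, sub in schema.items():
        v = row.get(k)
        if isinstance(sub, dict):
            out[k] = conform(v or {}, sub)  # descend into the struct
        else:
            out[k] = v
    return out

def union_by_name(rows1, schema1, rows2, schema2):
    merged = merge_schema(schema1, schema2)
    return [conform(r, merged) for r in rows1 + rows2]
```

For example, unioning a row with struct `{"s": {"a": 1}}` against one with `{"s": {"b": 2}}` yields rows whose `s` struct carries both fields, with the missing one set to None.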






[jira] [Commented] (SPARK-31324) StreamingQuery stop() timeout exception should include the stream ID

2020-04-01 Thread Mukul Murthy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073289#comment-17073289
 ] 

Mukul Murthy commented on SPARK-31324:
--

Gotcha, thanks. I wasn't planning on backporting this; I had just picked the 
version the change was in.

> StreamingQuery stop() timeout exception should include the stream ID
> 
>
> Key: SPARK-31324
> URL: https://issues.apache.org/jira/browse/SPARK-31324
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: Mukul Murthy
>Priority: Trivial
>
> [https://github.com/apache/spark/pull/26771/|https://github.com/apache/spark/pull/26771/files]
>  added a conf to set a timeout when stop()ing streams; if the stream does not 
> terminate in this time, an exception is thrown. Having hit this issue once in 
> production, it would be nice to have the stream ID in the error message.






[jira] [Created] (SPARK-31324) StreamingQuery stop() timeout exception should include the stream ID

2020-04-01 Thread Mukul Murthy (Jira)
Mukul Murthy created SPARK-31324:


 Summary: StreamingQuery stop() timeout exception should include 
the stream ID
 Key: SPARK-31324
 URL: https://issues.apache.org/jira/browse/SPARK-31324
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.4.4
Reporter: Mukul Murthy


[https://github.com/apache/spark/pull/26771/|https://github.com/apache/spark/pull/26771/files]
 added a conf to set a timeout when stop()ing streams; if the stream does not 
terminate in this time, an exception is thrown. Having hit this issue once in 
production, it would be nice to have the stream ID in the error message.
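The requested improvement can be sketched in a few lines: when stop() times out, the raised error should carry the stream's ID. The class and message format below are illustrative stand-ins, not Spark's actual implementation.

```python
import threading

class StreamingQueryStub:
    """Toy stand-in for a streaming query with a stop-timeout."""
    def __init__(self, query_id):
        self.id = query_id
        self._terminated = threading.Event()

    def stop(self, timeout_sec):
        # Real code would signal shutdown first; here we just wait.
        if not self._terminated.wait(timeout_sec):
            raise TimeoutError(
                f"Stream {self.id} failed to stop within {timeout_sec} seconds")
```

With the ID in the message, an operator hitting this in production can tell which of many concurrent streams failed to stop.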






[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2019-10-10 Thread Mukul Murthy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16948800#comment-16948800
 ] 

Mukul Murthy commented on SPARK-29358:
--

That would be a start toward letting us skip #1, but #2 is still annoying. 
Serializing and deserializing the data just to merge schemas is clunky, and 
transforming each DataFrame's schema is annoying even when you don't have 
StructTypes and nested columns. When you add those into the picture, it gets 
even messier.

 

 

> Make unionByName optionally fill missing columns with nulls
> ---
>
> Key: SPARK-29358
> URL: https://issues.apache.org/jira/browse/SPARK-29358
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mukul Murthy
>Priority: Major
>
> Currently, unionByName requires two DataFrames to have the same set of 
> columns (even though the order can be different). It would be good to add 
> either an option to unionByName or a new type of union which fills in missing 
> columns with nulls. 
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws 
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among 
> (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround to make this possible is by using unionByName, but 
> this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}






[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2019-10-09 Thread Mukul Murthy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947982#comment-16947982
 ] 

Mukul Murthy commented on SPARK-29358:
--

[~hyukjin.kwon], I disagree that the workaround is pretty easy. For the trivial 
example where you know what the schemas are, it is. For more complicated cases, 
especially where you don't know what the schemas are, you have to:

1. Compute the merged schema using some complex logic. Check 
[https://github.com/delta-io/delta/blob/master/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala#L662]
 for one correct implementation of this logic.

2. Write both data out to JSON, union those, and read it back with the merged 
schema, OR loop and transform each source data into the correct target schema. 
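Step 2's JSON round-trip can be illustrated with the stdlib json module alone: serialize both datasets, concatenate, and "read back" each record against the merged column set. This is a sketch of the idea, with invented helper names, not Spark code.

```python
import json

def union_via_json(rows1, rows2):
    """Union two lists of dicts by writing to JSON lines and reading back."""
    # "Write both data out to JSON" and concatenate.
    blob = "\n".join(json.dumps(r) for r in rows1 + rows2)
    # The merged schema: here just the union of column names.
    merged_cols = sorted({k for r in rows1 + rows2 for k in r})
    # "Read it back with the merged schema": missing columns become None.
    return [
        {c: rec.get(c) for c in merged_cols}
        for rec in (json.loads(line) for line in blob.splitlines())
    ]
```

Even this flat version shows why the workaround feels clunky; with nested StructTypes, the merged-schema computation in step 1 is where the real complexity lives.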







[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2019-10-07 Thread Mukul Murthy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16946069#comment-16946069
 ] 

Mukul Murthy commented on SPARK-29358:
--

I agree that it should not change the current behavior of unionByName. I'm 
proposing either a new optional parameter to unionByName or a new API entirely.

Being similar to SQL is nice, but for common problems people have to work around 
(multiple DataFrames with different schemas, merging the schemas, and 
withColumn'ing all the missing columns onto each DataFrame), it's nice to have 
the option of solving them in Spark instead of making users do it.







[jira] [Created] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2019-10-04 Thread Mukul Murthy (Jira)
Mukul Murthy created SPARK-29358:


 Summary: Make unionByName optionally fill missing columns with 
nulls
 Key: SPARK-29358
 URL: https://issues.apache.org/jira/browse/SPARK-29358
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.4
Reporter: Mukul Murthy


Currently, unionByName requires two DataFrames to have the same set of columns 
(even though the order can be different). It would be good to add either an 
option to unionByName or a new type of union which fills in missing columns 
with nulls. 
{code:java}
val df1 = Seq(1, 2, 3).toDF("x")
val df2 = Seq("a", "b", "c").toDF("y")
df1.unionByName(df2){code}
This currently throws 
{code:java}
org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among 
(y);
{code}
Ideally, there would be a way to make this return a DataFrame containing:
{code:java}
+----+----+
|   x|   y|
+----+----+
|   1|null|
|   2|null|
|   3|null|
|null|   a|
|null|   b|
|null|   c|
+----+----+
{code}
Currently the workaround to make this possible is by using unionByName, but 
this is clunky:
{code:java}
df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
{code}
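The proposed null-filling union can be modeled over plain lists of dicts, which makes the requested semantics concrete without any Spark dependency. The function name is invented for the sketch.

```python
def union_by_name_fill(rows1, rows2):
    """Union two 'DataFrames' (lists of dicts), null-filling missing columns."""
    cols = []
    for r in rows1 + rows2:          # merged column set, in first-seen order
        for k in r:
            if k not in cols:
                cols.append(k)
    # Every output row carries every column; absent values become None.
    return [{c: r.get(c) for c in cols} for r in rows1 + rows2]
```

On the example above, unioning a one-column `x` dataset with a one-column `y` dataset yields six rows over columns `x` and `y`, with None filling the gaps.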






[jira] [Updated] (SPARK-26046) Add a way for StreamingQueryManager to remove all listeners

2019-08-30 Thread Mukul Murthy (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-26046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukul Murthy updated SPARK-26046:
-
Description: StreamingQueryManager should have a way to clear out all 
listeners. There's addListener(listener) and removeListener(listener), but not 
removeAllListeners. We should expose a new method -removeAllListeners() that 
calls listenerBus.removeAllListeners (added here: 
[https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3])-
 listListeners() that can be used to remove listeners.  (was: 
StreamingQueryManager should have a way to clear out all listeners. There's 
addListener(listener) and removeListener(listener), but not removeAllListeners. 
We should expose a new method removeAllListeners() that calls 
listenerBus.removeAllListeners (added here: 
[https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3]).
 )

> Add a way for StreamingQueryManager to remove all listeners
> ---
>
> Key: SPARK-26046
> URL: https://issues.apache.org/jira/browse/SPARK-26046
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Mukul Murthy
>Priority: Major
>
> StreamingQueryManager should have a way to clear out all listeners. There's 
> addListener(listener) and removeListener(listener), but not 
> removeAllListeners. We should expose a new method -removeAllListeners() that 
> calls listenerBus.removeAllListeners (added here: 
> [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3])-
>  listListeners() that can be used to remove listeners.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-26046) Add a way for StreamingQueryManager to remove all listeners

2019-08-30 Thread Mukul Murthy (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-26046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukul Murthy reopened SPARK-26046:
--

From some other discussions I've had, I actually think it's reasonable to have a 
way to remove all listeners. I don't think it should be a removeAllListeners 
API, as originally discussed, but StreamingQueryManager could have a 
listListeners API which the caller could then use to remove each listener 
manually.
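The listListeners idea can be sketched with a toy registry: expose the registered listeners as a snapshot so callers can remove them all themselves, without a dedicated removeAllListeners method. This is an illustration, not Spark's StreamingQueryManager.

```python
class ListenerManager:
    """Toy listener registry mirroring add/remove/list semantics."""
    def __init__(self):
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def list_listeners(self):
        # Return a snapshot, so callers can iterate while removing.
        return tuple(self._listeners)
```

A caller can then clear everything with `for l in mgr.list_listeners(): mgr.remove_listener(l)`, which keeps the remove-all decision in user code.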

> Add a way for StreamingQueryManager to remove all listeners
> ---
>
> Key: SPARK-26046
> URL: https://issues.apache.org/jira/browse/SPARK-26046
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Mukul Murthy
>Priority: Major
>
> StreamingQueryManager should have a way to clear out all listeners. There's 
> addListener(listener) and removeListener(listener), but not 
> removeAllListeners. We should expose a new method removeAllListeners() that 
> calls listenerBus.removeAllListeners (added here: 
> [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3]).
>  






[jira] [Updated] (SPARK-28043) Reading json with duplicate columns drops the first column value

2019-06-13 Thread Mukul Murthy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukul Murthy updated SPARK-28043:
-
Description: 
When reading a JSON blob with duplicate fields, Spark appears to ignore the 
value of the first one. JSON recommends unique names but does not require it; 
since JSON and Spark SQL both allow duplicate field names, we should fix the 
bug where the first column value is getting dropped.

 

I'm guessing that somewhere when parsing JSON, we turn it into a Map, which 
causes the first value to be overridden.

 

Repro (Python, 2.4):

>>> jsonRDD = spark.sparkContext.parallelize(['{"a": "blah", "a": "blah2"}'])
>>> df = spark.read.json(jsonRDD)
>>> df.show()
+----+-----+
|   a|    a|
+----+-----+
|null|blah2|
+----+-----+

The expected response would be:

+----+-----+
|   a|    a|
+----+-----+
|blah|blah2|
+----+-----+

  was:
When reading a JSON blob with duplicate fields, Spark appears to ignore the 
value of the first one. JSON recommends unique names but does not require it; 
since JSON and Spark SQL both allow duplicate field names, we should fix the 
bug where the first column value is getting dropped.

 

Repro (Python, 2.4):

>>> jsonRDD = spark.sparkContext.parallelize(['{"a": "blah", "a": "blah2"}'])
>>> df = spark.read.json(jsonRDD)
>>> df.show()
+----+-----+
|   a|    a|
+----+-----+
|null|blah2|
+----+-----+

The expected response would be:

+----+-----+
|   a|    a|
+----+-----+
|blah|blah2|
+----+-----+


> Reading json with duplicate columns drops the first column value
> 
>
> Key: SPARK-28043
> URL: https://issues.apache.org/jira/browse/SPARK-28043
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Mukul Murthy
>Priority: Major
>
> When reading a JSON blob with duplicate fields, Spark appears to ignore the 
> value of the first one. JSON recommends unique names but does not require it; 
> since JSON and Spark SQL both allow duplicate field names, we should fix the 
> bug where the first column value is getting dropped.
>  
> I'm guessing somewhere when parsing JSON, we're turning it into a Map which 
> is causing the first value to be overridden.
>  
> Repro (Python, 2.4):
> >>> jsonRDD = spark.sparkContext.parallelize(['{"a": "blah", "a": "blah2"}'])
> >>> df = spark.read.json(jsonRDD)
> >>> df.show()
> +----+-----+
> |   a|    a|
> +----+-----+
> |null|blah2|
> +----+-----+
>  
> The expected response would be:
> +----+-----+
> |   a|    a|
> +----+-----+
> |blah|blah2|
> +----+-----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28043) Reading json with duplicate columns drops the first column value

2019-06-13 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-28043:


 Summary: Reading json with duplicate columns drops the first 
column value
 Key: SPARK-28043
 URL: https://issues.apache.org/jira/browse/SPARK-28043
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Mukul Murthy


When reading a JSON blob with duplicate fields, Spark appears to ignore the 
value of the first one. JSON recommends unique names but does not require it; 
since JSON and Spark SQL both allow duplicate field names, we should fix the 
bug where the first column value is getting dropped.

 

Repro (Python, 2.4):

>>> jsonRDD = spark.sparkContext.parallelize(['{"a": "blah", "a": "blah2"}'])
>>> df = spark.read.json(jsonRDD)
>>> df.show()
+----+-----+
|   a|    a|
+----+-----+
|null|blah2|
+----+-----+

The expected response would be:

+----+-----+
|   a|    a|
+----+-----+
|blah|blah2|
+----+-----+
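The underlying hazard can be reproduced with Python's stdlib json module alone: parsing into a dict keeps only the last of two duplicate keys, while `object_pairs_hook` preserves every pair. This illustrates the Map-overwrite behavior suspected above; it is not Spark's JSON parser.

```python
import json

blob = '{ "a": "blah", "a": "blah2"}'

# dict semantics: the second "a" silently overwrites the first.
last_wins = json.loads(blob)

# object_pairs_hook receives every (key, value) pair before dict folding,
# so both duplicate values survive.
pairs = json.loads(blob, object_pairs_hook=list)
```

A fix along these lines would need the parser to keep per-occurrence values (as `object_pairs_hook` does) rather than folding fields into a map keyed by name.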






[jira] [Created] (SPARK-26586) Streaming queries should have isolated SparkSessions and confs

2019-01-09 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-26586:


 Summary: Streaming queries should have isolated SparkSessions and 
confs
 Key: SPARK-26586
 URL: https://issues.apache.org/jira/browse/SPARK-26586
 Project: Spark
  Issue Type: Bug
  Components: SQL, Structured Streaming
Affects Versions: 2.4.0, 2.3.0
Reporter: Mukul Murthy


When a stream is started, the stream's config is supposed to be frozen and all 
batches run with the config at start time. However, due to a race condition in 
creating streams, updating a conf value in the active spark session immediately 
after starting a stream can lead to the stream getting that updated value.

 

The problem is that when StreamingQueryManager creates a MicrobatchExecution 
(or ContinuousExecution), it passes in the shared spark session, and the spark 
session isn't cloned until StreamExecution.start() is called. 
DataStreamWriter.start() should not return until the SparkSession is cloned.
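The proposed fix can be sketched with toy classes: snapshot (clone) the session's config before start() returns, so a mutation made immediately afterward cannot leak into the running stream. These classes are stand-ins for SparkSession and StreamExecution, not Spark internals.

```python
class Session:
    """Stand-in for SparkSession: just a mutable conf map."""
    def __init__(self):
        self.conf = {}

class Stream:
    """Stand-in for a stream execution with a frozen conf."""
    def __init__(self, session):
        # Clone eagerly, before the caller gets control back, closing
        # the race window described above.
        self._conf = dict(session.conf)

    def get(self, key):
        return self._conf.get(key)

def start_stream(session):
    # start() does not return until the conf snapshot is taken.
    return Stream(session)
```

With the eager clone, setting a conf value right after `start_stream` returns has no effect on the already-started stream.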






[jira] [Created] (SPARK-26046) Add a way for StreamingQueryManager to remove all listeners

2018-11-13 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-26046:


 Summary: Add a way for StreamingQueryManager to remove all 
listeners
 Key: SPARK-26046
 URL: https://issues.apache.org/jira/browse/SPARK-26046
 Project: Spark
  Issue Type: Task
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Mukul Murthy


StreamingQueryManager should have a way to clear out all listeners. There's 
addListener(listener) and removeListener(listener), but not removeAllListeners. 
We should expose a new method removeAllListeners() that calls 
listenerBus.removeAllListeners (added here: 
[https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3]).
 






[jira] [Created] (SPARK-25449) Don't send zero accumulators in heartbeats

2018-09-17 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-25449:


 Summary: Don't send zero accumulators in heartbeats
 Key: SPARK-25449
 URL: https://issues.apache.org/jira/browse/SPARK-25449
 Project: Spark
  Issue Type: Task
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Mukul Murthy


Heartbeats sent from executors to the driver every 10 seconds contain metrics 
and are generally on the order of a few KBs. However, for large jobs with lots 
of tasks, heartbeats can be on the order of tens of MBs, causing tasks to die 
with heartbeat failures. We can mitigate this by not sending zero metrics to 
the driver.
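The mitigation can be sketched as a pruning pass over the heartbeat payload: drop zero-valued metrics, and drop tasks with nothing left to report. The payload shape and names here are invented for the illustration.

```python
def prune_heartbeat(task_metrics):
    """task_metrics: {task_id: {metric_name: value}} -> same shape, zeros dropped."""
    pruned = {}
    for task_id, metrics in task_metrics.items():
        kept = {name: v for name, v in metrics.items() if v != 0}
        if kept:                      # skip tasks with nothing to report
            pruned[task_id] = kept
    return pruned
```

For jobs with many tasks whose accumulators are mostly zero, this shrinks the heartbeat roughly in proportion to the fraction of zero metrics.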






[jira] [Updated] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues

2018-09-10 Thread Mukul Murthy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukul Murthy updated SPARK-25399:
-
Priority: Major  (was: Blocker)

> Reusing execution threads from continuous processing for microbatch streaming 
> can result in correctness issues
> --
>
> Key: SPARK-25399
> URL: https://issues.apache.org/jira/browse/SPARK-25399
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Mukul Murthy
>Priority: Major
>
> Continuous processing sets some thread-local variables that, when read by a 
> thread running a microbatch stream, can cause incorrect or missing previous 
> state to be read, resulting in wrong answers. This was caught by a job 
> running the StreamSuite tests, and it only reproduces occasionally, when the 
> same threads are reused.
> The issue is in StateStoreRDD.compute - when we compute currentVersion, we 
> read from a thread local variable which is set by continuous processing 
> threads. If this value is set, we then think we're on the wrong state version.
> I imagine very few people, if any, would run into this bug, because you'd 
> have to use continuous processing and then microbatch processing in the same 
> cluster. However, it can result in silent correctness issues, and it would be 
> very difficult for someone to tell if they were impacted by this or not.






[jira] [Commented] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues

2018-09-10 Thread Mukul Murthy (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609819#comment-16609819
 ] 

Mukul Murthy commented on SPARK-25399:
--

cc [~joseph.torres] and [~tdas]

> Reusing execution threads from continuous processing for microbatch streaming 
> can result in correctness issues
> --
>
> Key: SPARK-25399
> URL: https://issues.apache.org/jira/browse/SPARK-25399
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Mukul Murthy
>Priority: Blocker
>
> Continuous processing sets some thread-local variables that, when read by a 
> thread running a microbatch stream, can cause incorrect or missing previous 
> state to be read, resulting in wrong answers. This was caught by a job 
> running the StreamSuite tests, and it only reproduces occasionally, when the 
> same threads are reused.
> The issue is in StateStoreRDD.compute - when we compute currentVersion, we 
> read from a thread local variable which is set by continuous processing 
> threads. If this value is set, we then think we're on the wrong state version.
> I imagine very few people, if any, would run into this bug, because you'd 
> have to use continuous processing and then microbatch processing in the same 
> cluster. However, it can result in silent correctness issues, and it would be 
> very difficult for someone to tell if they were impacted by this or not.






[jira] [Created] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues

2018-09-10 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-25399:


 Summary: Reusing execution threads from continuous processing for 
microbatch streaming can result in correctness issues
 Key: SPARK-25399
 URL: https://issues.apache.org/jira/browse/SPARK-25399
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Mukul Murthy


Continuous processing sets some thread-local variables that, when read by a 
thread running a microbatch stream, can cause incorrect or missing previous 
state to be read, resulting in wrong answers. This was caught by a job running 
the StreamSuite tests, and it only reproduces occasionally, when the same 
threads are reused.

The issue is in StateStoreRDD.compute - when we compute currentVersion, we read 
from a thread local variable which is set by continuous processing threads. If 
this value is set, we then think we're on the wrong state version.

I imagine very few people, if any, would run into this bug, because you'd have 
to use continuous processing and then microbatch processing in the same 
cluster. However, it can result in silent correctness issues, and it would be 
very difficult for someone to tell if they were impacted by this or not.
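The failure mode can be demonstrated in plain Python: a thread-local set while a pooled worker runs one kind of job is still visible when the same thread later runs a different kind of job. This mirrors the stale `currentVersion` read described above; it is not Spark code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()

def continuous_task():
    # "Continuous processing" stamps a thread-local on its worker thread.
    _local.epoch = 7

def microbatch_task():
    # A reused thread sees the stale value instead of a clean slate.
    return getattr(_local, "epoch", None)

with ThreadPoolExecutor(max_workers=1) as pool:   # one worker forces reuse
    pool.submit(continuous_task).result()
    stale = pool.submit(microbatch_task).result()
# stale is 7: the "microbatch" read continuous-processing state.
```

The fix direction this suggests is either clearing such thread-locals when a task starts, or not keying state versions off thread-local storage at all.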






[jira] [Resolved] (SPARK-25182) Block Manager master and slave thread pools are unbounded

2018-08-21 Thread Mukul Murthy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukul Murthy resolved SPARK-25182.
--
  Resolution: Duplicate
Target Version/s:   (was: 2.4.0)

> Block Manager master and slave thread pools are unbounded
> -
>
> Key: SPARK-25182
> URL: https://issues.apache.org/jira/browse/SPARK-25182
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Mukul Murthy
>Priority: Major
>
> Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have 
> thread pools with unbounded numbers of threads. In certain cases, this can 
> lead to driver OOM errors. We should add an upper bound on the number of 
> threads in these thread pools; this should not break any existing behavior 
> because they still have queues of size Integer.MAX_VALUE.






[jira] [Created] (SPARK-25182) Block Manager master and slave thread pools are unbounded

2018-08-21 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-25182:


 Summary: Block Manager master and slave thread pools are unbounded
 Key: SPARK-25182
 URL: https://issues.apache.org/jira/browse/SPARK-25182
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Mukul Murthy


Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have 
thread pools with unbounded numbers of threads. In certain cases, this can lead 
to driver OOM errors. We should add an upper bound on the number of threads in 
these thread pools; this should not break any existing behavior because they 
still have queues of size Integer.MAX_VALUE.
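The proposed shape of the fix, in Python terms: cap the worker count while keeping an effectively unbounded queue, so submissions never fail, they just wait for a free thread. The bound of 4 below is arbitrary for the sketch.

```python
from concurrent.futures import ThreadPoolExecutor

def make_bounded_pool(max_threads):
    # ThreadPoolExecutor's internal work queue is unbounded, mirroring the
    # Integer.MAX_VALUE queue mentioned above; only thread count is capped.
    return ThreadPoolExecutor(max_workers=max_threads)

pool = make_bounded_pool(4)
results = list(pool.map(lambda x: x * x, range(10)))  # 10 tasks, 4 threads
pool.shutdown()
```

Because only the thread count changes, callers see the same submit-and-wait behavior as before; the difference is that a burst of requests no longer spawns a thread apiece and exhausts driver memory.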






[jira] [Created] (SPARK-25181) Block Manager master and slave thread pools are unbounded

2018-08-21 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-25181:


 Summary: Block Manager master and slave thread pools are unbounded
 Key: SPARK-25181
 URL: https://issues.apache.org/jira/browse/SPARK-25181
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Mukul Murthy


Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have 
thread pools with unbounded numbers of threads. In certain cases, this can lead 
to driver OOM errors. We should add an upper bound on the number of threads in 
these thread pools; this should not break any existing behavior because they 
still have queues of size Integer.MAX_VALUE.






[jira] [Commented] (SPARK-24438) Empty strings and null strings are written to the same partition

2018-07-05 Thread Mukul Murthy (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533977#comment-16533977
 ] 

Mukul Murthy commented on SPARK-24438:
--

Are null and empty string both invalid partition values? I dislike that this changes the actual data, even if only in a minor way; but as you said, it's really a Hive bug, so I don't think it's straightforward to fix.

 

> Empty strings and null strings are written to the same partition
> 
>
> Key: SPARK-24438
> URL: https://issues.apache.org/jira/browse/SPARK-24438
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Mukul Murthy
>Priority: Major
>
> When you partition on a string column that has empty strings and nulls, they 
> are both written to the same default partition. When you read the data back, 
> all those values get read back as null.
> {code:java}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> val data = Seq(Row(1, ""), Row(2, ""), Row(3, ""), Row(4, "hello"), Row(5, 
> null))
> val schema = new StructType().add("a", IntegerType).add("b", StringType)
> val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
> display(df) 
> => 
> a b
> 1 
> 2 
> 3 
> 4 hello
> 5 null
> df.write.mode("overwrite").partitionBy("b").save("/home/mukul/weird_test_data4")
> val df2 = spark.read.load("/home/mukul/weird_test_data4")
> display(df2)
> => 
> a b
> 4 hello
> 3 null
> 2 null
> 1 null
> 5 null
> {code}
> Seems to affect multiple types of tables.






[jira] [Commented] (SPARK-24662) Structured Streaming should support LIMIT

2018-06-28 Thread Mukul Murthy (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526848#comment-16526848
 ] 

Mukul Murthy commented on SPARK-24662:
--

Calling .limit(n) on a DataFrame (or in SQL, SELECT ... LIMIT n) returns only n 
rows from that query. limit is currently not supported on streaming DataFrames; 
my fix is going to support it for streams writing in append and complete output 
modes. It will remain unsupported in update output mode, where a limit is 
ill-defined because previously emitted rows can change.
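
A sketch of how this would look to a user once supported (the query name and sink are illustrative assumptions, not part of the proposal itself):

```scala
// Hypothetical usage after the fix: keep at most 10 rows of the
// stream's output, writing to an in-memory table named "events".
val query = inputStream            // any streaming DataFrame
  .limit(10)                       // LIMIT on a streaming query
  .writeStream
  .format("memory")
  .queryName("events")
  .outputMode("append")            // supported for append/complete, not update
  .start()
```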

> Structured Streaming should support LIMIT
> -
>
> Key: SPARK-24662
> URL: https://issues.apache.org/jira/browse/SPARK-24662
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Mukul Murthy
>Priority: Major
>
> Make structured streams support the LIMIT operator. 
> This will undo SPARK-24525 as the limit operator would be a superior solution.






[jira] [Created] (SPARK-24662) Structured Streaming should support LIMIT

2018-06-26 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-24662:


 Summary: Structured Streaming should support LIMIT
 Key: SPARK-24662
 URL: https://issues.apache.org/jira/browse/SPARK-24662
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.3.1
Reporter: Mukul Murthy


Make structured streams support the LIMIT operator. 

This will undo SPARK-24525 as the limit operator would be a superior solution.






[jira] [Created] (SPARK-24525) Provide an option to limit MemorySink memory usage

2018-06-11 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-24525:


 Summary: Provide an option to limit MemorySink memory usage
 Key: SPARK-24525
 URL: https://issues.apache.org/jira/browse/SPARK-24525
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.3.1
Reporter: Mukul Murthy


MemorySink stores stream results in memory and is mostly used for testing and 
displaying streams, but for large streams, this can OOM the driver. We should 
add an option to limit the number of rows and the total size of a memory sink 
and not add any new data once either limit is hit. 
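
One way the option could surface to users, as a sketch only (the option name `maxRows` is a hypothetical illustration of the proposal, not a real Spark configuration key):

```scala
// Hypothetical: cap the memory sink so the driver stops accumulating
// rows once the limit is reached, instead of growing without bound.
val query = streamingDF.writeStream
  .format("memory")
  .queryName("preview")
  .option("maxRows", 10000)        // hypothetical option: stop adding rows after 10k
  .start()
```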






[jira] [Created] (SPARK-24438) Empty strings and null strings are written to the same partition

2018-05-30 Thread Mukul Murthy (JIRA)
Mukul Murthy created SPARK-24438:


 Summary: Empty strings and null strings are written to the same 
partition
 Key: SPARK-24438
 URL: https://issues.apache.org/jira/browse/SPARK-24438
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Mukul Murthy


When you partition on a string column that has empty strings and nulls, they 
are both written to the same default partition. When you read the data back, 
all those values get read back as null.


{code:java}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
val data = Seq(Row(1, ""), Row(2, ""), Row(3, ""), Row(4, "hello"), Row(5, 
null))
val schema = new StructType().add("a", IntegerType).add("b", StringType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
display(df) 
=> 
a b
1 
2 
3 
4 hello
5 null

df.write.mode("overwrite").partitionBy("b").save("/home/mukul/weird_test_data4")
val df2 = spark.read.load("/home/mukul/weird_test_data4")
display(df2)
=> 
a b
4 hello
3 null
2 null
1 null
5 null
{code}

Seems to affect multiple types of tables.
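
Until this is fixed, one possible workaround (a sketch, not a fix; the sentinel value `__EMPTY__` and output path are arbitrary choices) is to map empty strings to a sentinel before partitioning, so they land in a distinct partition directory instead of the default null partition:

```scala
import org.apache.spark.sql.functions.{col, when}

// Workaround sketch: rewrite empty strings in the partition column to a
// sentinel so they survive the write/read round trip as a distinct value.
val safeDF = df.withColumn(
  "b", when(col("b") === "", "__EMPTY__").otherwise(col("b")))
safeDF.write.mode("overwrite").partitionBy("b").save("/tmp/partitioned")
```

Readers would then need to translate the sentinel back to an empty string after loading.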


