[jira] [Updated] (SPARK-32376) Make unionByName null-filling behavior work with struct columns
[ https://issues.apache.org/jira/browse/SPARK-32376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukul Murthy updated SPARK-32376: - Attachment: tests.scala > Make unionByName null-filling behavior work with struct columns > --- > > Key: SPARK-32376 > URL: https://issues.apache.org/jira/browse/SPARK-32376 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Mukul Murthy >Priority: Major > Attachments: tests.scala > > > https://issues.apache.org/jira/browse/SPARK-29358 added support for > unionByName to work when the two datasets didn't necessarily have the same > schema, but it does not work with nested columns like structs. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32376) Make unionByName null-filling behavior work with struct columns
[ https://issues.apache.org/jira/browse/SPARK-32376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179450#comment-17179450 ] Mukul Murthy commented on SPARK-32376: -- I'm sorry, I only saw the mail for this now. Attaching the file with our tests, feel free to either use these as a reference or drop them in. Authored by Rahul Govind from my team, he's rgovind3 on GitHub. > Make unionByName null-filling behavior work with struct columns > --- > > Key: SPARK-32376 > URL: https://issues.apache.org/jira/browse/SPARK-32376 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Mukul Murthy >Priority: Major > > https://issues.apache.org/jira/browse/SPARK-29358 added support for > unionByName to work when the two datasets didn't necessarily have the same > schema, but it does not work with nested columns like structs. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32376) Make unionByName null-filling behavior work with struct columns
Mukul Murthy created SPARK-32376: Summary: Make unionByName null-filling behavior work with struct columns Key: SPARK-32376 URL: https://issues.apache.org/jira/browse/SPARK-32376 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Mukul Murthy https://issues.apache.org/jira/browse/SPARK-29358 added support for unionByName to work when the two datasets didn't necessarily have the same schema, but it does not work with nested columns like structs. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
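The nested-column case above needs a recursive schema merge. A minimal sketch of that recursion, outside Spark, with plain Python dicts standing in for StructType (the `merge_schemas` helper is illustrative, not Spark's API):

```python
# Sketch (not Spark code): the recursive merge a struct-aware unionByName
# needs. Leaf fields map to a type name; nested dicts model nested structs.

def merge_schemas(left, right):
    """Union two schemas by field name, recursing into nested structs."""
    merged = dict(left)
    for name, rtype in right.items():
        if name not in merged:
            merged[name] = rtype  # field only on the right: left side gets null-filled
        elif isinstance(merged[name], dict) and isinstance(rtype, dict):
            merged[name] = merge_schemas(merged[name], rtype)  # nested struct: recurse
        # else: identical leaf types kept as-is (type-conflict handling omitted)
    return merged

a = {"id": "long", "info": {"x": "int"}}
b = {"id": "long", "info": {"y": "string"}, "extra": "int"}
print(merge_schemas(a, b))
```

The top-level null-filling logic from SPARK-29358 is not enough precisely because it never descends into `info` here; the struct fields `x` and `y` have to be merged field-by-field.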
[jira] [Commented] (SPARK-31324) StreamingQuery stop() timeout exception should include the stream ID
[ https://issues.apache.org/jira/browse/SPARK-31324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073289#comment-17073289 ] Mukul Murthy commented on SPARK-31324: -- Gotcha, thanks. Was not planning on backporting this, but I had just picked the version the change was in. > StreamingQuery stop() timeout exception should include the stream ID > > > Key: SPARK-31324 > URL: https://issues.apache.org/jira/browse/SPARK-31324 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Mukul Murthy >Priority: Trivial > > [https://github.com/apache/spark/pull/26771/|https://github.com/apache/spark/pull/26771/files] > added a conf to set a timeout when stop()ing streams; if the stream does not > terminate in this time, an exception is thrown. Having hit this issue once in > production, it would be nice to have the stream ID in the error message. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31324) StreamingQuery stop() timeout exception should include the stream ID
Mukul Murthy created SPARK-31324: Summary: StreamingQuery stop() timeout exception should include the stream ID Key: SPARK-31324 URL: https://issues.apache.org/jira/browse/SPARK-31324 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.4.4 Reporter: Mukul Murthy [https://github.com/apache/spark/pull/26771/|https://github.com/apache/spark/pull/26771/files] added a conf to set a timeout when stop()ing streams; if the stream does not terminate in this time, an exception is thrown. Having hit this issue once in production, it would be nice to have the stream ID in the error message. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
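The requested improvement is small: include the stream's ID in the timeout error. A sketch of the intended behavior, with `StreamTimeoutError` and `stop_stream` as illustrative stand-ins rather than Spark's actual API:

```python
# Sketch: when stop() times out, the raised error should name the stream,
# so an operator can tell which query failed to halt.

class StreamTimeoutError(Exception):
    pass

def stop_stream(stream_id, terminated, timeout_ms):
    # 'terminated' stands in for the stream halting within the timeout
    if not terminated:
        raise StreamTimeoutError(
            f"Stream {stream_id} failed to stop within {timeout_ms} ms")

msg = ""
try:
    stop_stream("query-42", terminated=False, timeout_ms=3000)
except StreamTimeoutError as e:
    msg = str(e)  # the stream ID is now part of the error message
print(msg)
```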
[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls
[ https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16948800#comment-16948800 ] Mukul Murthy commented on SPARK-29358: -- That would be a start to make us not have to do #1, but #2 is still annoying. Serializing and deserializing the data just to merge schemas is clunky, and transforming each DataFrame's schema is annoying enough even when you don't have StructTypes and nested columns. When you add those into the picture, it gets even messier. > Make unionByName optionally fill missing columns with nulls > --- > > Key: SPARK-29358 > URL: https://issues.apache.org/jira/browse/SPARK-29358 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Mukul Murthy >Priority: Major > > Currently, unionByName requires two DataFrames to have the same set of > columns (even though the order can be different). It would be good to add > either an option to unionByName or a new type of union which fills in missing > columns with nulls.
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround to make this possible is by combining withColumn and unionByName, but this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls
[ https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947982#comment-16947982 ] Mukul Murthy commented on SPARK-29358: -- [~hyukjin.kwon], I disagree that the workaround is pretty easy. For the trivial example where you know what the schemas are, it is pretty easy. For more complicated cases, especially where you don't know what the schemas are, you have to: 1. Compute the merged schema using some complex logic. Check [https://github.com/delta-io/delta/blob/master/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala#L662] for one correct implementation of this logic. 2. Write both datasets out to JSON, union those, and read the result back with the merged schema, OR loop and transform each source dataset into the correct target schema. > Make unionByName optionally fill missing columns with nulls > --- > > Key: SPARK-29358 > URL: https://issues.apache.org/jira/browse/SPARK-29358 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Mukul Murthy >Priority: Major > > Currently, unionByName requires two DataFrames to have the same set of > columns (even though the order can be different). It would be good to add > either an option to unionByName or a new type of union which fills in missing > columns with nulls. 
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround to make this possible is by combining withColumn and unionByName, but this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls
[ https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16946069#comment-16946069 ] Mukul Murthy commented on SPARK-29358: -- I agree that it should not change the current behavior of unionByName. I'm proposing either a new optional parameter to unionByName or a new API entirely. Being similar to SQL is nice, but I think in cases where there are common problems that people have to get around (multiple DataFrames with different schemas, merging the schemas, and withColumn'ing all the missing ones from each DataFrame), it's nice to have the option to solve these in Spark instead of making users do it. > Make unionByName optionally fill missing columns with nulls > --- > > Key: SPARK-29358 > URL: https://issues.apache.org/jira/browse/SPARK-29358 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Mukul Murthy >Priority: Major > > Currently, unionByName requires two DataFrames to have the same set of > columns (even though the order can be different). It would be good to add > either an option to unionByName or a new type of union which fills in missing > columns with nulls. 
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround to make this possible is by combining withColumn and unionByName, but this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-29358) Make unionByName optionally fill missing columns with nulls
Mukul Murthy created SPARK-29358: Summary: Make unionByName optionally fill missing columns with nulls Key: SPARK-29358 URL: https://issues.apache.org/jira/browse/SPARK-29358 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.4 Reporter: Mukul Murthy Currently, unionByName requires two DataFrames to have the same set of columns (even though the order can be different). It would be good to add either an option to unionByName or a new type of union which fills in missing columns with nulls.
{code:java}
val df1 = Seq(1, 2, 3).toDF("x")
val df2 = Seq("a", "b", "c").toDF("y")
df1.unionByName(df2){code}
This currently throws
{code:java}
org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among (y);
{code}
Ideally, there would be a way to make this return a DataFrame containing:
{code:java}
+----+----+
|   x|   y|
+----+----+
|   1|null|
|   2|null|
|   3|null|
|null|   a|
|null|   b|
|null|   c|
+----+----+
{code}
Currently the workaround to make this possible is by combining withColumn and unionByName, but this is clunky:
{code:java}
df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
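The proposed semantics can be sketched outside Spark with rows as plain dicts: union by column name and fill columns missing from either side with None. This is an illustration of the desired behavior, not Spark's implementation:

```python
# Sketch of null-filling unionByName: collect the union of column names,
# then emit every row with missing columns filled with None.

def union_by_name_fill(rows1, rows2):
    cols = []
    for row in rows1 + rows2:  # preserve first-seen column order
        for c in row:
            if c not in cols:
                cols.append(c)
    return [{c: row.get(c) for c in cols} for row in rows1 + rows2]

df1 = [{"x": 1}, {"x": 2}, {"x": 3}]
df2 = [{"y": "a"}, {"y": "b"}, {"y": "c"}]
for row in union_by_name_fill(df1, df2):
    print(row)
```

This mirrors the workaround in the issue: the `withColumn("y", lit(null))` calls do by hand exactly what the `row.get(c)` default does here.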
[jira] [Updated] (SPARK-26046) Add a way for StreamingQueryManager to remove all listeners
[ https://issues.apache.org/jira/browse/SPARK-26046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukul Murthy updated SPARK-26046: - Description: StreamingQueryManager should have a way to clear out all listeners. There's addListener(listener) and removeListener(listener), but not removeAllListeners. We should expose a new method -removeAllListeners() that calls listenerBus.removeAllListeners (added here: [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3])- listListeners() that can be used to remove listeners. (was: StreamingQueryManager should have a way to clear out all listeners. There's addListener(listener) and removeListener(listener), but not removeAllListeners. We should expose a new method removeAllListeners() that calls listenerBus.removeAllListeners (added here: [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3]). ) > Add a way for StreamingQueryManager to remove all listeners > --- > > Key: SPARK-26046 > URL: https://issues.apache.org/jira/browse/SPARK-26046 > Project: Spark > Issue Type: Task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Mukul Murthy >Priority: Major > > StreamingQueryManager should have a way to clear out all listeners. There's > addListener(listener) and removeListener(listener), but not > removeAllListeners. We should expose a new method -removeAllListeners() that > calls listenerBus.removeAllListeners (added here: > [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3])- > listListeners() that can be used to remove listeners. -- This message was sent by Atlassian Jira (v8.3.2#803003) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-26046) Add a way for StreamingQueryManager to remove all listeners
[ https://issues.apache.org/jira/browse/SPARK-26046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukul Murthy reopened SPARK-26046: -- From some other discussions I've had, I actually think it's reasonable to have a way to remove all listeners. I don't think it should be a removeAllListeners API, as originally discussed, but StreamingQueryManager could have a listListeners API which the caller could then choose to use to remove each listener manually. > Add a way for StreamingQueryManager to remove all listeners > --- > > Key: SPARK-26046 > URL: https://issues.apache.org/jira/browse/SPARK-26046 > Project: Spark > Issue Type: Task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Mukul Murthy >Priority: Major > > StreamingQueryManager should have a way to clear out all listeners. There's > addListener(listener) and removeListener(listener), but not > removeAllListeners. We should expose a new method removeAllListeners() that > calls listenerBus.removeAllListeners (added here: > [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3]). > -- This message was sent by Atlassian Jira (v8.3.2#803003) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
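The listListeners idea composes "remove all" out of existing primitives. A minimal sketch of that shape, on a toy manager rather than Spark's StreamingQueryManager:

```python
# Sketch: expose the registered listeners so a caller can remove them
# one by one, instead of providing a removeAllListeners() API.

class ListenerManager:
    def __init__(self):
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def list_listeners(self):
        return list(self._listeners)  # return a copy so callers can't mutate internals

mgr = ListenerManager()
a, b = object(), object()
mgr.add_listener(a)
mgr.add_listener(b)
for listener in mgr.list_listeners():  # "remove all" built from list + remove
    mgr.remove_listener(listener)
print(len(mgr.list_listeners()))
```

Returning a copy from `list_listeners` matters: iterating the live list while removing from it would skip entries.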
[jira] [Updated] (SPARK-28043) Reading json with duplicate columns drops the first column value
[ https://issues.apache.org/jira/browse/SPARK-28043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukul Murthy updated SPARK-28043: - Description: When reading a JSON blob with duplicate fields, Spark appears to ignore the value of the first one. JSON recommends unique names but does not require it; since JSON and Spark SQL both allow duplicate field names, we should fix the bug where the first column value is getting dropped. I'm guessing somewhere when parsing JSON, we're turning it into a Map which is causing the first value to be overridden. Repro (Python, 2.4):
>>> jsonRDD = spark.sparkContext.parallelize(["\\{ \"a\": \"blah\", \"a\": \"blah2\"}"])
>>> df = spark.read.json(jsonRDD)
>>> df.show()
+----+-----+
|   a|    a|
+----+-----+
|null|blah2|
+----+-----+
The expected response would be:
+----+-----+
|   a|    a|
+----+-----+
|blah|blah2|
+----+-----+
was: When reading a JSON blob with duplicate fields, Spark appears to ignore the value of the first one. JSON recommends unique names but does not require it; since JSON and Spark SQL both allow duplicate field names, we should fix the bug where the first column value is getting dropped. Repro (Python, 2.4):
>>> jsonRDD = spark.sparkContext.parallelize(["\{ \"a\": \"blah\", \"a\": \"blah2\"}"])
>>> df = spark.read.json(jsonRDD)
>>> df.show()
+----+-----+
|   a|    a|
+----+-----+
|null|blah2|
+----+-----+
The expected response would be:
+----+-----+
|   a|    a|
+----+-----+
|blah|blah2|
+----+-----+
> Reading json with duplicate columns drops the first column value > > > Key: SPARK-28043 > URL: https://issues.apache.org/jira/browse/SPARK-28043 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Mukul Murthy >Priority: Major > > When reading a JSON blob with duplicate fields, Spark appears to ignore the > value of the first one. JSON recommends unique names but does not require it; > since JSON and Spark SQL both allow duplicate field names, we should fix the > bug where the first column value is getting dropped. 
> > I'm guessing somewhere when parsing JSON, we're turning it into a Map which > is causing the first value to be overridden. > > Repro (Python, 2.4):
> >>> jsonRDD = spark.sparkContext.parallelize(["\\{ \"a\": \"blah\", \"a\": \"blah2\"}"])
> >>> df = spark.read.json(jsonRDD)
> >>> df.show()
> +----+-----+
> |   a|    a|
> +----+-----+
> |null|blah2|
> +----+-----+
> The expected response would be:
> +----+-----+
> |   a|    a|
> +----+-----+
> |blah|blah2|
> +----+-----+
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-28043) Reading json with duplicate columns drops the first column value
Mukul Murthy created SPARK-28043: Summary: Reading json with duplicate columns drops the first column value Key: SPARK-28043 URL: https://issues.apache.org/jira/browse/SPARK-28043 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0 Reporter: Mukul Murthy When reading a JSON blob with duplicate fields, Spark appears to ignore the value of the first one. JSON recommends unique names but does not require it; since JSON and Spark SQL both allow duplicate field names, we should fix the bug where the first column value is getting dropped. Repro (Python, 2.4): >>> jsonRDD = spark.sparkContext.parallelize(["\{ \"a\": \"blah\", \"a\": >>> \"blah2\"}"]) >>> df = spark.read.json(jsonRDD) >>> df.show() ++-+ | a| a| ++-+ |null|blah2| ++-+ The expected response would be: ++-+ | a| a| ++-+ |blah|blah2| ++-+ -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
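The report's guess about a Map is easy to demonstrate outside Spark: parsing JSON into a plain map keeps only the last value for a duplicate key, so the first value is silently lost.

```python
# Python's json module shows the same last-value-wins behavior: loading a
# JSON object into a dict overwrites the earlier "a" with the later one.
import json

blob = '{ "a": "blah", "a": "blah2" }'
parsed = json.loads(blob)
print(parsed)  # {'a': 'blah2'} - the first "a" is gone
```

Any parser that materializes an object as a name-to-value map has this property; keeping both columns requires collecting fields positionally instead (in Python, e.g. via `object_pairs_hook`).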
[jira] [Created] (SPARK-26586) Streaming queries should have isolated SparkSessions and confs
Mukul Murthy created SPARK-26586: Summary: Streaming queries should have isolated SparkSessions and confs Key: SPARK-26586 URL: https://issues.apache.org/jira/browse/SPARK-26586 Project: Spark Issue Type: Bug Components: SQL, Structured Streaming Affects Versions: 2.4.0, 2.3.0 Reporter: Mukul Murthy When a stream is started, the stream's config is supposed to be frozen and all batches run with the config at start time. However, due to a race condition in creating streams, updating a conf value in the active spark session immediately after starting a stream can lead to the stream getting that updated value. The problem is that when StreamingQueryManager creates a MicrobatchExecution (or ContinuousExecution), it passes in the shared spark session, and the spark session isn't cloned until StreamExecution.start() is called. DataStreamWriter.start() should not return until the SparkSession is cloned. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
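The fix's intent is that the conf visible to a stream is snapshotted before start() returns. A sketch of that freezing, with plain dicts standing in for SparkSession confs (not Spark's actual cloning code):

```python
# Sketch: clone the session conf before start() returns, so later
# mutations of the live conf cannot race with the stream's first batch.

def start_stream(session_conf):
    frozen = dict(session_conf)  # the clone happens before start() returns
    def run_batch():
        return frozen["spark.some.conf"]  # batches only ever read the frozen copy
    return run_batch

conf = {"spark.some.conf": "v1"}
batch = start_stream(conf)
conf["spark.some.conf"] = "v2"  # mutation immediately after start(); must not be seen
print(batch())  # v1
```

The bug described above is exactly the window where the clone happens after start() returns: a caller's immediate conf update could land inside it.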
[jira] [Created] (SPARK-26046) Add a way for StreamingQueryManager to remove all listeners
Mukul Murthy created SPARK-26046: Summary: Add a way for StreamingQueryManager to remove all listeners Key: SPARK-26046 URL: https://issues.apache.org/jira/browse/SPARK-26046 Project: Spark Issue Type: Task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Mukul Murthy StreamingQueryManager should have a way to clear out all listeners. There's addListener(listener) and removeListener(listener), but not removeAllListeners. We should expose a new method removeAllListeners() that calls listenerBus.removeAllListeners (added here: [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3]). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25449) Don't send zero accumulators in heartbeats
Mukul Murthy created SPARK-25449: Summary: Don't send zero accumulators in heartbeats Key: SPARK-25449 URL: https://issues.apache.org/jira/browse/SPARK-25449 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 2.4.0 Reporter: Mukul Murthy Heartbeats sent from executors to the driver every 10 seconds contain metrics and are generally on the order of a few KBs. However, for large jobs with lots of tasks, heartbeats can be on the order of tens of MBs, causing tasks to die with heartbeat failures. We can mitigate this by not sending zero metrics to the driver. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
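The mitigation is a simple filter on the heartbeat payload. A sketch, with a dict standing in for the accumulator updates (names are illustrative):

```python
# Sketch: drop zero-valued accumulator updates before sending a heartbeat,
# shrinking the payload for jobs with many tasks and mostly-zero metrics.

def prune_heartbeat(metrics):
    return {name: value for name, value in metrics.items() if value != 0}

metrics = {"bytesRead": 1024, "shuffleWriteTime": 0, "recordsWritten": 0}
print(prune_heartbeat(metrics))  # only the non-zero metric survives
```

The receiver can treat any absent metric as zero, so dropping zeros loses no information.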
[jira] [Updated] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues
[ https://issues.apache.org/jira/browse/SPARK-25399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukul Murthy updated SPARK-25399: - Priority: Major (was: Blocker) > Reusing execution threads from continuous processing for microbatch streaming > can result in correctness issues > -- > > Key: SPARK-25399 > URL: https://issues.apache.org/jira/browse/SPARK-25399 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Mukul Murthy >Priority: Major > > Continuous processing sets some thread local variables that, when read by a > thread running a microbatch stream, may result in incorrect or no previous > state being read and resulting in wrong answers. This was caught by a job > running the StreamSuite tests, and only repros occasionally when the same > threads are used. > The issue is in StateStoreRDD.compute - when we compute currentVersion, we > read from a thread local variable which is set by continuous processing > threads. If this value is set, we then think we're on the wrong state version. > I imagine very few people, if any, would run into this bug, because you'd > have to use continuous processing and then microbatch processing in the same > cluster. However, it can result in silent correctness issues, and it would be > very difficult for someone to tell if they were impacted by this or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues
[ https://issues.apache.org/jira/browse/SPARK-25399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609819#comment-16609819 ] Mukul Murthy commented on SPARK-25399: -- cc [~joseph.torres] and [~tdas] > Reusing execution threads from continuous processing for microbatch streaming > can result in correctness issues > -- > > Key: SPARK-25399 > URL: https://issues.apache.org/jira/browse/SPARK-25399 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Mukul Murthy >Priority: Blocker > > Continuous processing sets some thread local variables that, when read by a > thread running a microbatch stream, may result in incorrect or no previous > state being read and resulting in wrong answers. This was caught by a job > running the StreamSuite tests, and only repros occasionally when the same > threads are used. > The issue is in StateStoreRDD.compute - when we compute currentVersion, we > read from a thread local variable which is set by continuous processing > threads. If this value is set, we then think we're on the wrong state version. > I imagine very few people, if any, would run into this bug, because you'd > have to use continuous processing and then microbatch processing in the same > cluster. However, it can result in silent correctness issues, and it would be > very difficult for someone to tell if they were impacted by this or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues
Mukul Murthy created SPARK-25399: Summary: Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues Key: SPARK-25399 URL: https://issues.apache.org/jira/browse/SPARK-25399 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Mukul Murthy Continuous processing sets some thread local variables that, when read by a thread running a microbatch stream, may result in incorrect or no previous state being read and resulting in wrong answers. This was caught by a job running the StreamSuite tests, and only repros occasionally when the same threads are used. The issue is in StateStoreRDD.compute - when we compute currentVersion, we read from a thread local variable which is set by continuous processing threads. If this value is set, we then think we're on the wrong state version. I imagine very few people, if any, would run into this bug, because you'd have to use continuous processing and then microbatch processing in the same cluster. However, it can result in silent correctness issues, and it would be very difficult for someone to tell if they were impacted by this or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
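The failure mode can be demonstrated outside Spark: a thread-local value set by one task remains visible to a later task that happens to reuse the same thread. A sketch:

```python
# Sketch of the bug's mechanism: thread-locals survive across tasks when
# an executor reuses threads, so a "continuous" task's value leaks into a
# "microbatch" task scheduled on the same thread.
import threading
from concurrent.futures import ThreadPoolExecutor

tls = threading.local()

def continuous_task():
    tls.epoch = 7  # continuous processing stashes a version in a thread-local

def microbatch_task():
    # a fresh thread would see None; a reused thread sees the stale value
    return getattr(tls, "epoch", None)

with ThreadPoolExecutor(max_workers=1) as pool:  # one worker forces thread reuse
    pool.submit(continuous_task).result()
    leaked = pool.submit(microbatch_task).result()
print(leaked)  # 7 - the stale thread-local leaked into the second task
```

The usual fixes are to clear the thread-local when a task finishes or to pass the version explicitly instead of through thread state.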
[jira] [Resolved] (SPARK-25182) Block Manager master and slave thread pools are unbounded
[ https://issues.apache.org/jira/browse/SPARK-25182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukul Murthy resolved SPARK-25182. -- Resolution: Duplicate Target Version/s: (was: 2.4.0) > Block Manager master and slave thread pools are unbounded > - > > Key: SPARK-25182 > URL: https://issues.apache.org/jira/browse/SPARK-25182 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Mukul Murthy >Priority: Major > > Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have > thread pools with unbounded numbers of threads. In certain cases, this can > lead to driver OOM errors. We should add an upper bound on the number of > threads in these thread pools; this should not break any existing behavior > because they still have queues of size Integer.MAX_VALUE. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25182) Block Manager master and slave thread pools are unbounded
Mukul Murthy created SPARK-25182: Summary: Block Manager master and slave thread pools are unbounded Key: SPARK-25182 URL: https://issues.apache.org/jira/browse/SPARK-25182 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.0 Reporter: Mukul Murthy Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have thread pools with unbounded numbers of threads. In certain cases, this can lead to driver OOM errors. We should add an upper bound on the number of threads in these thread pools; this should not break any existing behavior because they still have queues of size Integer.MAX_VALUE. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25181) Block Manager master and slave thread pools are unbounded
Mukul Murthy created SPARK-25181: Summary: Block Manager master and slave thread pools are unbounded Key: SPARK-25181 URL: https://issues.apache.org/jira/browse/SPARK-25181 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.0 Reporter: Mukul Murthy Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have thread pools with unbounded numbers of threads. In certain cases, this can lead to driver OOM errors. We should add an upper bound on the number of threads in these thread pools; this should not break any existing behavior because they still have queues of size Integer.MAX_VALUE. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
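The proposed shape - a bounded thread count in front of an effectively unbounded queue - can be sketched with Python's executor (the worker count is illustrative):

```python
# Sketch: cap worker threads while letting submissions queue up, mirroring
# "bounded threads, queue of size Integer.MAX_VALUE". Submitting 100 tasks
# no longer implies up to 100 threads; excess tasks simply wait.
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=8) as pool:  # bounded thread count
    futures = [pool.submit(lambda i=i: i * i) for i in range(100)]
    total = sum(f.result() for f in futures)
print(total)
```

Because every submission is eventually drained from the queue, bounding the threads changes memory behavior (no thread-per-task explosion on the driver) without changing which work gets done.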
[jira] [Commented] (SPARK-24438) Empty strings and null strings are written to the same partition
[ https://issues.apache.org/jira/browse/SPARK-24438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533977#comment-16533977 ] Mukul Murthy commented on SPARK-24438: -- Are null and empty string both invalid partition values? I kind of dislike that it's causing the actual data to be changed, although it is minor, and as you guys said it's actually a Hive bug so I don't think it's straightforward to fix. > Empty strings and null strings are written to the same partition > > > Key: SPARK-24438 > URL: https://issues.apache.org/jira/browse/SPARK-24438 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Mukul Murthy >Priority: Major > > When you partition on a string column that has empty strings and nulls, they > are both written to the same default partition. When you read the data back, > all those values get read back as null. > {code:java} > import org.apache.spark.sql.types._ > import org.apache.spark.sql.catalyst.encoders.RowEncoder > val data = Seq(Row(1, ""), Row(2, ""), Row(3, ""), Row(4, "hello"), Row(5, > null)) > val schema = new StructType().add("a", IntegerType).add("b", StringType) > val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) > display(df) > => > a b > 1 > 2 > 3 > 4 hello > 5 null > df.write.mode("overwrite").partitionBy("b").save("/home/mukul/weird_test_data4") > val df2 = spark.read.load("/home/mukul/weird_test_data4") > display(df2) > => > a b > 4 hello > 3 null > 2 null > 1 null > 5 null > {code} > Seems to affect multiple types of tables. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
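The collapse described above happens at the partition-path level: Hive-style layouts map invalid partition values to a single default directory name. A sketch of that mapping (the treatment of `""` here reflects the observed behavior in the report, not a spec):

```python
# Sketch: both null and empty string map to the same default partition
# directory, so on read they become indistinguishable (and read back as null).
DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"  # the constant Hive/Spark use

def partition_value(v):
    # both None and "" are treated as invalid partition values
    return DEFAULT_PARTITION if v is None or v == "" else str(v)

print(partition_value(""), partition_value(None), partition_value("hello"))
```

Since the partition column's value is reconstructed from the directory name on read, anything written under the default partition comes back as null - which is why the empty strings in the repro turn into nulls.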
[jira] [Commented] (SPARK-24662) Structured Streaming should support LIMIT
[ https://issues.apache.org/jira/browse/SPARK-24662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526848#comment-16526848 ]

Mukul Murthy commented on SPARK-24662:
--

Calling .limit(n) on a DataFrame (or SELECT ... LIMIT n in SQL) returns only n rows from that query. limit is currently not supported on streaming DataFrames; my fix will support it for streams writing in append and complete output modes. It will remain unsupported in update output mode, where limit doesn't make sense.

> Structured Streaming should support LIMIT
> ---
>
> Key: SPARK-24662
> URL: https://issues.apache.org/jira/browse/SPARK-24662
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Affects Versions: 2.3.1
> Reporter: Mukul Murthy
> Priority: Major
>
> Make structured streams support the LIMIT operator.
> This will undo SPARK-24525, as the limit operator would be a superior solution.
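The append-mode semantics described above can be sketched as a small stateful operator. This is an illustrative model only — the class and method names are hypothetical, not Spark's internal operator API: a global limit over a stream carries a running count across micro-batches and truncates each batch once n total rows have been emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a streaming global limit in append mode: state
// (the emitted-row count) survives across micro-batches, so the limit is
// global over the whole stream, not per batch.
class StreamingLimit<T> {
    private final int n;
    private long emitted = 0;

    StreamingLimit(int n) { this.n = n; }

    // Returns the prefix of this micro-batch that still fits under the limit;
    // once n rows have been emitted in total, every later batch yields nothing.
    List<T> processBatch(List<T> batch) {
        List<T> out = new ArrayList<>();
        for (T row : batch) {
            if (emitted >= n) break;
            out.add(row);
            emitted++;
        }
        return out;
    }
}
```

This also suggests why update mode is excluded: when earlier output rows can be rewritten, "the first n rows" is no longer well defined, so a running count has nothing stable to bound.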
[jira] [Created] (SPARK-24662) Structured Streaming should support LIMIT
Mukul Murthy created SPARK-24662:

Summary: Structured Streaming should support LIMIT
Key: SPARK-24662
URL: https://issues.apache.org/jira/browse/SPARK-24662
Project: Spark
Issue Type: New Feature
Components: Structured Streaming
Affects Versions: 2.3.1
Reporter: Mukul Murthy

Make structured streams support the LIMIT operator. This will undo SPARK-24525, as the limit operator would be a superior solution.
[jira] [Created] (SPARK-24525) Provide an option to limit MemorySink memory usage
Mukul Murthy created SPARK-24525:

Summary: Provide an option to limit MemorySink memory usage
Key: SPARK-24525
URL: https://issues.apache.org/jira/browse/SPARK-24525
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 2.3.1
Reporter: Mukul Murthy

MemorySink stores stream results in memory and is mostly used for testing and displaying streams, but for large streams this can OOM the driver. We should add an option to limit the number of rows and the total size of a memory sink, and not add any new data once either limit is hit.
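The proposed option can be sketched as a sink that enforces both caps on insertion. The class and method names below are hypothetical, not a real Spark API — a minimal model of the described behavior: track row count and accumulated bytes, and silently drop new data once either limit would be exceeded.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a size-bounded in-memory sink (not Spark's
// MemorySink API): rows are kept until either the row-count limit or the
// total-bytes limit is reached, after which new rows are dropped.
class BoundedMemorySink {
    private final long maxRows;
    private final long maxBytes;
    private long totalBytes = 0;
    private final List<String> rows = new ArrayList<>();

    BoundedMemorySink(long maxRows, long maxBytes) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
    }

    // Returns true if the row was stored, false if a limit was already hit.
    boolean addRow(String row) {
        long size = row.getBytes(StandardCharsets.UTF_8).length;
        if (rows.size() >= maxRows || totalBytes + size > maxBytes) {
            return false; // drop new data once either limit is reached
        }
        rows.add(row);
        totalBytes += size;
        return true;
    }

    int size() { return rows.size(); }
}
```

Keeping a prefix of the stream (rather than evicting old rows) matches the ticket's "not add any new data" wording, and is also what makes SPARK-24662's LIMIT operator the cleaner replacement.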
[jira] [Created] (SPARK-24438) Empty strings and null strings are written to the same partition
Mukul Murthy created SPARK-24438:

Summary: Empty strings and null strings are written to the same partition
Key: SPARK-24438
URL: https://issues.apache.org/jira/browse/SPARK-24438
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.0
Reporter: Mukul Murthy

When you partition on a string column that contains empty strings and nulls, both are written to the same default partition. When you read the data back, all of those values are read back as null.

{code:java}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val data = Seq(Row(1, ""), Row(2, ""), Row(3, ""), Row(4, "hello"), Row(5, null))
val schema = new StructType().add("a", IntegerType).add("b", StringType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
display(df)
=>
a b
1
2
3
4 hello
5 null

df.write.mode("overwrite").partitionBy("b").save("/home/mukul/weird_test_data4")
val df2 = spark.read.load("/home/mukul/weird_test_data4")
display(df2)
=>
a b
4 hello
3 null
2 null
1 null
5 null
{code}

Seems to affect multiple types of tables.