[jira] [Comment Edited] (SPARK-43106) Data lost from the table if the INSERT OVERWRITE query fails
[ https://issues.apache.org/jira/browse/SPARK-43106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717461#comment-17717461 ] kalyan s edited comment on SPARK-43106 at 4/28/23 5:03 AM: --- [~cloud_fan] , [~dongjoon] , [~gurwls223] Appreciate input on this! Myself(kalyan) and [~vaibhavb] are from Uber and we see our workloads getting into this problem more regularly. While we are willing to work on it, we would like to know if there were any gotchas in taking forward the idea proposed in SPARK-19183. was (Author: itskals): [~cloud_fan] , [~dongjoon] , [~gurwls223] Input from you folks will help us. Myself(kalyan) and [~vaibhavb] are from Uber and we see our workloads getting into this problem more regularly. While we are willing to work on it, we would like to know if there were any gotchas in taking forward the idea proposed in SPARK-19183. > Data lost from the table if the INSERT OVERWRITE query fails > > > Key: SPARK-43106 > URL: https://issues.apache.org/jira/browse/SPARK-43106 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.2 >Reporter: vaibhav beriwala >Priority: Major > > When we run an INSERT OVERWRITE query for an unpartitioned table on Spark-3, > Spark has the following behavior: > 1) It will first clean up all the data from the actual table path. > 2) It will then launch a job that performs the actual insert. > > There are 2 major issues with this approach: > 1) If the insert job launched in step 2 above fails for any reason, the data > from the original table is lost. > 2) If the insert job in step 2 above takes a huge time to complete, then > table data is unavailable to other readers for the entire duration the job > takes. > This behavior is the same even for the partitioned tables when using static > partitioning. For dynamic partitioning, we do not delete the table data > before the job launch. 
> > Is there a reason as to why we perform this delete before the job launch and > not as part of the Job commit operation? This issue is not there with Hive - > where the data is cleaned up as part of the Job commit operation probably. As > part of SPARK-19183, we did add a new hook in the commit protocol for this > exact same purpose, but seems like its default behavior is still to delete > the table data before the job launch. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
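The delete-before-job versus delete-on-commit distinction discussed in this ticket can be sketched outside Spark with plain filesystem operations. This is a minimal illustrative model, not Spark's actual commit protocol; the function names and the staging-directory mechanism are assumptions standing in for the hook added in SPARK-19183:

```python
import os
import shutil
import tempfile

def naive_overwrite(table_dir, run_insert_job):
    """Delete-first overwrite (the behavior described in the ticket):
    the table path is cleared before the insert job runs, so a job
    failure leaves the table empty and readers see no data meanwhile."""
    shutil.rmtree(table_dir)
    os.makedirs(table_dir)
    run_insert_job(table_dir)

def commit_overwrite(table_dir, run_insert_job):
    """Delete-on-commit overwrite (the idea behind SPARK-19183): write
    to a staging directory first and only swap it in after the job
    succeeds, so a failure leaves the original data untouched."""
    staging = tempfile.mkdtemp(prefix="staging-")
    try:
        run_insert_job(staging)
    except Exception:
        shutil.rmtree(staging)
        raise  # the job failed; table_dir was never touched
    shutil.rmtree(table_dir)
    shutil.move(staging, table_dir)
```

With `commit_overwrite`, a failing insert job leaves the original files in place, and readers see the old data for the whole duration of the job; with `naive_overwrite`, the same failure loses the table contents, which is the bug reported here.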
[jira] [Commented] (SPARK-43106) Data lost from the table if the INSERT OVERWRITE query fails
[ https://issues.apache.org/jira/browse/SPARK-43106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717461#comment-17717461 ] kalyan s commented on SPARK-43106: -- [~cloud_fan] , [~dongjoon] , [~gurwls223] Input from you folks will help us. Myself(kalyan) and [~vaibhavb] are from Uber and we see our workloads getting into this problem more regularly. While we are willing to work on it, we would like to know if there were any gotchas in taking forward the idea proposed in SPARK-19183.
[jira] [Assigned] (SPARK-43302) Make Python UDAF an AggregateFunction
[ https://issues.apache.org/jira/browse/SPARK-43302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-43302: --- Assignee: Wenchen Fan > Make Python UDAF an AggregateFunction > - > > Key: SPARK-43302 > URL: https://issues.apache.org/jira/browse/SPARK-43302 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 3.5.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major >
[jira] [Resolved] (SPARK-38461) Use error classes in org.apache.spark.broadcast
[ https://issues.apache.org/jira/browse/SPARK-38461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-38461. - Fix Version/s: 3.5.0 Resolution: Fixed Issue resolved by pull request 40978 [https://github.com/apache/spark/pull/40978] > Use error classes in org.apache.spark.broadcast > --- > > Key: SPARK-38461 > URL: https://issues.apache.org/jira/browse/SPARK-38461 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.3.0 >Reporter: Bo Zhang >Assignee: Bo Zhang >Priority: Major > Fix For: 3.5.0 > >
[jira] [Resolved] (SPARK-43302) Make Python UDAF an AggregateFunction
[ https://issues.apache.org/jira/browse/SPARK-43302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-43302. - Fix Version/s: 3.5.0 Resolution: Fixed Issue resolved by pull request 40739 [https://github.com/apache/spark/pull/40739]
[jira] [Resolved] (SPARK-43309) Extend INTERNAL_ERROR with category
[ https://issues.apache.org/jira/browse/SPARK-43309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-43309. - Fix Version/s: 3.5.0 Resolution: Fixed Issue resolved by pull request 40978 [https://github.com/apache/spark/pull/40978] > Extend INTERNAL_ERROR with category > --- > > Key: SPARK-43309 > URL: https://issues.apache.org/jira/browse/SPARK-43309 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Bo Zhang >Assignee: Bo Zhang >Priority: Major > Fix For: 3.5.0 > > > This is to extend INTERNAL_ERROR with different categories / areas / modules > (e.g. INTERNAL_ERROR_BROADCAST) so that we can better differentiate them.
[jira] [Assigned] (SPARK-43309) Extend INTERNAL_ERROR with category
[ https://issues.apache.org/jira/browse/SPARK-43309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-43309: --- Assignee: Bo Zhang
[jira] [Assigned] (SPARK-38461) Use error classes in org.apache.spark.broadcast
[ https://issues.apache.org/jira/browse/SPARK-38461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-38461: --- Assignee: Bo Zhang
[jira] [Updated] (SPARK-43315) Migrate remaining errors from DataFrame(Reader|Writer) into error class
[ https://issues.apache.org/jira/browse/SPARK-43315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haejoon Lee updated SPARK-43315: Summary: Migrate remaining errors from DataFrame(Reader|Writer) into error class (was: Migrate errors from DataFrame(Reader|Writer) into error class) > Migrate remaining errors from DataFrame(Reader|Writer) into error class > --- > > Key: SPARK-43315 > URL: https://issues.apache.org/jira/browse/SPARK-43315 > Project: Spark > Issue Type: Sub-task > Components: Connect, PySpark, Structured Streaming >Affects Versions: 3.5.0 >Reporter: Haejoon Lee >Priority: Major > > Migrate errors from DataFrame(Reader|Writer) into error class
[jira] [Created] (SPARK-43315) Migrate errors from DataFrame(Reader|Writer) into error class
Haejoon Lee created SPARK-43315: --- Summary: Migrate errors from DataFrame(Reader|Writer) into error class Key: SPARK-43315 URL: https://issues.apache.org/jira/browse/SPARK-43315 Project: Spark Issue Type: Sub-task Components: Connect, PySpark, Structured Streaming Affects Versions: 3.5.0 Reporter: Haejoon Lee Migrate errors from DataFrame(Reader|Writer) into error class
[jira] [Created] (SPARK-43314) Migrate Spark Connect client errors into error class
Haejoon Lee created SPARK-43314: --- Summary: Migrate Spark Connect client errors into error class Key: SPARK-43314 URL: https://issues.apache.org/jira/browse/SPARK-43314 Project: Spark Issue Type: Sub-task Components: Connect, PySpark Affects Versions: 3.5.0 Reporter: Haejoon Lee Migrate Spark Connect client errors into error class
[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717418#comment-17717418 ] Jungtaek Lim commented on SPARK-43244: -- [~kimahriman] Great to hear that [~anishshri-db] 's proposal is promising to solve your issue. Would you mind looking into Anish's PR as well? Thanks! > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage (this link was helpful: > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique keys so it makes > sense these block usages would be high. The problem is that, because as it is > now the underlying RocksDB instance stays open on an executor as long as that > executor is assigned that stateful partition (to be reused across batches). > So a single executor can accumulate a large number of RocksDB instances open > at once, each using a certain amount of native memory. In the worst case, a > single executor could need to keep open every single partitions' RocksDB > instance at once. 
> There are a couple ways you can control the amount of memory used, such as > limiting the max open files, or adding the option to use the block cache for > the indices and filters, but neither of these solve the underlying problem of > accumulating native memory from multiple partitions on an executor. > The real fix needs to be a mechanism and option to close the underlying > RocksDB instance at the end of each task, so you have the option to only ever > have one RocksDB instance open at a time, thus having predictable memory > usage no matter the size of your data or number of shuffle partitions. > We are running this on Spark 3.3, but just kicked off a test to see if things > are any different in Spark 3.4.
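The accumulation the reporter describes, many long-lived RocksDB instances each pinning its own index/filter blocks in native memory, can be sketched with a toy model. The classes and sizes below are illustrative, not RocksDB's real API; the point is only that per-instance caches grow with the number of open partitions, while a single shared, capped cache does not:

```python
from collections import OrderedDict

MIB = 1024 * 1024

class SharedBlockCache:
    """Toy LRU cache with a byte cap, standing in for a block cache
    shared by every store instance on one executor."""
    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.used = 0
        self._blocks = OrderedDict()  # (partition, block name) -> size

    def put(self, key, size):
        if key in self._blocks:
            self._blocks.move_to_end(key)  # refresh LRU position
            return
        # Evict least-recently-used blocks until the new one fits.
        while self._blocks and self.used + size > self.capacity:
            _, evicted_size = self._blocks.popitem(last=False)
            self.used -= evicted_size
        self._blocks[key] = size
        self.used += size

class StatePartition:
    """One stateful partition's store, caching its index/filter blocks
    either privately (unbounded across partitions) or in a shared cache."""
    def __init__(self, pid, shared=None):
        self.pid, self.shared, self._private = pid, shared, {}

    def load_block(self, name, size):
        if self.shared is not None:
            self.shared.put((self.pid, name), size)
        else:
            self._private[name] = size

    def private_bytes(self):
        return sum(self._private.values())

def total_native_bytes(partitions, shared=None):
    return sum(p.private_bytes() for p in partitions) + (shared.used if shared else 0)
```

With, say, 200 open partitions at ~150 MiB of blocks each, the private-cache variant holds roughly 30 GiB of native memory, matching the unbounded growth reported here, while the shared-cache variant stays under whatever cap it is given.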
[jira] [Commented] (SPARK-43253) Assign a name to the error class _LEGACY_ERROR_TEMP_2017
[ https://issues.apache.org/jira/browse/SPARK-43253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717412#comment-17717412 ] LUIZ FERNANDO NEVES DE ARAUJO commented on SPARK-43253: --- I created this [pull request|https://github.com/apache/spark/pull/40984] in an attempt to address the issue, could you review? Your feedback would be greatly appreciated. Thank you. > Assign a name to the error class _LEGACY_ERROR_TEMP_2017 > > > Key: SPARK-43253 > URL: https://issues.apache.org/jira/browse/SPARK-43253 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Max Gekk >Priority: Minor > Labels: starter > > Choose a proper name for the error class *_LEGACY_ERROR_TEMP_2017* defined in > {*}core/src/main/resources/error/error-classes.json{*}. The name should be > short but complete (look at the example in error-classes.json). > Add a test which triggers the error from user code if such test still doesn't > exist. Check exception fields by using {*}checkError(){*}. The last function > checks valuable error fields only, and avoids dependencies from error text > message. In this way, tech editors can modify error format in > error-classes.json, and don't worry of Spark's internal tests. Migrate other > tests that might trigger the error onto checkError(). > If you cannot reproduce the error from user space (using SQL query), replace > the error by an internal error, see {*}SparkException.internalError(){*}. > Improve the error message format in error-classes.json if the current is not > clear. Propose a solution to users how to avoid and fix such kind of errors. 
> Please, look at the PR below as examples: > * [https://github.com/apache/spark/pull/38685] > * [https://github.com/apache/spark/pull/38656] > * [https://github.com/apache/spark/pull/38490]
[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717407#comment-17717407 ] Anish Shrigondekar commented on SPARK-43244: Yes correct. All the usage will be accounted for using block cache (block cache, memtables, filter/index blocks). And we are getting rid of writeBatchWithIndex. So, with the proposed changes, we can cap memory usage on a per node basis as well.
[jira] [Created] (SPARK-43313) Adding missing default values for MERGE INSERT actions
Daniel created SPARK-43313: -- Summary: Adding missing default values for MERGE INSERT actions Key: SPARK-43313 URL: https://issues.apache.org/jira/browse/SPARK-43313 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.4.0 Reporter: Daniel
[jira] [Comment Edited] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717394#comment-17717394 ] Adam Binford edited comment on SPARK-43244 at 4/27/23 11:08 PM: Yeah I just saw that PR and was looking through that, I can see that making sense. The memory usage I was seeing that I couldn't do anything about was the filter/index cache. Trying to re-use the LRUCache across all DB instances is interesting and does seem like that would solve the issue. Do you know if that definitely actually reuses the same underlying cache, and isn't just a config that tells the native RocksDB instance how big to make the cache? Edit: does appear to be the actual cache, so I think that would solve my issues was (Author: kimahriman): Yeah I just saw that PR and was looking through that, I can see that making sense. The memory usage I was seeing that I couldn't do anything about was the filter/index cache. Trying to re-use the LRUCache across all DB instances is interesting and does seem like that would solve the issue. Do you know if that definitely actually reuses the same underlying cache, and isn't just a config that tells the native RocksDB instance how big to make the cache?
[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717394#comment-17717394 ] Adam Binford commented on SPARK-43244: -- Yeah I just saw that PR and was looking through that, I can see that making sense. The memory usage I was seeing that I couldn't do anything about was the filter/index cache. Trying to re-use the LRUCache across all DB instances is interesting and does seem like that would solve the issue. Do you know if that definitely actually reuses the same underlying cache, and isn't just a config that tells the native RocksDB instance how big to make the cache?
[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717393#comment-17717393 ] Anish Shrigondekar commented on SPARK-43244: [~kimahriman] - we did some investigation here and we believe that it might not be ideal to close instance on task completion and reload it later. We think a better idea would be to remove writeBatch related changes since they are not required and rely on native db operations itself, with WAL disabled. Here is the ticket that I created: https://issues.apache.org/jira/browse/SPARK-43311 Here is the PR for the change: [https://github.com/apache/spark/pull/40981] cc - [~kabhwan] Please let us know if you have any questions/concerns here.
[jira] [Updated] (SPARK-43312) Protobuf: Allow converting Any fields to JSON
[ https://issues.apache.org/jira/browse/SPARK-43312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Raghu Angadi updated SPARK-43312: - Summary: Protobuf: Allow converting Any fields to JSON (was: Protobfu: Allow converting Any fields to JSON) > Protobuf: Allow converting Any fields to JSON > - > > Key: SPARK-43312 > URL: https://issues.apache.org/jira/browse/SPARK-43312 > Project: Spark > Issue Type: Task > Components: Protobuf >Affects Versions: 3.4.0 >Reporter: Raghu Angadi >Priority: Major > > Allow option to deserialize `Any` fields at runtime to JSON strings rather > than leaving them as binary values.
[jira] [Created] (SPARK-43312) Protobfu: Allow converting Any fields to JSON
Raghu Angadi created SPARK-43312: Summary: Protobfu: Allow converting Any fields to JSON Key: SPARK-43312 URL: https://issues.apache.org/jira/browse/SPARK-43312 Project: Spark Issue Type: Task Components: Protobuf Affects Versions: 3.4.0 Reporter: Raghu Angadi Allow option to deserialize `Any` fields at runtime to JSON strings rather than leaving them as binary values.
[jira] [Commented] (SPARK-43311) RocksDB state store provider memory management enhancements
[ https://issues.apache.org/jira/browse/SPARK-43311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717359#comment-17717359 ] Anish Shrigondekar commented on SPARK-43311: PR here: https://github.com/apache/spark/pull/40981 > RocksDB state store provider memory management enhancements > --- > > Key: SPARK-43311 > URL: https://issues.apache.org/jira/browse/SPARK-43311 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.4.0 >Reporter: Anish Shrigondekar >Priority: Major > > Today when RocksDB is used as a State Store provider, memory usage when > writing using writeBatch is not capped. Also, a related issue is that the > state store coordinator can create multiple RocksDB instances on a single > node without enforcing a global limit on native memory usage. Due to these > issues we could run into OOM issues and task failures. > > We are looking to improve this behavior by doing a series of improvements > such as: > * remove writeBatch and use native RocksDB operations > * use writeBufferManager to manage global limit for all instances on a > single node and accounting memtable + filter/index blocks usage as part of > block cache. > With these changes we will be avoiding OOM issues around RocksDB > native memory usage.
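The global-limit idea in the ticket above can be illustrated with a toy node-wide write-buffer budget. This is loosely modeled on RocksDB's WriteBufferManager concept; the class and method names here are illustrative, not the real RocksDB JNI API:

```python
class WriteBufferBudget:
    """Toy global budget shared by all store instances on one node:
    when aggregate memtable usage would exceed the cap, the largest
    memtable is flushed before the new write is admitted."""
    def __init__(self, limit_bytes):
        self.limit = limit_bytes
        self.stores = []

    def register(self, store):
        self.stores.append(store)

    def reserve(self, nbytes):
        def in_use():
            return sum(s.memtable_bytes for s in self.stores)
        while in_use() + nbytes > self.limit:
            victim = max(self.stores, key=lambda s: s.memtable_bytes)
            if victim.memtable_bytes == 0:
                break  # nothing left to flush; admit the write anyway
            victim.flush()

class Store:
    """One store instance whose writes draw on the shared budget."""
    def __init__(self, budget):
        self.memtable_bytes = 0
        self.flushed_bytes = 0
        self.budget = budget
        budget.register(self)

    def put(self, nbytes):
        self.budget.reserve(nbytes)
        self.memtable_bytes += nbytes

    def flush(self):
        # Spill the memtable to disk, freeing its native memory.
        self.flushed_bytes += self.memtable_bytes
        self.memtable_bytes = 0
```

The design point this sketches: each write must first pass the shared budget, so total memtable memory across every instance on the node stays bounded regardless of how many partitions the node hosts, which is the OOM protection the ticket is after.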
[jira] [Updated] (SPARK-43310) Dataset.observe is ignored when writing to Kafka with batch query
[ https://issues.apache.org/jira/browse/SPARK-43310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-43310: - Component/s: Structured Streaming > Dataset.observe is ignored when writing to Kafka with batch query > - > > Key: SPARK-43310 > URL: https://issues.apache.org/jira/browse/SPARK-43310 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 3.3.2, 3.4.0 >Reporter: David Deuber >Priority: Major > > When writing to Kafka with a batch query, metrics defined with > {{Dataset.observe}} are not recorded. > For example, > {code:java} > import org.apache.spark.sql.execution.QueryExecution > import org.apache.spark.sql.util.QueryExecutionListener > spark.listenerManager.register(new QueryExecutionListener { > override def onSuccess(funcName: String, qe: QueryExecution, durationNs: > Long): Unit = { > println(qe.observedMetrics) > } > override def onFailure(funcName: String, qe: QueryExecution, exception: > Exception): Unit = { > //pass > } > }) > val df = Seq(("k", "v")).toDF("key", "value") > val observed = df.observe("my_observation", > lit("metric_value").as("some_metric")) > observed > .write > .format("kafka") > .option("kafka.bootstrap.servers", "host1:port1") > .option("topic", "topic1") > .save() > {code} > prints {{{}Map(){}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43132) Add foreach streaming API in Python
[ https://issues.apache.org/jira/browse/SPARK-43132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717347#comment-17717347 ] Wei Liu commented on SPARK-43132: - i'm working on this > Add foreach streaming API in Python > --- > > Key: SPARK-43132 > URL: https://issues.apache.org/jira/browse/SPARK-43132 > Project: Spark > Issue Type: Task > Components: Connect, Structured Streaming >Affects Versions: 3.5.0 >Reporter: Raghu Angadi >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-43054) Support foreach() in streaming spark connect
[ https://issues.apache.org/jira/browse/SPARK-43054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu resolved SPARK-43054. - Resolution: Duplicate Duplicate of SPARK-43132 and SPARK-43133 > Support foreach() in streaming spark connect > > > Key: SPARK-43054 > URL: https://issues.apache.org/jira/browse/SPARK-43054 > Project: Spark > Issue Type: Task > Components: Connect, Structured Streaming >Affects Versions: 3.5.0 >Reporter: Wei Liu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-43298) predict_batch_udf with scalar input fails when batch size consists of a single value
[ https://issues.apache.org/jira/browse/SPARK-43298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-43298. -- Fix Version/s: 3.5.0 Resolution: Fixed Issue resolved by pull request 40967 [https://github.com/apache/spark/pull/40967] > predict_batch_udf with scalar input fails when batch size consists of a > single value > > > Key: SPARK-43298 > URL: https://issues.apache.org/jira/browse/SPARK-43298 > Project: Spark > Issue Type: Bug > Components: ML, PySpark >Affects Versions: 3.4.0 >Reporter: Lee Yang >Assignee: Lee Yang >Priority: Major > Fix For: 3.5.0 > > > This is related to SPARK-42250. For scalar inputs, the predict_batch_udf > will fail if the batch size is 1: > {code:java} > import numpy as np > from pyspark.ml.functions import predict_batch_udf > from pyspark.sql.types import DoubleType > df = spark.createDataFrame([[1.0],[2.0]], schema=["a"]) > def make_predict_fn(): > def predict(inputs): > return inputs > return predict > identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), > batch_size=1) > preds = df.withColumn("preds", identity("a")).collect() > {code} > fails with: > {code:java} > File "/.../spark/python/pyspark/worker.py", line 869, in main > process() > File "/.../spark/python/pyspark/worker.py", line 861, in process > serializer.dump_stream(out_iter, outfile) > File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 354, in > dump_stream > return ArrowStreamSerializer.dump_stream(self, > init_stream_yield_batches(), stream) > File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 86, in > dump_stream > for batch in iterator: > File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 347, in > init_stream_yield_batches > for series in iterator: > File "/.../spark/python/pyspark/worker.py", line 555, in func > for result_batch, result_type in result_iter: > File "/.../spark/python/pyspark/ml/functions.py", line 818, in predict > yield 
_validate_and_transform_prediction_result( > File "/.../spark/python/pyspark/ml/functions.py", line 339, in > _validate_and_transform_prediction_result > if len(preds_array) != num_input_rows: > TypeError: len() of unsized object > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
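The `TypeError: len() of unsized object` above is characteristic of calling `len()` on a NumPy 0-d array, which a prediction function can end up returning when a batch of exactly one scalar collapses. A minimal reproduction of the failure mode (assuming NumPy is available; this is an illustration, not Spark's code):

```python
import numpy as np

batch = np.array([1.0])        # a batch of exactly one scalar input
pred = np.squeeze(batch)       # squeezing a length-1 batch yields a 0-d array
err = ""
try:
    n = len(pred)              # len() is undefined for 0-d arrays
except TypeError as e:
    n = None
    err = str(e)

# np.atleast_1d restores a sized 1-d array, so len() works again
fixed = np.atleast_1d(pred)
print(n, len(fixed))
```

This suggests why guarding the result with something like `np.atleast_1d` before the `len(preds_array) != num_input_rows` check avoids the crash for batch size 1.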
[jira] [Assigned] (SPARK-43298) predict_batch_udf with scalar input fails when batch size consists of a single value
[ https://issues.apache.org/jira/browse/SPARK-43298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-43298: Assignee: Lee Yang > predict_batch_udf with scalar input fails when batch size consists of a > single value > > > Key: SPARK-43298 > URL: https://issues.apache.org/jira/browse/SPARK-43298 > Project: Spark > Issue Type: Bug > Components: ML, PySpark >Affects Versions: 3.4.0 >Reporter: Lee Yang >Assignee: Lee Yang >Priority: Major > > This is related to SPARK-42250. For scalar inputs, the predict_batch_udf > will fail if the batch size is 1: > {code:java} > import numpy as np > from pyspark.ml.functions import predict_batch_udf > from pyspark.sql.types import DoubleType > df = spark.createDataFrame([[1.0],[2.0]], schema=["a"]) > def make_predict_fn(): > def predict(inputs): > return inputs > return predict > identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), > batch_size=1) > preds = df.withColumn("preds", identity("a")).collect() > {code} > fails with: > {code:java} > File "/.../spark/python/pyspark/worker.py", line 869, in main > process() > File "/.../spark/python/pyspark/worker.py", line 861, in process > serializer.dump_stream(out_iter, outfile) > File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 354, in > dump_stream > return ArrowStreamSerializer.dump_stream(self, > init_stream_yield_batches(), stream) > File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 86, in > dump_stream > for batch in iterator: > File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 347, in > init_stream_yield_batches > for series in iterator: > File "/.../spark/python/pyspark/worker.py", line 555, in func > for result_batch, result_type in result_iter: > File "/.../spark/python/pyspark/ml/functions.py", line 818, in predict > yield _validate_and_transform_prediction_result( > File "/.../spark/python/pyspark/ml/functions.py", line 339, in > 
_validate_and_transform_prediction_result > if len(preds_array) != num_input_rows: > TypeError: len() of unsized object > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-43311) RocksDB state store provider memory management enhancements
[ https://issues.apache.org/jira/browse/SPARK-43311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anish Shrigondekar updated SPARK-43311: --- Description: Today when RocksDB is used as a State Store provider, memory usage when writing using writeBatch is not capped. Also, a related issue is that the state store coordinator can create multiple RocksDB instances on a single node without enforcing a global limit on native memory usage. Due to these issues we could run into OOM issues and task failures. We are looking to improve this behavior by doing a series of improvements such as: * remove writeBatch and use native RocksDB operations * use writeBufferManager to manage a global limit for all instances on a single node, accounting for memtable + filter/index block usage as part of the block cache. With these changes we will avoid OOM issues around RocksDB native memory usage. was:RocksDB state store provider memory management enhancements > RocksDB state store provider memory management enhancements > --- > > Key: SPARK-43311 > URL: https://issues.apache.org/jira/browse/SPARK-43311 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.4.0 >Reporter: Anish Shrigondekar >Priority: Major > > Today when RocksDB is used as a State Store provider, memory usage when > writing using writeBatch is not capped. Also, a related issue is that the > state store coordinator can create multiple RocksDB instances on a single > node without enforcing a global limit on native memory usage. Due to these > issues we could run into OOM issues and task failures.
> > We are looking to improve this behavior by doing a series of improvements > such as: > * remove writeBatch and use native RocksDB operations > * use writeBufferManager to manage a global limit for all instances on a > single node, accounting for memtable + filter/index block usage as part of > the block cache. With these changes we will avoid OOM issues around RocksDB > native memory usage. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43311) RocksDB state store provider memory management enhancements
[ https://issues.apache.org/jira/browse/SPARK-43311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717331#comment-17717331 ] Anish Shrigondekar commented on SPARK-43311: [~kabhwan] - will send the PR for this soon. Thanks > RocksDB state store provider memory management enhancements > --- > > Key: SPARK-43311 > URL: https://issues.apache.org/jira/browse/SPARK-43311 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.4.0 >Reporter: Anish Shrigondekar >Priority: Major > > Today when RocksDB is used as a State Store provider, memory usage when > writing using writeBatch is not capped. Also, a related issue is that the > state store coordinator can create multiple RocksDB instances on a single > node without enforcing a global limit on native memory usage. Due to these > issues we could run into OOM issues and task failures. > > We are looking to improve this behavior by doing a series of improvements > such as: > * remove writeBatch and use native RocksDB operations > * use writeBufferManager to manage a global limit for all instances on a > single node, accounting for memtable + filter/index block usage as part of > the block cache. With these changes we will avoid OOM issues around RocksDB > native memory usage. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43311) RocksDB state store provider memory management enhancements
Anish Shrigondekar created SPARK-43311: -- Summary: RocksDB state store provider memory management enhancements Key: SPARK-43311 URL: https://issues.apache.org/jira/browse/SPARK-43311 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.4.0 Reporter: Anish Shrigondekar RocksDB state store provider memory management enhancements -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43310) Dataset.observe is ignored when writing to Kafka with batch query
David Deuber created SPARK-43310: Summary: Dataset.observe is ignored when writing to Kafka with batch query Key: SPARK-43310 URL: https://issues.apache.org/jira/browse/SPARK-43310 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0, 3.3.2 Reporter: David Deuber When writing to Kafka with a batch query, metrics defined with {{Dataset.observe}} are not recorded. For example, {code:java} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener spark.listenerManager.register(new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { println(qe.observedMetrics) } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { //pass } }) val df = Seq(("k", "v")).toDF("key", "value") val observed = df.observe("my_observation", lit("metric_value").as("some_metric")) observed .write .format("kafka") .option("kafka.bootstrap.servers", "host1:port1") .option("topic", "topic1") .save() {code} prints {{{}Map(){}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-43308) Improve scalar subquery logical plan when the result is a literal
[ https://issues.apache.org/jira/browse/SPARK-43308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jia Fan updated SPARK-43308: Description: When using a scalar subquery, we can sometimes determine the result before the physical plan executes. For example, in `select (select (count(1)) is null from r where a = c ) from l where a < 4` the result is always false, so we can skip the unnecessary aggregate. was: When using a scalar subquery, we can sometimes determine the result before the physical plan executes. For example, in `select (select (count(1)) is null from r where a = c group by c) from l where a < 4` the result is always false, so we can skip the unnecessary aggregate. > Improve scalar subquery logical plan when the result is a literal > -- > > Key: SPARK-43308 > URL: https://issues.apache.org/jira/browse/SPARK-43308 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Jia Fan >Priority: Major > > When using a scalar subquery, we can sometimes determine the result before the > physical plan executes. > For example, in `select (select (count(1)) is null from r where a = c ) from l > where a < 4` the result is always false, so we can skip the unnecessary > aggregate. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43309) Extend INTERNAL_ERROR with category
Bo Zhang created SPARK-43309: Summary: Extend INTERNAL_ERROR with category Key: SPARK-43309 URL: https://issues.apache.org/jira/browse/SPARK-43309 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 3.5.0 Reporter: Bo Zhang This is to extend INTERNAL_ERROR with different categories / areas / modules (e.g. INTERNAL_ERROR_BROADCAST) so that we can better differentiate them. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-43308) Improve scalar subquery logical plan when the result is a literal
[ https://issues.apache.org/jira/browse/SPARK-43308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jia Fan updated SPARK-43308: Description: When using a scalar subquery, we can sometimes determine the result before the physical plan executes. For example, in `select (select (count(1)) is null from r where a = c group by c) from l where a < 4` the result is always false, so we can skip the unnecessary aggregate. was: When using a scalar subquery, we can sometimes determine the result before the physical plan executes. For example, in `select a, (select (count(1)) is null from r where a = c group by c) from l where a < 4` the result is always false, so we can skip the unnecessary aggregate. > Improve scalar subquery logical plan when the result is a literal > -- > > Key: SPARK-43308 > URL: https://issues.apache.org/jira/browse/SPARK-43308 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Jia Fan >Priority: Major > > When using a scalar subquery, we can sometimes determine the result before the > physical plan executes. > For example, in `select (select (count(1)) is null from r where a = c group by c) from l > where a < 4` the result is always false, so we can skip the unnecessary > aggregate. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-43308) Improve scalar subquery logical plan when the result is a literal
[ https://issues.apache.org/jira/browse/SPARK-43308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jia Fan updated SPARK-43308: Parent: SPARK-35553 Issue Type: Sub-task (was: Improvement) > Improve scalar subquery logical plan when the result is a literal > -- > > Key: SPARK-43308 > URL: https://issues.apache.org/jira/browse/SPARK-43308 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Jia Fan >Priority: Major > > When using a scalar subquery, we can sometimes determine the result before the > physical plan executes. > For example, in `select a, (select (count(1)) is null from r where a = c group by c) > from l where a < 4` the result is always false, so we can skip the unnecessary > aggregate. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43308) Improve scalar subquery logical plan when the result is a literal
Jia Fan created SPARK-43308: --- Summary: Improve scalar subquery logical plan when the result is a literal Key: SPARK-43308 URL: https://issues.apache.org/jira/browse/SPARK-43308 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.5.0 Reporter: Jia Fan When using a scalar subquery, we can sometimes determine the result before the physical plan executes. For example, in `select a, (select (count(1)) is null from r where a = c group by c) from l where a < 4` the result is always false, so we can skip the unnecessary aggregate. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43156) Correctness COUNT bug in correlated scalar subselect with `COUNT(*) is null`
[ https://issues.apache.org/jira/browse/SPARK-43156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717154#comment-17717154 ] Nikita Awasthi commented on SPARK-43156: User 'Hisoka-X' has created a pull request for this issue: https://github.com/apache/spark/pull/40977 > Correctness COUNT bug in correlated scalar subselect with `COUNT(*) is null` > > > Key: SPARK-43156 > URL: https://issues.apache.org/jira/browse/SPARK-43156 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Jack Chen >Assignee: Jack Chen >Priority: Major > Fix For: 3.5.0 > > > Example query: > {code:java} > spark.sql("select *, (select (count(1)) is null from t1 where t0.a = t1.c) > from t0").collect() > res6: Array[org.apache.spark.sql.Row] = Array([1,1.0,null], [2,2.0,false]) > {code} > In this subquery, count(1) always evaluates to a non-null integer value, so > count(1) is null is always false. The correct evaluation of the subquery is > always false. > We incorrectly evaluate it to null for empty groups. The reason is that > NullPropagation rewrites Aggregate [c] [isnull(count(1))] to Aggregate [c] > [false] - this rewrite would be correct normally, but in the context of a > scalar subquery it breaks our count bug handling in > RewriteCorrelatedScalarSubquery.constructLeftJoins . By the time we get > there, the query appears to not have the count bug - it looks the same as if > the original query had a subquery with select any_value(false) from r..., and > that case is _not_ subject to the count bug. 
> > Postgres comparison show correct always-false result: > [http://sqlfiddle.com/#!17/67822/5] > DDL for the example: > {code:java} > create or replace temp view t0 (a, b) > as values > (1, 1.0), > (2, 2.0); > create or replace temp view t1 (c, d) > as values > (2, 3.0); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
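The always-false behavior that Postgres shows can also be checked with SQLite, an independent engine available in the Python standard library (this is a cross-check of the expected semantics, not Spark code), using the DDL from the ticket:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
    CREATE TABLE t0 (a INTEGER, b REAL);
    INSERT INTO t0 VALUES (1, 1.0), (2, 2.0);
    CREATE TABLE t1 (c INTEGER, d REAL);
    INSERT INTO t1 VALUES (2, 3.0);
""")
# count(1) over an empty group is 0, never NULL, so `(count(1)) IS NULL`
# must be false even for t0.a = 1, which has no match in t1.
rows = cur.execute(
    "SELECT a, (SELECT (count(1)) IS NULL FROM t1 WHERE t0.a = t1.c) "
    "FROM t0 ORDER BY a"
).fetchall()
print(rows)
```

SQLite renders false as `0`, so both rows carry `0`, matching the "always false" result the ticket says Spark should (but currently does not) produce for empty groups.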
[jira] [Resolved] (SPARK-43257) Assign a name to the error class _LEGACY_ERROR_TEMP_2022
[ https://issues.apache.org/jira/browse/SPARK-43257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk resolved SPARK-43257. -- Fix Version/s: 3.5.0 Resolution: Fixed Issue resolved by pull request 40957 [https://github.com/apache/spark/pull/40957] > Assign a name to the error class _LEGACY_ERROR_TEMP_2022 > > > Key: SPARK-43257 > URL: https://issues.apache.org/jira/browse/SPARK-43257 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Max Gekk >Assignee: Jin Helin >Priority: Minor > Labels: starter > Fix For: 3.5.0 > > > Choose a proper name for the error class *_LEGACY_ERROR_TEMP_2022* defined in > {*}core/src/main/resources/error/error-classes.json{*}. The name should be > short but complete (look at the example in error-classes.json). > Add a test which triggers the error from user code if such test still doesn't > exist. Check exception fields by using {*}checkError(){*}. The last function > checks valuable error fields only, and avoids dependencies from error text > message. In this way, tech editors can modify error format in > error-classes.json, and don't worry of Spark's internal tests. Migrate other > tests that might trigger the error onto checkError(). > If you cannot reproduce the error from user space (using SQL query), replace > the error by an internal error, see {*}SparkException.internalError(){*}. > Improve the error message format in error-classes.json if the current is not > clear. Propose a solution to users how to avoid and fix such kind of errors. > Please, look at the PR below as examples: > * [https://github.com/apache/spark/pull/38685] > * [https://github.com/apache/spark/pull/38656] > * [https://github.com/apache/spark/pull/38490] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-43257) Assign a name to the error class _LEGACY_ERROR_TEMP_2022
[ https://issues.apache.org/jira/browse/SPARK-43257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk reassigned SPARK-43257: Assignee: Jin Helin > Assign a name to the error class _LEGACY_ERROR_TEMP_2022 > > > Key: SPARK-43257 > URL: https://issues.apache.org/jira/browse/SPARK-43257 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Max Gekk >Assignee: Jin Helin >Priority: Minor > Labels: starter > > Choose a proper name for the error class *_LEGACY_ERROR_TEMP_2022* defined in > {*}core/src/main/resources/error/error-classes.json{*}. The name should be > short but complete (look at the example in error-classes.json). > Add a test which triggers the error from user code if such test still doesn't > exist. Check exception fields by using {*}checkError(){*}. The last function > checks valuable error fields only, and avoids dependencies from error text > message. In this way, tech editors can modify error format in > error-classes.json, and don't worry of Spark's internal tests. Migrate other > tests that might trigger the error onto checkError(). > If you cannot reproduce the error from user space (using SQL query), replace > the error by an internal error, see {*}SparkException.internalError(){*}. > Improve the error message format in error-classes.json if the current is not > clear. Propose a solution to users how to avoid and fix such kind of errors. > Please, look at the PR below as examples: > * [https://github.com/apache/spark/pull/38685] > * [https://github.com/apache/spark/pull/38656] > * [https://github.com/apache/spark/pull/38490] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43051) Allow materializing zero values when deserializing protobuf messages
[ https://issues.apache.org/jira/browse/SPARK-43051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717147#comment-17717147 ] Nikita Awasthi commented on SPARK-43051: User 'justaparth' has created a pull request for this issue: https://github.com/apache/spark/pull/40686 > Allow materializing zero values when deserializing protobuf messages > > > Key: SPARK-43051 > URL: https://issues.apache.org/jira/browse/SPARK-43051 > Project: Spark > Issue Type: Improvement > Components: Protobuf >Affects Versions: 3.4.0 >Reporter: Parth Upadhyay >Priority: Major > > Currently, when deserializing protobufs using {{{}from_protobuf{}}}, fields > that are not explicitly present in the serialized message are deserialized as > {{null}} in the resulting struct. (In proto3, this also includes fields that > have been explicitly set to their zero value, as it is not distinguishable in > the serialized format. > [https://protobuf.dev/programming-guides/field_presence/]) > For example, given a message format like > {code:java} > syntax = "proto3"; > message SearchRequest { > string query = 1; > int32 page_number = 2; > int32 result_per_page = 3; > } > {code} > and an example message like > {code:python} > SearchRequest(query = "", page_number = 10) > {code} > the result from calling {{from_protobuf}} on the serialized form of the above > message would be > {code:json} > {"query": null, "page_number": 10, "result_per_page": null} > {code} > In proto3, all fields are considered optional and have default values > ([https://protobuf.dev/programming-guides/proto3/#default]), and reader > clients in some languages (e.g. go, scala) will fill in that default value > when reading the protobuf. It could be useful to make this configurable so > that zero values can optionally be materialized if desired. 
> Concretely, in the example above, we might want to deserialize it instead as > {code:json} > {"query": "", "page_number": 10, "result_per_page": 0} > {code} > In this ticket I propose implementing a way to get the above functionality. > In the linked PR, I've done it by adding an option, {{materializeZeroValues}}, > that can be passed to the options map in the {{from_protobuf}} function to > enable this behavior. However, I'd love any feedback on whether I've understood the > problem correctly and whether the implementation makes sense. > > PR: https://github.com/apache/spark/pull/40686 -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
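The proposed behavior amounts to substituting each field type's proto3 zero value wherever deserialization produced `null`. A rough stdlib-only sketch of the idea (the schema encoding and the helper below are hypothetical illustrations, not the linked PR's implementation):

```python
# proto3 zero values per field type (subset, for illustration)
ZERO = {"string": "", "int32": 0, "bool": False}

def materialize_zero_values(record, schema):
    # Fill absent (null) fields with their type's proto3 default value.
    return {
        field: (ZERO[ftype] if record.get(field) is None else record[field])
        for field, ftype in schema.items()
    }

# The SearchRequest example from the ticket:
schema = {"query": "string", "page_number": "int32", "result_per_page": "int32"}
decoded = {"query": None, "page_number": 10, "result_per_page": None}
result = materialize_zero_values(decoded, schema)
print(result)
```

Note the trade-off the ticket describes: because proto3 does not serialize zero values, this cannot distinguish "explicitly set to zero" from "never set"; it simply materializes the default either way.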
[jira] [Commented] (SPARK-43257) Assign a name to the error class _LEGACY_ERROR_TEMP_2022
[ https://issues.apache.org/jira/browse/SPARK-43257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717141#comment-17717141 ] Jin Helin commented on SPARK-43257: --- I'd like to work on this. > Assign a name to the error class _LEGACY_ERROR_TEMP_2022 > > > Key: SPARK-43257 > URL: https://issues.apache.org/jira/browse/SPARK-43257 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Max Gekk >Priority: Minor > Labels: starter > > Choose a proper name for the error class *_LEGACY_ERROR_TEMP_2022* defined in > {*}core/src/main/resources/error/error-classes.json{*}. The name should be > short but complete (look at the example in error-classes.json). > Add a test which triggers the error from user code if such test still doesn't > exist. Check exception fields by using {*}checkError(){*}. The last function > checks valuable error fields only, and avoids dependencies from error text > message. In this way, tech editors can modify error format in > error-classes.json, and don't worry of Spark's internal tests. Migrate other > tests that might trigger the error onto checkError(). > If you cannot reproduce the error from user space (using SQL query), replace > the error by an internal error, see {*}SparkException.internalError(){*}. > Improve the error message format in error-classes.json if the current is not > clear. Propose a solution to users how to avoid and fix such kind of errors. > Please, look at the PR below as examples: > * [https://github.com/apache/spark/pull/38685] > * [https://github.com/apache/spark/pull/38656] > * [https://github.com/apache/spark/pull/38490] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43307) Migrate PandasUDF value errors into error class
Haejoon Lee created SPARK-43307: --- Summary: Migrate PandasUDF value errors into error class Key: SPARK-43307 URL: https://issues.apache.org/jira/browse/SPARK-43307 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Haejoon Lee Migrate PandasUDF value errors into error class -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43306) Migrate `ValueError` from Spark SQL types into error class
Haejoon Lee created SPARK-43306: --- Summary: Migrate `ValueError` from Spark SQL types into error class Key: SPARK-43306 URL: https://issues.apache.org/jira/browse/SPARK-43306 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Haejoon Lee Migrate `ValueError` from Spark SQL types into error class -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43305) Add Java17 dockerfiles for 3.4.0
Yikun Jiang created SPARK-43305: --- Summary: Add Java17 dockerfiles for 3.4.0 Key: SPARK-43305 URL: https://issues.apache.org/jira/browse/SPARK-43305 Project: Spark Issue Type: Sub-task Components: Spark Docker Affects Versions: 3.5.0 Reporter: Yikun Jiang -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43304) Enable test_to_latex by supporting jinja2>=3.0.0
Haejoon Lee created SPARK-43304: --- Summary: Enable test_to_latex by supporting jinja2>=3.0.0 Key: SPARK-43304 URL: https://issues.apache.org/jira/browse/SPARK-43304 Project: Spark Issue Type: Sub-task Components: Pandas API on Spark Affects Versions: 3.5.0 Reporter: Haejoon Lee Please refer to [https://github.com/pandas-dev/pandas/pull/47970] for more detail. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-43261) Migrate `TypeError` from Spark SQL types into error class
[ https://issues.apache.org/jira/browse/SPARK-43261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ruifeng Zheng resolved SPARK-43261.
-----------------------------------
    Fix Version/s: 3.5.0
       Resolution: Fixed

Issue resolved by pull request 40926
[https://github.com/apache/spark/pull/40926]

> Migrate `TypeError` from Spark SQL types into error class
> ---------------------------------------------------------
>
>                 Key: SPARK-43261
>                 URL: https://issues.apache.org/jira/browse/SPARK-43261
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 3.5.0
>            Reporter: Haejoon Lee
>            Assignee: Haejoon Lee
>            Priority: Major
>             Fix For: 3.5.0
>
> from pyspark/sql/types.py
[jira] [Assigned] (SPARK-43261) Migrate `TypeError` from Spark SQL types into error class
[ https://issues.apache.org/jira/browse/SPARK-43261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ruifeng Zheng reassigned SPARK-43261:
-------------------------------------
    Assignee: Haejoon Lee

> Migrate `TypeError` from Spark SQL types into error class
> ---------------------------------------------------------
>
>                 Key: SPARK-43261
>                 URL: https://issues.apache.org/jira/browse/SPARK-43261
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 3.5.0
>            Reporter: Haejoon Lee
>            Assignee: Haejoon Lee
>            Priority: Major
>
> from pyspark/sql/types.py
[jira] [Resolved] (SPARK-43219) Website can't find INSERT INTO REPLACE Statement
[ https://issues.apache.org/jira/browse/SPARK-43219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-43219.
---------------------------------
    Fix Version/s: 3.5.0
       Resolution: Fixed

Issue resolved by pull request 40890
[https://github.com/apache/spark/pull/40890]

> Website can't find INSERT INTO REPLACE Statement
> ------------------------------------------------
>
>                 Key: SPARK-43219
>                 URL: https://issues.apache.org/jira/browse/SPARK-43219
>             Project: Spark
>          Issue Type: Documentation
>          Components: Documentation
>    Affects Versions: 3.4.0
>            Reporter: Jia Fan
>            Assignee: Jia Fan
>            Priority: Minor
>             Fix For: 3.5.0
>
> The `INSERT INTO REPLACE` statement is supported since SPARK-40956, but can't
> be found on the website.
[jira] [Assigned] (SPARK-43219) Website can't find INSERT INTO REPLACE Statement
[ https://issues.apache.org/jira/browse/SPARK-43219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-43219:
-----------------------------------
    Assignee: Jia Fan

> Website can't find INSERT INTO REPLACE Statement
> ------------------------------------------------
>
>                 Key: SPARK-43219
>                 URL: https://issues.apache.org/jira/browse/SPARK-43219
>             Project: Spark
>          Issue Type: Documentation
>          Components: Documentation
>    Affects Versions: 3.4.0
>            Reporter: Jia Fan
>            Assignee: Jia Fan
>            Priority: Minor
>
> The `INSERT INTO REPLACE` statement is supported since SPARK-40956, but can't
> be found on the website.
[jira] [Created] (SPARK-43303) Migrate NotImplementedError into PySparkNotImplementedError
Haejoon Lee created SPARK-43303:
-----------------------------------

             Summary: Migrate NotImplementedError into PySparkNotImplementedError
                 Key: SPARK-43303
                 URL: https://issues.apache.org/jira/browse/SPARK-43303
             Project: Spark
          Issue Type: Sub-task
          Components: Connect, PySpark
    Affects Versions: 3.4.0
            Reporter: Haejoon Lee

Migrate NotImplementedError into PySparkNotImplementedError