[jira] [Created] (SPARK-51363) Delegate `Desc As JSON` clustering info to recursive jsonType struct
Amanda Liu created SPARK-51363: -- Summary: Delegate `Desc As JSON` clustering info to recursive jsonType struct Key: SPARK-51363 URL: https://issues.apache.org/jira/browse/SPARK-51363 Project: Spark Issue Type: Task Components: SQL Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-51362) Change toJSON to use NextIterator API to reduce latency
[ https://issues.apache.org/jira/browse/SPARK-51362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-51362: --- Summary: Change toJSON to use NextIterator API to reduce latency (was: change toJSON to use NextIterator API to reduce latency) > Change toJSON to use NextIterator API to reduce latency > --- > > Key: SPARK-51362 > URL: https://issues.apache.org/jira/browse/SPARK-51362 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > > The current toJSON operation uses the Iterator API, where iter.hasNext is > called after iter.next, which means returning the current row depends on the > arrival of the next row. If we change it to use the NextIterator API, iter.next > will be called after iter.hasNext, so the current row is returned > immediately. This eliminates the dependency between adjacent rows, which > reduces record-level latency. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-51362) Change toJSON to use NextIterator API to reduce record-level latency
[ https://issues.apache.org/jira/browse/SPARK-51362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-51362: --- Summary: Change toJSON to use NextIterator API to reduce record-level latency (was: Change toJSON to use NextIterator API to reduce latency) > Change toJSON to use NextIterator API to reduce record-level latency > > > Key: SPARK-51362 > URL: https://issues.apache.org/jira/browse/SPARK-51362 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > > The current toJSON operation uses the Iterator API, where iter.hasNext is > called after iter.next, which means returning the current row depends on the > arrival of the next row. If we change it to use the NextIterator API, iter.next > will be called after iter.hasNext, so the current row is returned > immediately. This eliminates the dependency between adjacent rows, which > reduces record-level latency. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51362) change toJSON to use NextIterator API to reduce latency
Yuchen Liu created SPARK-51362: -- Summary: change toJSON to use NextIterator API to reduce latency Key: SPARK-51362 URL: https://issues.apache.org/jira/browse/SPARK-51362 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Yuchen Liu The current toJSON operation uses the Iterator API, where iter.hasNext is called after iter.next, which means returning the current row depends on the arrival of the next row. If we change it to use the NextIterator API, iter.next will be called after iter.hasNext, so the current row is returned immediately. This eliminates the dependency between adjacent rows, which reduces record-level latency. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
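For reference, below is a minimal, self-contained sketch of the iteration pattern described in this ticket. It only mirrors the general shape of Spark's internal org.apache.spark.util.NextIterator; the class and member names here are illustrative, not the actual Spark implementation.

{code:java}
// Illustrative sketch: hasNext() eagerly produces the current element and next()
// hands it back immediately, so returning a row never waits on the following row.
abstract class SimpleNextIterator[T] extends Iterator[T] {
  private var gotNext = false
  private var nextValue: T = _
  protected var finished = false

  // Subclasses compute the next element here and set `finished` when exhausted.
  protected def getNext(): T

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      if (!finished) gotNext = true
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}
{code}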
[jira] [Updated] (SPARK-51084) Assign appropriate error class for negativeScaleNotAllowedError
[ https://issues.apache.org/jira/browse/SPARK-51084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-51084: --- Description: Improve user-facing error message for `negativeScaleNotAllowedError`. Previously it was an internal error, even though it should have a user-facing error class. Previous code: {code:java} def negativeScaleNotAllowedError(scale: Int): Throwable = { val sqlConf = QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") SparkException.internalError( s"Negative scale is not allowed: ${scale.toString}." + s" Set the config ${sqlConf}" + " to \"true\" to allow it.") } {code} was: Improve user-facing error message for `negativeScaleNotAllowedError`. Previously was an internal error, even though it should have a user-facing error class Previous code: {code:java} def negativeScaleNotAllowedError(scale: Int): Throwable = { val sqlConf = QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") SparkException.internalError( s"Negative scale is not allowed: ${scale.toString}." + s" Set the config ${sqlConf}" + " to \"true\" to allow it.") } {code} > Assign appropriate error class for negativeScaleNotAllowedError > --- > > Key: SPARK-51084 > URL: https://issues.apache.org/jira/browse/SPARK-51084 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Minor > Labels: pull-request-available > > Improve user-facing error message for `negativeScaleNotAllowedError`. > Previously it was an internal error, even though it should have a user-facing > error class. > Previous code: > {code:java} > def negativeScaleNotAllowedError(scale: Int): Throwable = { > val sqlConf = > QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") > SparkException.internalError( > s"Negative scale is not allowed: ${scale.toString}." + > s" Set the config ${sqlConf}" + > " to \"true\" to allow it.") > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-51084) Assign appropriate error class for negativeScaleNotAllowedError
[ https://issues.apache.org/jira/browse/SPARK-51084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-51084: --- Description: Improve user-facing error message for `negativeScaleNotAllowedError`. Previously was an internal error, even though it should have a user-facing error class {code:java} def negativeScaleNotAllowedError(scale: Int): Throwable = { val sqlConf = QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") SparkException.internalError( s"Negative scale is not allowed: ${scale.toString}." + s" Set the config ${sqlConf}" + " to \"true\" to allow it.") } {code} was: Improve user-facing error message for `negativeScaleNotAllowedError`. Previously was an internal error, even though it should have a user-facing Error Class {code:java} def negativeScaleNotAllowedError(scale: Int): Throwable = { val sqlConf = QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") SparkException.internalError( s"Negative scale is not allowed: ${scale.toString}." + s" Set the config ${sqlConf}" + " to \"true\" to allow it.") } {code} > Assign appropriate error class for negativeScaleNotAllowedError > --- > > Key: SPARK-51084 > URL: https://issues.apache.org/jira/browse/SPARK-51084 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Minor > Labels: pull-request-available > > Improve user-facing error message for `negativeScaleNotAllowedError`. > Previously was an internal error, even though it should have a user-facing > error class > > {code:java} > def negativeScaleNotAllowedError(scale: Int): Throwable = { > val sqlConf = > QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") > SparkException.internalError( > s"Negative scale is not allowed: ${scale.toString}." + > s" Set the config ${sqlConf}" + > " to \"true\" to allow it.") > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-51084) Assign appropriate error class for negativeScaleNotAllowedError
[ https://issues.apache.org/jira/browse/SPARK-51084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-51084: --- Description: Improve user-facing error message for `negativeScaleNotAllowedError`. Previously was an internal error, even though it should have a user-facing error class Previous code: {code:java} def negativeScaleNotAllowedError(scale: Int): Throwable = { val sqlConf = QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") SparkException.internalError( s"Negative scale is not allowed: ${scale.toString}." + s" Set the config ${sqlConf}" + " to \"true\" to allow it.") } {code} was: Improve user-facing error message for `negativeScaleNotAllowedError`. Previously was an internal error, even though it should have a user-facing error class {code:java} def negativeScaleNotAllowedError(scale: Int): Throwable = { val sqlConf = QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") SparkException.internalError( s"Negative scale is not allowed: ${scale.toString}." + s" Set the config ${sqlConf}" + " to \"true\" to allow it.") } {code} > Assign appropriate error class for negativeScaleNotAllowedError > --- > > Key: SPARK-51084 > URL: https://issues.apache.org/jira/browse/SPARK-51084 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Minor > Labels: pull-request-available > > Improve user-facing error message for `negativeScaleNotAllowedError`. > Previously was an internal error, even though it should have a user-facing > error class > Previous code: > {code:java} > def negativeScaleNotAllowedError(scale: Int): Throwable = { > val sqlConf = > QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") > SparkException.internalError( > s"Negative scale is not allowed: ${scale.toString}." + > s" Set the config ${sqlConf}" + > " to \"true\" to allow it.") > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-51084) Assign appropriate error class for negativeScaleNotAllowedError
[ https://issues.apache.org/jira/browse/SPARK-51084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-51084: --- Summary: Assign appropriate error class for negativeScaleNotAllowedError (was: Improve negativeScaleNotAllowedError message) > Assign appropriate error class for negativeScaleNotAllowedError > --- > > Key: SPARK-51084 > URL: https://issues.apache.org/jira/browse/SPARK-51084 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Minor > > Improve user-facing error message for `negativeScaleNotAllowedError`. > Previously was an internal error, even though it should have a user-facing > Error Class > > {code:java} > def negativeScaleNotAllowedError(scale: Int): Throwable = { > val sqlConf = > QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") > SparkException.internalError( > s"Negative scale is not allowed: ${scale.toString}." + > s" Set the config ${sqlConf}" + > " to \"true\" to allow it.") > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51084) Improve negativeScaleNotAllowedError message
Amanda Liu created SPARK-51084: -- Summary: Improve negativeScaleNotAllowedError message Key: SPARK-51084 URL: https://issues.apache.org/jira/browse/SPARK-51084 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu Improve user-facing error message for `negativeScaleNotAllowedError`. Previously it was an internal error, even though it should have a user-facing error class. {code:java} def negativeScaleNotAllowedError(scale: Int): Throwable = { val sqlConf = QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal") SparkException.internalError( s"Negative scale is not allowed: ${scale.toString}." + s" Set the config ${sqlConf}" + " to \"true\" to allow it.") } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
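As a hedged illustration of the direction this ticket points in, the sketch below swaps the internal error for a user-facing error condition. The error class name NEGATIVE_SCALE_DISALLOWED and the exact constructor arguments are assumptions for the sketch, not the change that was actually merged, and it presumes the same surrounding imports (SparkException, QuotingUtils) as the snippet above.

{code:java}
// Hypothetical sketch only: the error class name and message parameters are illustrative.
def negativeScaleNotAllowedError(scale: Int): Throwable = {
  new SparkException(
    errorClass = "NEGATIVE_SCALE_DISALLOWED",  // assumed name, not necessarily a registered error condition
    messageParameters = Map(
      "scale" -> scale.toString,
      "sqlConf" -> QuotingUtils.toSQLConf("spark.sql.legacy.allowNegativeScaleOfDecimal")),
    cause = null)
}
{code}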
[jira] [Created] (SPARK-51074) Set timeout for communicating init configs in streaming foreachBatch
Wei Liu created SPARK-51074: --- Summary: Set timeout for communicating init configs in streaming foreachBatch Key: SPARK-51074 URL: https://issues.apache.org/jira/browse/SPARK-51074 Project: Spark Issue Type: Improvement Components: Connect, SS Affects Versions: 4.1 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51055) Streaming foreachBatch should put initialization logic inside try
Wei Liu created SPARK-51055: --- Summary: Streaming foreachBatch should put initialization logic inside try Key: SPARK-51055 URL: https://issues.apache.org/jira/browse/SPARK-51055 Project: Spark Issue Type: Improvement Components: Connect, PySpark, SS Affects Versions: 4.1 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51043) Spark Connect Streaming ForeachBatch log improvement
Wei Liu created SPARK-51043: --- Summary: Spark Connect Streaming ForeachBatch log improvement Key: SPARK-51043 URL: https://issues.apache.org/jira/browse/SPARK-51043 Project: Spark Issue Type: Improvement Components: Connect, SS Affects Versions: 4.1.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51034) Reformat Describe As JSON statistics dict for parse-ability
Amanda Liu created SPARK-51034: -- Summary: Reformat Describe As JSON statistics dict for parse-ability Key: SPARK-51034 URL: https://issues.apache.org/jira/browse/SPARK-51034 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51032) Improve Describe As JSON Unsupported Table Error Message
Amanda Liu created SPARK-51032: -- Summary: Improve Describe As JSON Unsupported Table Error Message Key: SPARK-51032 URL: https://issues.apache.org/jira/browse/SPARK-51032 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51008) Implement Result Stage for AQE
Ziqi Liu created SPARK-51008: Summary: Implement Result Stage for AQE Key: SPARK-51008 URL: https://issues.apache.org/jira/browse/SPARK-51008 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0 Reporter: Ziqi Liu To support [https://github.com/apache/spark/pull/44013#issuecomment-2421167393] we need to implement a Result Stage for AQE so that all plan segments can fall into a stage context. This would also make the AQE flow more self-contained. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-51007) Describe As JSON v2 Table Support
Amanda Liu created SPARK-51007: -- Summary: Describe As JSON v2 Table Support Key: SPARK-51007 URL: https://issues.apache.org/jira/browse/SPARK-51007 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-50541) Describe Table As JSON
[ https://issues.apache.org/jira/browse/SPARK-50541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-50541: --- Issue Type: Epic (was: Task) > Describe Table As JSON > -- > > Key: SPARK-50541 > URL: https://issues.apache.org/jira/browse/SPARK-50541 > Project: Spark > Issue Type: Epic > Components: SQL >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Assignee: Amanda Liu >Priority: Major > Labels: pull-request-available > Fix For: 4.0.0 > > > Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in > JSON format. > > *Context:* > The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame > format geared toward human consumption. This format causes parsing > challenges, e.g. if fields contain special characters or the format changes > as new features are added. > The new AS JSON option would return the table metadata as a JSON string that > supports parsing via machine, while being extensible with a minimized risk of > breaking changes. It is not meant to be human-readable. > > *SQL Ref Spec:* > { DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ > PARTITION clause ] | [ column_name ] } [ AS JSON ] > > *JSON Schema:* > ``` > { > "table_name": "", > "catalog_name": [...], > "database_name": [...], > "qualified_name": "" > "type": "", > "provider": "", > "columns": [ > { "id": 1, "name": "", "type": , "comment": "", > "default": "" } > ], > "partition_values": > { "": "" } > , > "location": "", > "view_definition": "", > "owner": "", > "comment": "", > "table_properties": > { "property1": "", "property2": "" } > , > "storage_properties": > { "property1": "", "property2": "" } > , > "serde_library": "", > "inputformat": "", > "outputformt": "", > "bucket_columns": [], > "sort_columns": [], > "created_time": "", > "last_access": "", > "partition_provider": "" > } > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50817) Fix rocksdb error reporting
Wei Liu created SPARK-50817: --- Summary: Fix rocksdb error reporting Key: SPARK-50817 URL: https://issues.apache.org/jira/browse/SPARK-50817 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50795) Display all DESCRIBE AS JSON dates in ISO-8601 format
Amanda Liu created SPARK-50795: -- Summary: Display all DESCRIBE AS JSON dates in ISO-8601 format Key: SPARK-50795 URL: https://issues.apache.org/jira/browse/SPARK-50795 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50769) ClassCastException in HistogramNumeric
Linhong Liu created SPARK-50769: --- Summary: ClassCastException in HistogramNumeric Key: SPARK-50769 URL: https://issues.apache.org/jira/browse/SPARK-50769 Project: Spark Issue Type: Task Components: SQL Affects Versions: 4.0.0 Reporter: Linhong Liu The {{HistogramNumeric}} aggregate accepts numeric input types but doesn't properly handle {{DecimalType}} during execution. Therefore, a {{ClassCastException}} is thrown when trying to cast a {{Decimal}} to a {{Double}}. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
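A hypothetical repro sketch of the reported behavior (the table and column names are made up; histogram_numeric is the SQL aggregate backed by HistogramNumeric):

{code:java}
// Hypothetical repro: aggregate a DECIMAL column with histogram_numeric. Per the
// ticket, this path can hit a ClassCastException when the aggregate treats the
// Decimal values as Double.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("histogram-decimal-repro").getOrCreate()
spark.sql("CREATE TABLE t_dec (amount DECIMAL(10, 2)) USING parquet")
spark.sql("INSERT INTO t_dec VALUES (1.50), (2.25), (3.75)")
spark.sql("SELECT histogram_numeric(amount, 2) FROM t_dec").show(truncate = false)
{code}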
[jira] [Created] (SPARK-50690) Fix discrepancy in DESCRIBE TABLE columns list quoting
Amanda Liu created SPARK-50690: -- Summary: Fix discrepancy in DESCRIBE TABLE columns list quoting Key: SPARK-50690 URL: https://issues.apache.org/jira/browse/SPARK-50690 Project: Spark Issue Type: Task Components: SQL Affects Versions: 4.0.0 Reporter: Amanda Liu Previously, the `DESCRIBE TABLE` SQL command displayed column lists such as bucket and sort columns with backtick quoting, as follows: Bucket Columns [`a`] Sort Columns [`b`] However, view query output columns were not quoted: View Query Output Columns [c1] This issue fixes the discrepancy. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50654) CommitMetadata should set stateUniqueIds to None in V1
Wei Liu created SPARK-50654: --- Summary: CommitMetadata should set stateUniqueIds to None in V1 Key: SPARK-50654 URL: https://issues.apache.org/jira/browse/SPARK-50654 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-50652) RocksDB V2 backward compatibility currently not supported
[ https://issues.apache.org/jira/browse/SPARK-50652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-50652: Epic Link: SPARK-49374 > RocksDB V2 backward compatibility currently not supported > - > > Key: SPARK-50652 > URL: https://issues.apache.org/jira/browse/SPARK-50652 > Project: Spark > Issue Type: Improvement > Components: SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > Currently the V2 RocksDB format code cannot read V1-formatted state store > files. We should block it at the commit log level -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50653) Support RocksDB V2 -> V1 backward compatibility
Wei Liu created SPARK-50653: --- Summary: Support RocksDB V2 -> V1 backward compatibility Key: SPARK-50653 URL: https://issues.apache.org/jira/browse/SPARK-50653 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu Follow up of https://issues.apache.org/jira/browse/SPARK-50652, we should support v2 -> v1 compatibility -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50652) RocksDB V2 backward compatibility currently not supported
Wei Liu created SPARK-50652: --- Summary: RocksDB V2 backward compatibility currently not supported Key: SPARK-50652 URL: https://issues.apache.org/jira/browse/SPARK-50652 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu Currently the V2 RocksDB format code cannot read V1-formatted state store files. We should block it at the commit log level -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50622) RocksDB refactor
Wei Liu created SPARK-50622: --- Summary: RocksDB refactor Key: SPARK-50622 URL: https://issues.apache.org/jira/browse/SPARK-50622 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50565) Add transformWithState correctness test
Wei Liu created SPARK-50565: --- Summary: Add transformWithState correctness test Key: SPARK-50565 URL: https://issues.apache.org/jira/browse/SPARK-50565 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50541) Describe Table As JSON
Amanda Liu created SPARK-50541: -- Summary: Describe Table As JSON Key: SPARK-50541 URL: https://issues.apache.org/jira/browse/SPARK-50541 Project: Spark Issue Type: Task Components: SQL Affects Versions: 4.0.0 Reporter: Amanda Liu Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in JSON format. *Context:* The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame format geared toward human consumption. This format causes parsing challenges, e.g. if fields contain special characters or the format changes as new features are added. [DBT|https://www.getdbt.com/] is an example customer that motivates this proposal, as providing a structured JSON format can help prevent breakages in pipelines that depend on parsing table metadata. The new AS JSON option would return the table metadata as a JSON string that supports parsing via machine, while being extensible with a minimized risk of breaking changes. It is not meant to be human-readable. *SQL Ref Spec:* { DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ PARTITION clause ] | [ column_name ] } [ AS JSON ] *JSON Schema:* ``` { "table_name": "", "catalog_name": [...], "database_name": [...], "qualified_name": "" "type": "", "provider": "", "columns": [ { "id": 1, "name": "", "type": , "comment": "", "default": "" } ], "partition_values": { "": "" }, "location": "", "view_definition": "", "owner": "", "comment": "", "table_properties": { "property1": "", "property2": "" }, "storage_properties": { "property1": "", "property2": "" }, "serde_library": "", "inputformat": "", "outputformt": "", "bucket_columns": [], "sort_columns": [], "created_time": "", "last_access": "", "partition_provider": "" } ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
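A small usage sketch of the proposed syntax above (the table name is a placeholder, and the assumption here is that the command returns the JSON document as a single string value that callers parse with an ordinary JSON library):

{code:java}
// Sketch only: illustrates the intended machine-readable flow described above.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("desc-as-json-demo").getOrCreate()
spark.sql("CREATE TABLE t_demo (c1 INT, c2 STRING) USING parquet")

// Assumed shape: one row whose single column holds the JSON metadata string.
val jsonMetadata = spark.sql("DESCRIBE TABLE EXTENDED t_demo AS JSON").head().getString(0)
println(jsonMetadata)  // parse with any JSON library instead of scraping DataFrame rows
{code}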
[jira] [Updated] (SPARK-50541) Describe Table As JSON
[ https://issues.apache.org/jira/browse/SPARK-50541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-50541: --- Description: Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in JSON format. *Context:* The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame format geared toward human consumption. This format causes parsing challenges, e.g. if fields contain special characters or the format changes as new features are added. The new AS JSON option would return the table metadata as a JSON string that supports parsing via machine, while being extensible with a minimized risk of breaking changes. It is not meant to be human-readable. *SQL Ref Spec:* { DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ PARTITION clause ] | [ column_name ] } [ AS JSON ] *JSON Schema:* ``` { "table_name": "", "catalog_name": [...], "database_name": [...], "qualified_name": "" "type": "", "provider": "", "columns": [ { "id": 1, "name": "", "type": , "comment": "", "default": "" } ], "partition_values": { "": "" } , "location": "", "view_definition": "", "owner": "", "comment": "", "table_properties": { "property1": "", "property2": "" } , "storage_properties": { "property1": "", "property2": "" } , "serde_library": "", "inputformat": "", "outputformt": "", "bucket_columns": [], "sort_columns": [], "created_time": "", "last_access": "", "partition_provider": "" } ``` was: Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in JSON format. *Context:* The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame format geared toward human consumption. This format causes parsing challenges, e.g. if fields contain special characters or the format changes as new features are added. The new AS JSON option would return the table metadata as a JSON string that supports parsing via machine, while being extensible with a minimized risk of breaking changes. It is not meant to be human-readable. *SQL Ref Spec:* { DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ PARTITION clause ] | [ column_name ] } [ AS JSON ] *JSON Schema:* ``` { "table_name": "", "catalog_name": [...], "database_name": [...], "qualified_name": "" "type": "", "provider": "", "columns": [ { "id": 1, "name": "", "type": , "comment": "", "default": "" } ], "partition_values": { "": "" } , "location": "", "view_definition": "", "owner": "", "comment": "", "table_properties": { "property1": "", "property2": "" } , "storage_properties": { "property1": "", "property2": "" } , "serde_library": "", "inputformat": "", "outputformt": "", "bucket_columns": [], "sort_columns": [], "created_time": "", "last_access": "", "partition_provider": "" } ``` > Describe Table As JSON > -- > > Key: SPARK-50541 > URL: https://issues.apache.org/jira/browse/SPARK-50541 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Major > > Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in > JSON format. > > *Context:* > The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame > format geared toward human consumption. This format causes parsing > challenges, e.g. if fields contain special characters or the format changes > as new features are added. > The new AS JSON option would return the table metadata as a JSON string that > supports parsing via machine, while being extensible with a minimized risk of > breaking changes. 
It is not meant to be human-readable. > > *SQL Ref Spec:* > { DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ > PARTITION clause ] | [ column_name ] } [ AS JSON ] > > *JSON Schema:* > ``` > { > "table_name": "", > "catalog_name": [...], > "database_name": [...], > "qualified_name": "" > "type": "", > "provider": "", > "columns": [ > { "id": 1, "name": "", "type": , "comment": "", > "default": "" } > ], > "partition_values": > { "": "" } > , > "location": "", > "view_definition": "", > "owner": "", > "comment": "", > "table_properties": > { "property1": "", "property2": "" } > , > "storage_properties": > { "property1": "", "property2": "" } > , > "serde_library": "", > "inputformat": "", > "outputformt": "", > "bucket_columns": [], > "sort_columns": [], > "created_time": "", > "last_access": "", > "partition_provider": "" > } > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-50541) Describe Table As JSON
[ https://issues.apache.org/jira/browse/SPARK-50541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-50541: --- Description: Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in JSON format. *Context:* The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame format geared toward human consumption. This format causes parsing challenges, e.g. if fields contain special characters or the format changes as new features are added. The new AS JSON option would return the table metadata as a JSON string that supports parsing via machine, while being extensible with a minimized risk of breaking changes. It is not meant to be human-readable. *SQL Ref Spec:* { DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ PARTITION clause ] | [ column_name ] } [ AS JSON ] *JSON Schema:* ``` { "table_name": "", "catalog_name": [...], "database_name": [...], "qualified_name": "" "type": "", "provider": "", "columns": [ { "id": 1, "name": "", "type": , "comment": "", "default": "" } ], "partition_values": { "": "" } , "location": "", "view_definition": "", "owner": "", "comment": "", "table_properties": { "property1": "", "property2": "" } , "storage_properties": { "property1": "", "property2": "" } , "serde_library": "", "inputformat": "", "outputformt": "", "bucket_columns": [], "sort_columns": [], "created_time": "", "last_access": "", "partition_provider": "" } ``` was: Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in JSON format. *Context:* The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame format geared toward human consumption. This format causes parsing challenges, e.g. if fields contain special characters or the format changes as new features are added. [DBT|https://www.getdbt.com/] is an example customer that motivates this proposal, as providing a structured JSON format can help prevent breakages in pipelines that depend on parsing table metadata. The new AS JSON option would return the table metadata as a JSON string that supports parsing via machine, while being extensible with a minimized risk of breaking changes. It is not meant to be human-readable. *SQL Ref Spec:* { DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ PARTITION clause ] | [ column_name ] } [ AS JSON ] *JSON Schema:* ``` { "table_name": "", "catalog_name": [...], "database_name": [...], "qualified_name": "" "type": "", "provider": "", "columns": [ { "id": 1, "name": "", "type": , "comment": "", "default": "" } ], "partition_values": { "": "" }, "location": "", "view_definition": "", "owner": "", "comment": "", "table_properties": { "property1": "", "property2": "" }, "storage_properties": { "property1": "", "property2": "" }, "serde_library": "", "inputformat": "", "outputformt": "", "bucket_columns": [], "sort_columns": [], "created_time": "", "last_access": "", "partition_provider": "" } ``` > Describe Table As JSON > -- > > Key: SPARK-50541 > URL: https://issues.apache.org/jira/browse/SPARK-50541 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Major > > Support DESCRIBE TABLE ... [AS JSON] option to display table metadata in > JSON format. > > *Context:* > The Spark SQL command DESCRIBE TABLE displays table metadata in a DataFrame > format geared toward human consumption. This format causes parsing > challenges, e.g. if fields contain special characters or the format changes > as new features are added. 
> The new AS JSON option would return the table metadata as a JSON string that > supports parsing via machine, while being extensible with a minimized risk of > breaking changes. It is not meant to be human-readable. > > *SQL Ref Spec:* > { DESC | DESCRIBE } > [ TABLE ] [ EXTENDED | FORMATTED ] table_name \{ [ PARTITION clause ] | [ > column_name ] } [ AS JSON ] > > *JSON Schema:* > ``` > { > "table_name": "", > "catalog_name": [...], > "database_name": [...], > "qualified_name": "" > "type": "", > "provider": "", > "columns": [ > { "id": 1, "name": "", "type": , "comment": "", > "default": "" } > ], > "partition_values": > { "": "" } > , > "location": "", > "view_definition": "", > "owner": "", > "comment": "", > "table_properties": > { "property1": "", "property2": "" } > , > "storage_properties": > { "property1": "", "property2": "" } > , > "serde_library": "", > "inputformat": "", > "outputformt": "", > "bucket_columns": [], > "sort_columns": [], > "created_time": "", > "last_access": "", > "partition_provider": "" > } > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e
[jira] [Created] (SPARK-50360) StateStoreChangelogReader should read state store version from the changelog
Wei Liu created SPARK-50360: --- Summary: StateStoreChangelogReader should read state store version from the changelog Key: SPARK-50360 URL: https://issues.apache.org/jira/browse/SPARK-50360 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-50339) Enable changelog to store lineage information
Wei Liu created SPARK-50339: --- Summary: Enable changelog to store lineage information Key: SPARK-50339 URL: https://issues.apache.org/jira/browse/SPARK-50339 Project: Spark Issue Type: Task Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-49448) Spark Connect ExecuteThreadRunner promise will always complete with success.
[ https://issues.apache.org/jira/browse/SPARK-49448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LIU closed SPARK-49448. --- > Spark Connect ExecuteThreadRunner promise will always complete with success. > > > Key: SPARK-49448 > URL: https://issues.apache.org/jira/browse/SPARK-49448 > Project: Spark > Issue Type: Improvement > Components: Connect >Affects Versions: 4.0.0 >Reporter: LIU >Priority: Minor > > {code:java} > private class ExecutionThread(onCompletionPromise: Promise[Unit]) > extends > Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") { > override def run(): Unit = { > try { > execute() > onCompletionPromise.success(()) > } catch { > case NonFatal(e) => > onCompletionPromise.failure(e) > } > } > }{code} > > The execute method ends with an ErrorUtils.handleError() call, so if any > exception is thrown there it will not be caught by the promise. Would it be > better to complete the promise with the real exception instead? If so, I will > submit this change. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-49448) Spark Connect ExecuteThreadRunner promise will always complete with success.
[ https://issues.apache.org/jira/browse/SPARK-49448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LIU resolved SPARK-49448. - Resolution: Abandoned > Spark Connect ExecuteThreadRunner promise will always complete with success. > > > Key: SPARK-49448 > URL: https://issues.apache.org/jira/browse/SPARK-49448 > Project: Spark > Issue Type: Improvement > Components: Connect >Affects Versions: 4.0.0 >Reporter: LIU >Priority: Minor > > {code:java} > private class ExecutionThread(onCompletionPromise: Promise[Unit]) > extends > Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") { > override def run(): Unit = { > try { > execute() > onCompletionPromise.success(()) > } catch { > case NonFatal(e) => > onCompletionPromise.failure(e) > } > } > }{code} > > The execute method ends with an ErrorUtils.handleError() call, so if any > exception is thrown there it will not be caught by the promise. Would it be > better to complete the promise with the real exception instead? If so, I will > submit this change. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
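A generic illustration of the reporter's suggestion (plain Scala, not the actual Spark Connect classes): complete the promise from the real outcome of the work instead of unconditionally calling success.

{code:java}
import scala.concurrent.Promise
import scala.util.Try

// Try captures the value or any non-fatal exception, so the promise reflects the
// actual result of the execution rather than always completing with success.
def runAndComplete[T](onCompletionPromise: Promise[T])(work: => T): Unit = {
  onCompletionPromise.complete(Try(work))
}
{code}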
[jira] [Updated] (SPARK-49982) Negative AQE caching in SQL cache
[ https://issues.apache.org/jira/browse/SPARK-49982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-49982: - Description: When we use a cached AQE plan, it will do `cachedPlan.execute` to build the RDD, which will execute all AQE stages except the result stage. If any of them fails, the failure will be cached by the lazy RDD val. So the next time when we reuse that cached plan (even by a totally irrelevant caller) it will fail immediately. We need to re-cache the AQE plan upon failure. was: When we use a cached an AQE plan, it will do `cachedPlan.execute` to build the RDD, which will execute all AQE stages except the result stage. If any of them failed, the failure will be cached by lazy RDD val. So the next any when we reuse that cached plan (even by a totally irrelevant caller) it will fail immediately. We need to re-cache the AQE plan upon failure. > Negative AQE caching in SQL cache > - > > Key: SPARK-49982 > URL: https://issues.apache.org/jira/browse/SPARK-49982 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 4.0.0 >Reporter: Ziqi Liu >Priority: Major > Labels: pull-request-available > > When we use a cached AQE plan, it will do `cachedPlan.execute` to build > the RDD, which will execute all AQE stages except the result stage. If any of > them fails, the failure will be cached by the lazy RDD val. So the next time > when we reuse that cached plan (even by a totally irrelevant caller) it will > fail immediately. > We need to re-cache the AQE plan upon failure. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49982) Negative AQE caching in SQL cache
Ziqi Liu created SPARK-49982: Summary: Negative AQE caching in SQL cache Key: SPARK-49982 URL: https://issues.apache.org/jira/browse/SPARK-49982 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu When we use a cached AQE plan, it will do `cachedPlan.execute` to build the RDD, which will execute all AQE stages except the result stage. If any of them fails, the failure will be cached by the lazy RDD val. So the next time we reuse that cached plan (even by a totally irrelevant caller) it will fail immediately. We need to re-cache the AQE plan upon failure. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49979) AQE hang forever when collecting twice on a failed plan
Ziqi Liu created SPARK-49979: Summary: AQE hang forever when collecting twice on a failed plan Key: SPARK-49979 URL: https://issues.apache.org/jira/browse/SPARK-49979 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu When we collect twice in a failed AQE plan, no new query stage will be created, and no stage will be submitted either. We will be waiting for a finish event forever, which will never come because that query stage has already failed in the previous run. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49886) Query Level Correctness verification
Wei Liu created SPARK-49886: --- Summary: Query Level Correctness verification Key: SPARK-49886 URL: https://issues.apache.org/jira/browse/SPARK-49886 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49885) Avoid listing when V2 lineage is provided in RocksDB
Wei Liu created SPARK-49885: --- Summary: Avoid listing when V2 lineage is provided in RocksDB Key: SPARK-49885 URL: https://issues.apache.org/jira/browse/SPARK-49885 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu Optimization: in rocksDB.load() there is no need to list files and find the latest version when the lineage is provided; we can get the latest version directly from the lineage stored in the changelog -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49885) Avoid listing when V2 lineage is provided in RocksDB
[ https://issues.apache.org/jira/browse/SPARK-49885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-49885: Epic Link: SPARK-49374 > Avoid listing when V2 lineage is provided in RocksDB > > > Key: SPARK-49885 > URL: https://issues.apache.org/jira/browse/SPARK-49885 > Project: Spark > Issue Type: Improvement > Components: SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > Optimization, in rocksDB.load() there is no need to list and find the latest > version when the lineage is provided, we can just get the latest version from > the lineage stored in the changelog -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49884) Backward Compatibility and Test
Wei Liu created SPARK-49884: --- Summary: Backward Compatibility and Test Key: SPARK-49884 URL: https://issues.apache.org/jira/browse/SPARK-49884 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49883) Integrate with RocksDB and RocksDBFileManager
Wei Liu created SPARK-49883: --- Summary: Integrate with RocksDB and RocksDBFileManager Key: SPARK-49883 URL: https://issues.apache.org/jira/browse/SPARK-49883 Project: Spark Issue Type: Improvement Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49852) Fix deadlock between shuffle dependency initialization and explain string generation
Ziqi Liu created SPARK-49852: Summary: Fix deadlock between shuffle dependency initialization and explain string generation Key: SPARK-49852 URL: https://issues.apache.org/jira/browse/SPARK-49852 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu We hit a deadlock between shuffle dependency initialization and explain string generation, in which both code paths trigger lazy variable instantiation while visiting tree nodes in different orders, and thus acquire object locks in reversed order. The hashcode of a plan node is implemented as a lazy val, so the fix is to remove hash code computation from explain string generation to break the chain of lazy variable instantiation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
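A simplified, self-contained illustration of the lock-ordering hazard described above (plain Scala, not Spark's plan-node classes): in Scala 2, initializing any lazy val of an object synchronizes on that object, so two threads touching two objects' lazy vals in opposite orders can deadlock.

{code:java}
class Left {
  var other: Right = _
  // Initializing `summary` holds this object's monitor, then needs `other`'s monitor.
  lazy val summary: String = { Thread.sleep(100); "L -> " + other.detail }
  lazy val detail: String = "L"
}

class Right {
  var other: Left = _
  lazy val summary: String = { Thread.sleep(100); "R -> " + other.detail }
  lazy val detail: String = "R"
}

object LazyValDeadlockDemo extends App {
  val l = new Left; val r = new Right
  l.other = r; r.other = l
  // Thread 1 locks `l` then needs `r`; thread 2 locks `r` then needs `l`:
  // a reversed lock order analogous to the two code paths in the ticket.
  new Thread(() => println(l.summary)).start()
  new Thread(() => println(r.summary)).start()
}
{code}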
[jira] [Created] (SPARK-49824) SparkConnectStreamingQueryCache Log improvement
Wei Liu created SPARK-49824: --- Summary: SparkConnectStreamingQueryCache Log improvement Key: SPARK-49824 URL: https://issues.apache.org/jira/browse/SPARK-49824 Project: Spark Issue Type: Improvement Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49722) Fix Column ordering of dropDuplicates in spark connect
Wei Liu created SPARK-49722: --- Summary: Fix Column ordering of dropDuplicates in spark connect Key: SPARK-49722 URL: https://issues.apache.org/jira/browse/SPARK-49722 Project: Spark Issue Type: Task Components: Connect Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-49127) How to restart failed spark stream job from the failure point
[ https://issues.apache.org/jira/browse/SPARK-49127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879657#comment-17879657 ] Wei Liu commented on SPARK-49127: - A checkpoint define the streaming query progress. If you use the same checkpoint as the last time (.writeStream.option("checkpointLocation", loc)), you'll be able to restart it. note that if you don't set this option, spark will create a random location for you and try its best to delete it afterwards. So it's nearly impossible to restart from the last offset in this case > How to restart failed spark stream job from the failure point > - > > Key: SPARK-49127 > URL: https://issues.apache.org/jira/browse/SPARK-49127 > Project: Spark > Issue Type: Request > Components: PySpark >Affects Versions: 3.5.0 > Environment: GCP Dataproc cluster >Reporter: Rajdeepak >Priority: Blocker > > I am setting up a ETL process using pyspark. My input is a kafka stream and i > am writing output to multiple sink (one into kafka and another into cloud > storage). I am writing checkpoints on the cloud storage. The issue i am > facing is that, whenever my application is getting failed due to some reason > and when i am restarting my application then, my pyspark application is again > reprocessing some (not all) of the input stream data causing data redundancy. > Is there any way i can avoid this. I am using spark 3.5.0 and python 3.11. > Below are some of my application code: > Spark Session : > spark = SparkSession \ > .builder \ > .appName("ETL") \ > .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2") \ > .config('spark.hadoop.fs.s3a.aws.credentials.provider', > 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\ > .config('spark.driver.extraJavaOptions', '-Duser.timezone=GMT') \ > .config('spark.executor.extraJavaOptions', '-Duser.timezone=GMT') \ > .config('spark.sql.session.timeZone', 'UTC') \ > .config('spark.hadoop.fs.s3a.buffer.dir', '/tmp,/mnt/tmp') \ > .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer') \ > .config('spark.hadoop.fs.s3a.fast.upload.active.blocks', 1) \ > .config('spark.streaming.backpressure.enabled', True) \ > .config("spark.redis.host",conf["nosql-host"]) \ > .config("spark.redis.port",conf["nosql-port"]) \ > .config("spark.redis.db",conf["nosql-db"]) \ > .config("spark.redis.auth", __REDIS_CREDENTIAL__) \ > .getOrCreate() > > Kafka Read Stream : > streamDF = (spark \ > .readStream \ > .format("kafka") \ > .option("kafka.bootstrap.servers", kafka_bootstrap_server_consumer) \ > .option("subscribe", kafka_topic_name) \ > .option("mode", "PERMISSIVE") \ > .option("startingOffsets", "earliest").option("failOnDataLoss", "false") \ > .load().withColumn('fixedValue', fn.expr("substring(value, 6, > length(value)-5)")).select('fixedValue')) > > Write Stream to multiple sinks : > write_stream = extractionDF \ > .writeStream \ > .trigger(processingTime='2 seconds') \ > .outputMode("append") \ > .foreachBatch(lambda df,epochId: write_to_multiple_sinks(df, > epochId,processed_cloud_storage_path,kafka_bootstrap_server_producer)) \ > .option("truncate", "false").option("checkpointLocation", cloud_storage_path)\ > .start() > write_to_multiple_sinks Function : > def write_to_multiple_sinks(dataframe: DataFrame, epochId,cloud_storage_path, > kafka_bootstrap_server): > dataframe = dataframe.cache() > druidDF = dataframe.select(druidSchema()) > druidDF.selectExpr(producerTopic,"to_json(struct(*)) AS value").write\ > .format("kafka")\ > .option("kafka.bootstrap.servers", 
kafka_bootstrap_server).save() > processedDF = dataframe.select(processedSchema()) > processedDF.write.format("csv").mode("append").option("sep","^").option("compression","gzip").option("path", > cloud_storage_path).save() > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
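A minimal sketch of the point in the comment above, in Scala (the same checkpointLocation option applies to the PySpark code in the question; the broker, topic, and paths are placeholders): pin an explicit checkpoint location so a restarted query resumes from its last committed offsets instead of reprocessing.

{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("restartable-stream").getOrCreate()

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("subscribe", "events")                     // placeholder
  .load()

val query = input.writeStream
  .format("parquet")
  .option("path", "gs://bucket/output")                         // placeholder sink
  .option("checkpointLocation", "gs://bucket/checkpoints/etl")  // reuse this exact path on every restart
  .start()
{code}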
[jira] [Updated] (SPARK-49519) Merge options of table and relation when create FileScanBuilder
[ https://issues.apache.org/jira/browse/SPARK-49519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayi Liu updated SPARK-49519: -- Description: Currently, the subclass of {{FileTable}} only uses the options from relation when constructing the {{{}FileScanBuilder{}}}, which leads to the omission of the contents in {{{}FileTable.options{}}}. For the {{{}TableCatalog{}}}, the {{dsOptions}} can be set into the {{FileTable.options}} returned by the {{TableCatalog.loadTable}} method. If only the relation options are used here, the {{TableCatalog}} will not be able to pass {{dsOptions}} that contains table options to {{{}FileScan{}}}. Merging the two options is a better approach. was: Currently, the {{CSVTable}} only uses the options from {{relation}} when constructing the {{{}CSVScanBuilder{}}}, which leads to the omission of the contents in {{{}CSVTable.options{}}}. For the {{{}TableCatalog{}}}, the {{dsOptions}} can be set into the {{CSVTable.options}} returned by the {{TableCatalog.loadTable}} method. If only the relation {{options}} are used here, the {{TableCatalog}} will not be able to pass {{dsOptions}} that contains CSV options to {{{}CSVScan{}}}. Combining the two options is a better option. > Merge options of table and relation when create FileScanBuilder > --- > > Key: SPARK-49519 > URL: https://issues.apache.org/jira/browse/SPARK-49519 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Jiayi Liu >Priority: Major > Labels: pull-request-available > > Currently, the subclass of {{FileTable}} only uses the options from relation > when constructing the {{{}FileScanBuilder{}}}, which leads to the omission of > the contents in {{{}FileTable.options{}}}. For the {{{}TableCatalog{}}}, the > {{dsOptions}} can be set into the {{FileTable.options}} returned by the > {{TableCatalog.loadTable}} method. If only the relation options are used > here, the {{TableCatalog}} will not be able to pass {{dsOptions}} that > contains table options to {{{}FileScan{}}}. > Merging the two options is a better approach. > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49519) Merge options of table and relation when create FileScanBuilder
[ https://issues.apache.org/jira/browse/SPARK-49519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayi Liu updated SPARK-49519: -- Summary: Merge options of table and relation when create FileScanBuilder (was: Combine options of table and relation when create CSVScanBuilder) > Merge options of table and relation when create FileScanBuilder > --- > > Key: SPARK-49519 > URL: https://issues.apache.org/jira/browse/SPARK-49519 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Jiayi Liu >Priority: Major > Labels: pull-request-available > > Currently, the {{CSVTable}} only uses the options from {{relation}} when > constructing the {{{}CSVScanBuilder{}}}, which leads to the omission of the > contents in {{{}CSVTable.options{}}}. For the {{{}TableCatalog{}}}, the > {{dsOptions}} can be set into the {{CSVTable.options}} returned by the > {{TableCatalog.loadTable}} method. If only the relation {{options}} are used > here, the {{TableCatalog}} will not be able to pass {{dsOptions}} that > contains CSV options to {{{}CSVScan{}}}. > Combining the two options is a better option. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49519) Combine options of table and relation when create CSVScanBuilder
[ https://issues.apache.org/jira/browse/SPARK-49519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayi Liu updated SPARK-49519: -- Description: Currently, the {{CSVTable}} only uses the options from {{relation}} when constructing the {{{}CSVScanBuilder{}}}, which leads to the omission of the contents in {{{}CSVTable.options{}}}. For the {{{}TableCatalog{}}}, the {{dsOptions}} can be set into the {{CSVTable.options}} returned by the {{TableCatalog.loadTable}} method. If only the relation {{options}} are used here, the {{TableCatalog}} will not be able to pass {{dsOptions}} that contains CSV options to {{{}CSVScan{}}}. Combining the two options is a better option. was: Currently, the {{CSVTable}} only uses the options from {{relation}} when constructing the {{{}CSVScanBuilder{}}}, which leads to the omission of the contents in {{{}CSVTable.options{}}}. For the {{{}TableCatalog{}}}, the {{dsOptions}} can be set into the {{CSVTable.options}} returned by the {{TableCatalog.loadTable}} method. If only the relation {{options}} are used here, the {{TableCatalog}} will not be able to pass {{dsOptions}} that contains CSV options to {{{}CSVScan{}}}. > Combine options of table and relation when create CSVScanBuilder > > > Key: SPARK-49519 > URL: https://issues.apache.org/jira/browse/SPARK-49519 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Jiayi Liu >Priority: Major > > Currently, the {{CSVTable}} only uses the options from {{relation}} when > constructing the {{{}CSVScanBuilder{}}}, which leads to the omission of the > contents in {{{}CSVTable.options{}}}. For the {{{}TableCatalog{}}}, the > {{dsOptions}} can be set into the {{CSVTable.options}} returned by the > {{TableCatalog.loadTable}} method. If only the relation {{options}} are used > here, the {{TableCatalog}} will not be able to pass {{dsOptions}} that > contains CSV options to {{{}CSVScan{}}}. > Combining the two options is a better option. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49460) NPE error in EmptyRelationExec.cleanupResources()
[ https://issues.apache.org/jira/browse/SPARK-49460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-49460: - Affects Version/s: (was: 3.5.2) (was: 3.5.3) > NPE error in EmptyRelationExec.cleanupResources() > - > > Key: SPARK-49460 > URL: https://issues.apache.org/jira/browse/SPARK-49460 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 4.0.0 >Reporter: Ziqi Liu >Assignee: Ziqi Liu >Priority: Major > Labels: pull-request-available > > This bug was introduced in [https://github.com/apache/spark/pull/46830] : > *{{cleanupResources}}* might be executed on the executor where {{*logical* is > null.}} > > A simple repro > {code:java} > spark.sql("create table t1left (a int, b int);") > spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") > spark.sql("create table t1right (a int, b int);") > spark.sql("create table t1empty (a int, b int);") > spark.sql("insert into t1right values (2,20), (4, 40);") > spark.sql(""" > |with leftT as ( > | with erp as ( > |select > | * > |from > | t1left > | join t1empty on t1left.a = t1empty.a > | join t1right on t1left.a = t1right.a > | ) > | SELECT > |CASE > | WHEN COUNT(*) = 0 THEN 4 > | ELSE NULL > |END AS a > | FROM > |erp > | HAVING > |COUNT(*) = 0 > |) > |select > | /*+ MERGEJOIN(t1right) */ > | * > |from > | leftT > | join t1right on leftT.a = t1right.a""").collect() {code} > > error stacktrace: > {code:java} > 24/08/29 17:52:08 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; > aborting job > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 > (TID 10) (192.168.3.181 executor driver): java.lang.NullPointerException: > Cannot invoke > "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.foreach(scala.Function1)" > because the return value of > "org.apache.spark.sql.execution.EmptyRelationExec.logical()" is null > at > org.apache.spark.sql.execution.EmptyRelationExec.cleanupResources(EmptyRelationExec.scala:86) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1(SparkPlan.scala:571) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1$adapted(SparkPlan.scala:571) > at scala.collection.immutable.Vector.foreach(Vector.scala:2124) > > at > org.apache.spark.sql.execution.SparkPlan.cleanupResources(SparkPlan.scala:571) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) > at > org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171) > at org.apache.spark.scheduler.Task.run(Task.scala:146) > at > 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
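One way to picture the fix is a null guard before traversing the logical plan on the executor side. The following is a self-contained sketch with stub types, assuming the actual patch follows the same idea; it is not the real EmptyRelationExec code:
{code:java}
// Stub type standing in for Spark's logical plan; illustration only.
final case class LogicalPlanStub(name: String)

class EmptyRelationExecSketch(@transient private val logical: LogicalPlanStub)
  extends Serializable {
  def cleanupResources(): Unit = {
    // `logical` is only populated on the driver; after serialization to an executor
    // the @transient field comes back as null, so guard before using it.
    Option(logical).foreach { plan =>
      println(s"cleaning up resources for ${plan.name}")
    }
  }
}

object NullGuardDemo extends App {
  new EmptyRelationExecSketch(null).cleanupResources() // executor-like path, no NPE
  new EmptyRelationExecSketch(LogicalPlanStub("EmptyRelation")).cleanupResources()
}
{code}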
[jira] [Updated] (SPARK-49462) A query running under ckpt version v1 should be able to read a STATELESS query running under ckptv2
[ https://issues.apache.org/jira/browse/SPARK-49462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-49462: Epic Link: SPARK-49374 > A query running under ckpt version v1 should be able to read a STATELESS > query running under ckptv2 > --- > > Key: SPARK-49462 > URL: https://issues.apache.org/jira/browse/SPARK-49462 > Project: Spark > Issue Type: Task > Components: SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > Follow up to [https://github.com/apache/spark/pull/47932] > The validate version method force checks if the version persisted in the > commit log > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala#L59] > > But the V2 checkpoint is only meaningful to stateful queries, so for a > stateless query, a CommitLog running under V1 (old code) should be able to > process a persisted commit log that has V2 format (new log) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
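A rough sketch of the relaxed check, with an illustrative method and parameter names (Spark's real CommitLog internals are not shown here): treat a v2 entry as readable by a v1 reader whenever the query is stateless.
{code:java}
// Illustrative only: not Spark's actual CommitLog.validateVersion implementation.
// A v1 reader can tolerate a v2 commit log entry when the query is stateless,
// because the extra v2 fields (state store checkpoint IDs) are irrelevant without state.
def validateCommitLogVersion(
    persistedVersion: Int,
    readerMaxVersion: Int,
    queryIsStateful: Boolean): Unit = {
  val effectiveMax = if (queryIsStateful) readerMaxVersion else math.max(readerMaxVersion, 2)
  require(
    persistedVersion <= effectiveMax,
    s"Commit log version $persistedVersion is newer than the highest supported version $effectiveMax")
}
{code}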
[jira] [Created] (SPARK-49462) A query running under ckpt version v1 should be able to read a STATELESS query running under ckptv2
Wei Liu created SPARK-49462: --- Summary: A query running under ckpt version v1 should be able to read a STATELESS query running under ckptv2 Key: SPARK-49462 URL: https://issues.apache.org/jira/browse/SPARK-49462 Project: Spark Issue Type: Task Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu Follow up to [https://github.com/apache/spark/pull/47932] The validate version method force checks if the version persisted in the commit log [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala#L59] But the V2 checkpoint is only meaningful to stateful queries, so for a stateless query, a CommitLog running under V1 (old code) should be able to process a persisted commit log that has V2 format (new log) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-49461) Persistent Checkpoint ID to commit logs and read it back
Wei Liu created SPARK-49461: --- Summary: Persistent Checkpoint ID to commit logs and read it back Key: SPARK-49461 URL: https://issues.apache.org/jira/browse/SPARK-49461 Project: Spark Issue Type: Task Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49460) NPE error in EmptyRelationExec.cleanupResources()
[ https://issues.apache.org/jira/browse/SPARK-49460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-49460: - Description: This bug was introduced in [https://github.com/apache/spark/pull/46830] : *{{cleanupResources}}* might be executed on the executor where {{*logical* is null.}} A simple repro {code:java} spark.sql("create table t1left (a int, b int);") spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") spark.sql("create table t1right (a int, b int);") spark.sql("create table t1empty (a int, b int);") spark.sql("insert into t1right values (2,20), (4, 40);") spark.sql(""" |with leftT as ( | with erp as ( |select | * |from | t1left | join t1empty on t1left.a = t1empty.a | join t1right on t1left.a = t1right.a | ) | SELECT |CASE | WHEN COUNT(*) = 0 THEN 4 | ELSE NULL |END AS a | FROM |erp | HAVING |COUNT(*) = 0 |) |select | /*+ MERGEJOIN(t1right) */ | * |from | leftT | join t1right on leftT.a = t1right.a""").collect() {code} error stacktrace: {code:java} 24/08/29 17:52:08 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 10) (192.168.3.181 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.foreach(scala.Function1)" because the return value of "org.apache.spark.sql.execution.EmptyRelationExec.logical()" is null at org.apache.spark.sql.execution.EmptyRelationExec.cleanupResources(EmptyRelationExec.scala:86) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1(SparkPlan.scala:571) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1$adapted(SparkPlan.scala:571) at scala.collection.immutable.Vector.foreach(Vector.scala:2124) at org.apache.spark.sql.execution.SparkPlan.cleanupResources(SparkPlan.scala:571) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171) at org.apache.spark.scheduler.Task.run(Task.scala:146) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644) {code} was: This bug was introduced in [https://github.com/apache/spark/pull/46830] where it wrongly assumes {{cleanupResources}} will not be called on the executor. 
A simple repro {code:java} spark.sql("create table t1left (a int, b int);") spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") spark.sql("create table t1right (a int, b int);") spark.sql("create table t1empty (a int, b int);") spark.sql("insert into t1right values (2,20), (4, 40);") spark.sql(""" |with leftT as ( | with erp as ( |select | * |from | t1left | join t1empty on t1left.a = t1empty.a | join t1right on t1left.a = t1right.a | ) | SELECT |CASE | WHEN COUNT(*) = 0 THEN 4 | ELSE NULL |END AS a | FROM |erp | HAVING |COUNT(*) = 0 |) |select | /*+ MERGEJOIN(t1right) */ | * |from | leftT | join t1right on leftT.a = t1right.a""").collect() {code} error stacktrace: {code:java} 24/08/29 17:52:08 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 10) (192.168.3.181 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.foreach(scala.Function1)" because the return value of "org.apache.spark.sql.execution.EmptyRelationExec.logical()" is null at org.apache.spark.sql.execution.EmptyRelationExec.cleanupResources(EmptyRelat
[jira] [Updated] (SPARK-49460) NPE error in EmptyRelationExec.cleanupResources()
[ https://issues.apache.org/jira/browse/SPARK-49460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-49460: - Description: This bug was introduced in [https://github.com/apache/spark/pull/46830] where it wrongly assumes {{cleanupResources}} will not be called on the executor. A simple repro ``` spark.sql("create table t1left (a int, b int);") spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") spark.sql("create table t1right (a int, b int);") spark.sql("create table t1empty (a int, b int);") spark.sql("insert into t1right values (2,20), (4, 40);") spark.sql(""" with leftT as ( with erp as ( select * from t1left join t1empty on t1left.a = t1empty.a join t1right on t1left.a = t1right.a ) SELECT CASE WHEN COUNT(*) = 0 THEN 4 ELSE NULL END AS a FROM erp HAVING COUNT(*) = 0 ) select /*+ MERGEJOIN(t1right) */ * from leftT join t1right on leftT.a = t1right.a """).collect() error stacktrace: ``` 24/08/29 17:52:08 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 10) (192.168.3.181 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.foreach(scala.Function1)" because the return value of "org.apache.spark.sql.execution.EmptyRelationExec.logical()" is null at org.apache.spark.sql.execution.EmptyRelationExec.cleanupResources(EmptyRelationExec.scala:86) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1(SparkPlan.scala:571) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1$adapted(SparkPlan.scala:571) at scala.collection.immutable.Vector.foreach(Vector.scala:2124) at org.apache.spark.sql.execution.SparkPlan.cleanupResources(SparkPlan.scala:571) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171) at org.apache.spark.scheduler.Task.run(Task.scala:146) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644) ``` was: This bug was introduced in [https://github.com/apache/spark/pull/46830] where it wrongly assumes {{cleanupResources}} will not be called on the executor. 
A simple repro ``` spark.sql("create table t1left (a int, b int);") spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") spark.sql("create table t1right (a int, b int);") spark.sql("create table t1empty (a int, b int);") spark.sql("insert into t1right values (2,20), (4, 40);") spark.sql(""" |with leftT as ( | with erp as ( | select | * | from | t1left | join t1empty on t1left.a = t1empty.a | join t1right on t1left.a = t1right.a | ) | SELECT | CASE | WHEN COUNT(*) = 0 THEN 4 | ELSE NULL | END AS a | FROM | erp | HAVING | COUNT(*) = 0 |) |select | /*+ MERGEJOIN(t1right) */ | * |from | leftT | join t1right on leftT.a = t1right.a""").collect() ``` error stacktrace: ``` 24/08/29 17:52:08 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 10) (192.168.3.181 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.foreach(scala.Function1)" because the return value of "org.apache.spark.sql.execution.EmptyRelationExec.logical()" is null at org.apache.spark.sql.execution.EmptyRelationExec.cleanupResources(EmptyRelationExec.scala:86) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1(SparkPlan.scala:571) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1$adapted(SparkPlan.scala:5
[jira] [Created] (SPARK-49460) NPE error in EmptyRelationExec.cleanupResources()
Ziqi Liu created SPARK-49460: Summary: NPE error in EmptyRelationExec.cleanupResources() Key: SPARK-49460 URL: https://issues.apache.org/jira/browse/SPARK-49460 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.5.2, 4.0.0, 3.5.3 Reporter: Ziqi Liu This bug was introduced in [https://github.com/apache/spark/pull/46830] where it wrongly assumes {{cleanupResources}} will not be called on the executor. A simple repro ``` spark.sql("create table t1left (a int, b int);") spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") spark.sql("create table t1right (a int, b int);") spark.sql("create table t1empty (a int, b int);") spark.sql("insert into t1right values (2,20), (4, 40);") spark.sql(""" |with leftT as ( | with erp as ( | select | * | from | t1left | join t1empty on t1left.a = t1empty.a | join t1right on t1left.a = t1right.a | ) | SELECT | CASE | WHEN COUNT(*) = 0 THEN 4 | ELSE NULL | END AS a | FROM | erp | HAVING | COUNT(*) = 0 |) |select | /*+ MERGEJOIN(t1right) */ | * |from | leftT | join t1right on leftT.a = t1right.a""").collect() ``` error stacktrace: ``` 24/08/29 17:52:08 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 10) (192.168.3.181 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.foreach(scala.Function1)" because the return value of "org.apache.spark.sql.execution.EmptyRelationExec.logical()" is null at org.apache.spark.sql.execution.EmptyRelationExec.cleanupResources(EmptyRelationExec.scala:86) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1(SparkPlan.scala:571) at org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1$adapted(SparkPlan.scala:571) at scala.collection.immutable.Vector.foreach(Vector.scala:2124) at org.apache.spark.sql.execution.SparkPlan.cleanupResources(SparkPlan.scala:571) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171) at org.apache.spark.scheduler.Task.run(Task.scala:146) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644) ``` ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49460) NPE error in EmptyRelationExec.cleanupResources()
[ https://issues.apache.org/jira/browse/SPARK-49460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-49460: - Issue Type: Bug (was: Improvement) > NPE error in EmptyRelationExec.cleanupResources() > - > > Key: SPARK-49460 > URL: https://issues.apache.org/jira/browse/SPARK-49460 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 4.0.0, 3.5.2, 3.5.3 >Reporter: Ziqi Liu >Priority: Major > > This bug was introduced in [https://github.com/apache/spark/pull/46830] where > it wrongly assumes {{cleanupResources}} will not be called on the executor. > > A simple repro > ``` > spark.sql("create table t1left (a int, b int);") > spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") > spark.sql("create table t1right (a int, b int);") > spark.sql("create table t1empty (a int, b int);") > spark.sql("insert into t1right values (2,20), (4, 40);") > spark.sql(""" > |with leftT as ( > | with erp as ( > | select > | * > | from > | t1left > | join t1empty on t1left.a = t1empty.a > | join t1right on t1left.a = t1right.a > | ) > | SELECT > | CASE > | WHEN COUNT(*) = 0 THEN 4 > | ELSE NULL > | END AS a > | FROM > | erp > | HAVING > | COUNT(*) = 0 > |) > |select > | /*+ MERGEJOIN(t1right) */ > | * > |from > | leftT > | join t1right on leftT.a = t1right.a""").collect() > ``` > > > error stacktrace: > ``` > 24/08/29 17:52:08 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; > aborting job > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 > (TID 10) (192.168.3.181 executor driver): java.lang.NullPointerException: > Cannot invoke > "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.foreach(scala.Function1)" > because the return value of > "org.apache.spark.sql.execution.EmptyRelationExec.logical()" is null > at > org.apache.spark.sql.execution.EmptyRelationExec.cleanupResources(EmptyRelationExec.scala:86) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1(SparkPlan.scala:571) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$cleanupResources$1$adapted(SparkPlan.scala:571) > at scala.collection.immutable.Vector.foreach(Vector.scala:2124) > > at > org.apache.spark.sql.execution.SparkPlan.cleanupResources(SparkPlan.scala:571) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) > at > org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171) > at org.apache.spark.scheduler.Task.run(Task.scala:146) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644) > ``` > ``` -- This message was sent by 
Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49448) Spark Connect ExecuteThreadRunner promise will always complete with success.
[ https://issues.apache.org/jira/browse/SPARK-49448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LIU updated SPARK-49448: Description: {code:java} private class ExecutionThread(onCompletionPromise: Promise[Unit]) extends Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") { override def run(): Unit = { try { execute() onCompletionPromise.success(()) } catch { case NonFatal(e) => onCompletionPromise.failure(e) } } }{code} The execute method ends with an ErrorUtils.handleError() call, so if any exception is thrown there it is not caught by this try block and the promise still completes successfully. Would it be better to complete the promise with the real exception instead? If so, I will submit this change. was: {code:java} // code placeholder {code} private class ExecutionThread(onCompletionPromise: Promise[Unit]) extends Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") \{ override def run(): Unit = { try { execute() onCompletionPromise.success(()) } catch \{ case NonFatal(e) => onCompletionPromise.failure(e) } } } The execute method ends with an ErrorUtils.handleError() call, so if any exception is thrown there it is not caught by this try block and the promise still completes successfully. Would it be better to complete the promise with the real exception instead? > Spark Connect ExecuteThreadRunner promise will always complete with success. > > > Key: SPARK-49448 > URL: https://issues.apache.org/jira/browse/SPARK-49448 > Project: Spark > Issue Type: Improvement > Components: Connect >Affects Versions: 4.0.0 >Reporter: LIU >Priority: Minor > > {code:java} > private class ExecutionThread(onCompletionPromise: Promise[Unit]) > extends > Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") { > override def run(): Unit = { > try { > execute() > onCompletionPromise.success(()) > } catch { > case NonFatal(e) => > onCompletionPromise.failure(e) > } > } > }{code} > > The execute method ends with an ErrorUtils.handleError() call, so if any > exception is thrown there it is not caught by this try block and the promise > still completes successfully. Would it be better to complete the promise with > the real exception instead? If so, I will submit this change. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
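A self-contained sketch of the suggestion, using stub types rather than Spark Connect's actual classes: have the body report its failure explicitly so the promise can be completed with the real error instead of always succeeding. The executeAndReportError shape below is an assumption for illustration.
{code:java}
import scala.concurrent.Promise
import scala.util.control.NonFatal

// Stub thread runner; `executeAndReportError` stands in for execute() + ErrorUtils.handleError(),
// returning the error it handled (if any) so the promise can reflect it.
class ExecutionThreadSketch(
    executeAndReportError: () => Option[Throwable],
    onCompletionPromise: Promise[Unit])
  extends Thread("SparkConnectExecuteThread-sketch") {

  override def run(): Unit = {
    try {
      executeAndReportError() match {
        case Some(error) => onCompletionPromise.failure(error) // surface the handled error
        case None        => onCompletionPromise.success(())
      }
    } catch {
      case NonFatal(e) => onCompletionPromise.failure(e) // errors escaping the handler
    }
  }
}
{code}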
[jira] [Created] (SPARK-49448) Spark Connect ExecuteThreadRunner promise will always complete with success.
LIU created SPARK-49448: --- Summary: Spark Connect ExecuteThreadRunner promise will always complete with success. Key: SPARK-49448 URL: https://issues.apache.org/jira/browse/SPARK-49448 Project: Spark Issue Type: Improvement Components: Connect Affects Versions: 4.0.0 Reporter: LIU private class ExecutionThread(onCompletionPromise: Promise[Unit]) extends Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") { override def run(): Unit = { try { execute() onCompletionPromise.success(()) } catch { case NonFatal(e) => onCompletionPromise.failure(e) } } } The execute method ends with an ErrorUtils.handleError() call, so if any exception is thrown there it is not caught by this try block and the promise still completes successfully. Would it be better to complete the promise with the real exception instead? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49448) Spark Connect ExecuteThreadRunner promise will always complete with success.
[ https://issues.apache.org/jira/browse/SPARK-49448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LIU updated SPARK-49448: Description: {code:java} // code placeholder {code} private class ExecutionThread(onCompletionPromise: Promise[Unit]) extends Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") \{ override def run(): Unit = { try { execute() onCompletionPromise.success(()) } catch \{ case NonFatal(e) => onCompletionPromise.failure(e) } } } The execute method ends with an ErrorUtils.handleError() call, so if any exception is thrown there it is not caught by this try block and the promise still completes successfully. Would it be better to complete the promise with the real exception instead? was: private class ExecutionThread(onCompletionPromise: Promise[Unit]) extends Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") { override def run(): Unit = { try { execute() onCompletionPromise.success(()) } catch { case NonFatal(e) => onCompletionPromise.failure(e) } } } The execute method ends with an ErrorUtils.handleError() call, so if any exception is thrown there it is not caught by this try block and the promise still completes successfully. Would it be better to complete the promise with the real exception instead? > Spark Connect ExecuteThreadRunner promise will always complete with success. > > > Key: SPARK-49448 > URL: https://issues.apache.org/jira/browse/SPARK-49448 > Project: Spark > Issue Type: Improvement > Components: Connect >Affects Versions: 4.0.0 >Reporter: LIU >Priority: Minor > > {code:java} > // code placeholder > {code} > private class ExecutionThread(onCompletionPromise: Promise[Unit]) extends > Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") \{ > override def run(): Unit = { try { execute() onCompletionPromise.success(()) > } catch \{ case NonFatal(e) => onCompletionPromise.failure(e) } } } > > The execute method ends with an ErrorUtils.handleError() call, so if any > exception is thrown there it is not caught by this try block and the promise > still completes successfully. Would it be better to complete the promise with > the real exception instead? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-49374) RocksDB State Store Checkpoint Structure V2
[ https://issues.apache.org/jira/browse/SPARK-49374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-49374: Description: h2. Motivation We expect the new checkpoint structure would benefit micro-batch mode by establishing characteristics of linear dependency between batch versions. Right now, tasks can be executed multiple times for the same batchID (for speculative execution or rerunning in ForEachBatch), and there can be multiple parallel lineages of state stores going on. For example, in one of the issue with ForEachBatch showed this lineage, which triggered a RocksDB file uploading bug: !image-2024-08-23-14-28-56-418.png! Although we fixed all the bugs, this complexity always makes the system prone to bugs. This non-linear lineage also presents a correctness risk, when some of the Version is changelogs. !image-2024-08-23-14-29-19-443.png! In the same example, suppose Version 17 is a Snapshot checkpoint and Version 18 is a changelog checkpoint. When we need to recover from a checkpoint, we need to apply Version 17 and Version 18 together. However, Version 18 isn’t generated on top of Version 17. This can happen either because Version 18 is generated by a different worker from Version 17, or the same worker abandoned Version 17, replay this batch and generated Version 17’. In most cases, it is accurate, but we have identified some edge cases where users might be surprised by the results, and there may already be correctness issues. These correctness issues will become more prominent with transformWithState due to the occurrence of partial updates. Note that the fixing state store lineage only makes sure the state store is consistent, and doesn’t make sure the state store is consistent with outputs to the sink. Furthermore, the complex checkpoint version lineage makes it hard to reduce overhead for old version cleanup. Even a long running state store has to read metadata for all previous versions and list all files to do any cleanup safely, which is expensive. It is necessary because any version can be written by a different executor and references to RocksDB files that the current executor isn’t aware of. In extreme cases, we can even corrupt the state store. The chance that it happens is very low, but it’s not a good idea to leave unknown correctness risk unfixed. h2. Proposal sketch The proposed checkpoint structure will ensure a linear dependency: !image-2024-08-23-14-29-59-165.png! This stronger guarantee will be a good foundation for the problems above. The proposal’s basic idea is to guarantee linear lineage by not allowing checkpoint overwriting. All checkpoints are made with a new file name with a uniqueID. When starting any batch, a task precisely identifies which checkpoint to load with the uniqueID. When a new state store checkpoint is generated, the checkpoint path name includes a globally unique ID, so that it can never be updated. Here is an example: 20_d8e2ca47.zip 21_ef6618c2.delta 21_f4d05ac9.delta 22_4489578d.delta The name is stored in the commit log too. When the next batch is being executed, those unique IDs are passed to the executors, where they make sure they start to execute from this checkpoint. If the local state store isn’t in this state, it will download the checkpoint from the cloud. h1. Part II: Detailed design h2. Architecture Previously, a state store checkpoint was stored in a path that was only determined by checkpoint root path, operatorID, partitionID and batchID. 
When a stateful operator is executed, it is always able to construct the path with that information. It will download the checkpoint path from the path of the previous batchID and checkpoint to the path for this batchID. As mentioned earlier, this flexibility comes with a cost: there is no way to distinguish among checkpoints generated by different tasks rerunning for the same batchID. In this new design, every checkpoint will go to a globally unique file. The globally unique ID needs to be communicated between driver and the executors, and stored in commit logs. The basic workflow is shown as following: !image-2024-08-23-14-30-22-474.png! h3. File Structure Under Checkpoint Path Currently, a checkpoint is stored in path _.[changelog, zip]._ The path structure will look like following: __ 0 (operator ID) ++ | 0 (partitionID) +-+ | …… | 1 (partitionID) +-+ | |- default (storeName) | +-+ | | 20.zip | | 21.delta | | 22.delta | + 23.delta | 2 (partitionID) +--- …… The general structure will be intact, and we only change how files in the root path. For example, in the example above, we will only change file names in the file names in blue color. Inste
[jira] [Updated] (SPARK-49374) RocksDB State Store Checkpoint Structure V2
[ https://issues.apache.org/jira/browse/SPARK-49374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-49374: Description: h2. Motivation We expect the new checkpoint structure would benefit micro-batch mode by establishing characteristics of linear dependency between batch versions. Right now, tasks can be executed multiple times for the same batchID (for speculative execution or rerunning in ForEachBatch), and there can be multiple parallel lineages of state stores going on. For example, in one of the issue with ForEachBatch showed this lineage, which triggered a RocksDB file uploading bug: !image-2024-08-23-14-28-56-418.png! Although we fixed all the bugs, this complexity always makes the system prone to bugs. This non-linear lineage also presents a correctness risk, when some of the Version is changelogs. !image-2024-08-23-14-29-19-443.png! In the same example, suppose Version 17 is a Snapshot checkpoint and Version 18 is a changelog checkpoint. When we need to recover from a checkpoint, we need to apply Version 17 and Version 18 together. However, Version 18 isn’t generated on top of Version 17. This can happen either because Version 18 is generated by a different worker from Version 17, or the same worker abandoned Version 17, replay this batch and generated Version 17’. In most cases, it is accurate, but we have identified some edge cases where users might be surprised by the results, and there may already be correctness issues. These correctness issues will become more prominent with transformWithState due to the occurrence of partial updates. Note that the fixing state store lineage only makes sure the state store is consistent, and doesn’t make sure the state store is consistent with outputs to the sink. Furthermore, the complex checkpoint version lineage makes it hard to reduce overhead for old version cleanup. Even a long running state store has to read metadata for all previous versions and list all files to do any cleanup safely, which is expensive. It is necessary because any version can be written by a different executor and references to RocksDB files that the current executor isn’t aware of. In extreme cases, we can even corrupt the state store. The chance that it happens is very low, but it’s not a good idea to leave unknown correctness risk unfixed. h2. Proposal sketch The proposed checkpoint structure will ensure a linear dependency: !image-2024-08-23-14-29-59-165.png! This stronger guarantee will be a good foundation for the problems above. The proposal’s basic idea is to guarantee linear lineage by not allowing checkpoint overwriting. All checkpoints are made with a new file name with a uniqueID. When starting any batch, a task precisely identifies which checkpoint to load with the uniqueID. When a new state store checkpoint is generated, the checkpoint path name includes a globally unique ID, so that it can never be updated. Here is an example: 20_d8e2ca47.zip 21_ef6618c2.delta 21_f4d05ac9.delta 22_4489578d.delta The name is stored in the commit log too. When the next batch is being executed, those unique IDs are passed to the executors, where they make sure they start to execute from this checkpoint. If the local state store isn’t in this state, it will download the checkpoint from the cloud. h1. Part II: Detailed design h2. Architecture Previously, a state store checkpoint was stored in a path that was only determined by checkpoint root path, operatorID, partitionID and batchID. 
When a stateful operator is executed, it is always able to construct the path with that information. It will download the checkpoint path from the path of the previous batchID and checkpoint to the path for this batchID. As mentioned earlier, this flexibility comes with a cost: there is no way to distinguish among checkpoints generated by different tasks rerunning for the same batchID. In this new design, every checkpoint will go to a globally unique file. The globally unique ID needs to be communicated between driver and the executors, and stored in commit logs. The basic workflow is shown as following: !image-2024-08-23-14-30-22-474.png! h3. File Structure Under Checkpoint Path Currently, a checkpoint is stored in path _.[changelog, zip]._ The path structure will look like following: __ 0 (operator ID) ++ | 0 (partitionID) +-+ | …… | 1 (partitionID) +-+ | |- default (storeName) | +-+ | | 20.zip | | 21.delta | | 22.delta | + 23.delta | 2 (partitionID) +--- …… The general structure will be intact, and we only change how files in the root path. For example, in the example above, we will only change file names in the file names in blue color. Inste
[jira] [Updated] (SPARK-49374) RocksDB State Store Checkpoint Structure V2
[ https://issues.apache.org/jira/browse/SPARK-49374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-49374: Issue Type: Epic (was: Task) > RocksDB State Store Checkpoint Structure V2 > --- > > Key: SPARK-49374 > URL: https://issues.apache.org/jira/browse/SPARK-49374 > Project: Spark > Issue Type: Epic > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Siying Dong >Priority: Major > Attachments: image-2024-08-23-14-28-56-418.png, > image-2024-08-23-14-29-19-443.png, image-2024-08-23-14-29-59-165.png, > image-2024-08-23-14-30-22-474.png > > > h2. Motivation > We expect the new checkpoint structure would benefit micro-batch mode by > establishing characteristics of linear dependency between batch versions. > Right now, tasks can be executed multiple times for the same batchID (for > speculative execution or rerunning in ForEachBatch), and there can be > multiple parallel lineages of state stores going on. For example, in one of > the issue with ForEachBatch showed this lineage, which triggered a RocksDB > file uploading bug: > !image-2024-08-23-14-28-56-418.png! > Although we fixed all the bugs, this complexity always makes the system prone > to bugs. This non-linear lineage also presents a correctness risk, when some > of the Version is changelogs. > !image-2024-08-23-14-29-19-443.png! > In the same example, suppose Version 17 is a Snapshot checkpoint and Version > 18 is a changelog checkpoint. When we need to recover from a checkpoint, we > need to apply Version 17 and Version 18 together. However, Version 18 isn’t > generated on top of Version 17. This can happen either because Version 18 is > generated by a different worker from Version 17, or the same worker abandoned > Version 17, replay this batch and generated Version 17’. In most cases, it is > accurate, but we have identified some edge cases where users might be > surprised by the results, and there may already be correctness issues. See an > example in the > [Appendix|https://docs.google.com/document/d/1pT5bwW325VndVH09aZd7TykKXbBP5uCrNIQOC0uDoLc/edit#heading=h.ehd1najwmeos]. > These correctness issues will become more prominent with transformWithState > due to the occurrence of partial updates. Note that the fixing state store > lineage only makes sure the state store is consistent, and doesn’t make sure > the state store is consistent with outputs to the sink. > Furthermore, the complex checkpoint version lineage makes it hard to reduce > overhead for old version cleanup. Even a long running state store has to read > metadata for all previous versions and list all files to do any cleanup > safely, which is expensive. It is necessary because any version can be > written by a different executor and references to RocksDB files that the > current executor isn’t aware of. In extreme cases, we can even corrupt the > state store. The chance that it happens is very low, but it’s not a good idea > to leave unknown correctness risk unfixed. > h2. Proposal sketch > The proposed checkpoint structure will ensure a linear dependency: > !image-2024-08-23-14-29-59-165.png! > This stronger guarantee will be a good foundation for the problems above. > > The proposal’s basic idea is to guarantee linear lineage by not allowing > checkpoint overwriting. All checkpoints are made with a new file name with a > uniqueID. When starting any batch, a task precisely identifies which > checkpoint to load with the uniqueID. 
> When a new state store checkpoint is generated, the checkpoint path name > includes a globally unique ID, so that it can never be updated. Here is an > example: > 20_d8e2ca47.zip > 21_ef6618c2.delta > 21_f4d05ac9.delta > 22_4489578d.delta > The name is stored in the commit log too. When the next batch is being > executed, those unique IDs are passed to the executors, where they make sure > they start to execute from this checkpoint. If the local state store isn’t in > this state, it will download the checkpoint from the cloud. > h1. Part II: Detailed design > h2. Architecture > Previously, a state store checkpoint was stored in a path that was only > determined by checkpoint root path, operatorID, partitionID and batchID. When > a stateful operator is executed, it is always able to construct the path with > that information. It will download the checkpoint path from the path of the > previous batchID and checkpoint to the path for this batchID. As mentioned > earlier, this flexibility comes with a cost: there is no way to distinguish > among checkpoints generated by different tasks rerunning for the same batchID. > > In this new design, every checkpoint will go to a globally unique file. The > globally unique ID needs to be communicated between driver and the executo
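A small sketch of the naming scheme described above, assuming the file name is simply the batch version plus a random unique suffix; the helper name and the 8-character suffix length are inferred from the example file names, not taken from the design doc:
{code:java}
import java.util.UUID

// Every checkpoint file gets a globally unique suffix so a version is never overwritten in place.
def checkpointFileName(version: Long, isSnapshot: Boolean): String = {
  val uniqueId = UUID.randomUUID().toString.replace("-", "").take(8)
  val extension = if (isSnapshot) "zip" else "delta"
  s"${version}_$uniqueId.$extension"
}

// checkpointFileName(21, isSnapshot = false) produces something like "21_ef6618c2.delta"
{code}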
[jira] [Updated] (SPARK-49145) Improve readability of log4j console log output
[ https://issues.apache.org/jira/browse/SPARK-49145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-49145: --- Description: Prior to this update, the OSS Spark logs were difficult to interpret. The logs followed a JSON output format which is not optimal for human consumption: {code:java} {"ts":"2024-07-26T18:45:17.712Z","level":"INFO","msg":"Running Spark version 4.0.0-SNAPSHOT","context":{"spark_version":"4.0.0-SNAPSHOT"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.715Z","level":"INFO","msg":"OS info Mac OS X, 14.4.1, aarch64","context":{"os_arch":"aarch64","os_name":"Mac OS X","os_version":"14.4.1"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.716Z","level":"INFO","msg":"Java version 17.0.11","context":{"java_version":"17.0.11"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.761Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... using builtin-java classes where applicable","logger":"NativeCodeLoader"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"No custom resources configured for spark.driver.","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.784Z","level":"INFO","msg":"Submitted application: Spark Pi","context":{"app_name":"Spark Pi"},"logger":"SparkContext"}...{"ts":"2024-07-26T18:45:18.036Z","level":"INFO","msg":"Start Jetty 0.0.0.0:4040 for SparkUI","context":{"host":"0.0.0.0","port":"4040","server_name":"SparkUI"},"logger":"JettyUtils"}{"ts":"2024-07-26T18:45:18.044Z","level":"INFO","msg":"jetty-11.0.20; built: 2024-01-29T21:04:22.394Z; git: 922f8dc188f7011e60d0361de585fd4ac4d63064; jvm 17.0.11+9-LTS","logger":"Server"}{"ts":"2024-07-26T18:45:18.054Z","level":"INFO","msg":"Started Server@22c75c01{STARTING}[11.0.20,sto=3] @1114ms","logger":"Server"} {code} This issue updates the default `log4j.properties.template` with the following improvements for console logging format: * Use PatternLayout for improved human readability * Color-code log levels to simplify logging output * Visually partition the threadName and contextInfo for easy interpretation was: Prior to this update, the OSS Spark logs were difficult to interpret. The logs followed a JSON output format which is not optimal for human consumption: {code:java} {"ts":"2024-07-26T18:45:17.712Z","level":"INFO","msg":"Running Spark version 4.0.0-SNAPSHOT","context":{"spark_version":"4.0.0-SNAPSHOT"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.715Z","level":"INFO","msg":"OS info Mac OS X, 14.4.1, aarch64","context":{"os_arch":"aarch64","os_name":"Mac OS X","os_version":"14.4.1"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.716Z","level":"INFO","msg":"Java version 17.0.11","context":{"java_version":"17.0.11"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.761Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable","logger":"NativeCodeLoader"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"No custom resources configured for spark.driver.","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.784Z","level":"INFO","msg":"Submitted application: Spark Pi","context":{"app_name":"Spark Pi"},"logger":"SparkContext"}...{"ts":"2024-07-26T18:45:18.036Z","level":"INFO","msg":"Start Jetty 0.0.0.0:4040 for SparkUI","context":{"host":"0.0.0.0","port":"4040","server_name":"SparkUI"},"logger":"JettyUtils"}{"ts":"2024-07-26T18:45:18.044Z","level":"INFO","msg":"jetty-11.0.20; built: 2024-01-29T21:04:22.394Z; git: 922f8dc188f7011e60d0361de585fd4ac4d63064; jvm 17.0.11+9-LTS","logger":"Server"}{"ts":"2024-07-26T18:45:18.054Z","level":"INFO","msg":"Started Server@22c75c01{STARTING}[11.0.20,sto=3] @1114ms","logger":"Server"} {code} This effort updates the default `log4j.properties.template` with the following improvements for console logging format: * Use PatternLayout for improved human readability * Color-code log levels to simplify logging output * Visually partition the threadName and contextInfo for easy interpretation > Improve readability of log4j console log output > --- > > Key: SPARK-49145 > URL: https://issues.apache.org/jira/browse/SPARK-49145 > Project: Spark > Issue Type: Task >
[jira] [Updated] (SPARK-49145) Improve readability of log4j console log output
[ https://issues.apache.org/jira/browse/SPARK-49145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-49145: --- Description: Prior to this update, the OSS Spark logs were difficult to interpret. The logs followed a JSON output format which is not optimal for human consumption: {code:java} {"ts":"2024-07-26T18:45:17.712Z","level":"INFO","msg":"Running Spark version 4.0.0-SNAPSHOT","context":{"spark_version":"4.0.0-SNAPSHOT"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.715Z","level":"INFO","msg":"OS info Mac OS X, 14.4.1, aarch64","context":{"os_arch":"aarch64","os_name":"Mac OS X","os_version":"14.4.1"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.716Z","level":"INFO","msg":"Java version 17.0.11","context":{"java_version":"17.0.11"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.761Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... using builtin-java classes where applicable","logger":"NativeCodeLoader"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"No custom resources configured for spark.driver.","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.784Z","level":"INFO","msg":"Submitted application: Spark Pi","context":{"app_name":"Spark Pi"},"logger":"SparkContext"}...{"ts":"2024-07-26T18:45:18.036Z","level":"INFO","msg":"Start Jetty 0.0.0.0:4040 for SparkUI","context":{"host":"0.0.0.0","port":"4040","server_name":"SparkUI"},"logger":"JettyUtils"}{"ts":"2024-07-26T18:45:18.044Z","level":"INFO","msg":"jetty-11.0.20; built: 2024-01-29T21:04:22.394Z; git: 922f8dc188f7011e60d0361de585fd4ac4d63064; jvm 17.0.11+9-LTS","logger":"Server"}{"ts":"2024-07-26T18:45:18.054Z","level":"INFO","msg":"Started Server@22c75c01{STARTING}[11.0.20,sto=3] @1114ms","logger":"Server"} {code} This effort updates the default `log4j.properties.template` with the following improvements for console logging format: * Use PatternLayout for improved human readability * Color-code log levels to simplify logging output * Visually partition the threadName and contextInfo for easy interpretation was: Prior to this update, the OSS Spark logs were difficult to interpret. The logs followed a JSON output format which is not optimal for human consumption: {code:java} {"ts":"2024-07-26T18:45:17.712Z","level":"INFO","msg":"Running Spark version 4.0.0-SNAPSHOT","context":{"spark_version":"4.0.0-SNAPSHOT"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.715Z","level":"INFO","msg":"OS info Mac OS X, 14.4.1, aarch64","context":{"os_arch":"aarch64","os_name":"Mac OS X","os_version":"14.4.1"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.716Z","level":"INFO","msg":"Java version 17.0.11","context":{"java_version":"17.0.11"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.761Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable","logger":"NativeCodeLoader"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"No custom resources configured for spark.driver.","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.784Z","level":"INFO","msg":"Submitted application: Spark Pi","context":{"app_name":"Spark Pi"},"logger":"SparkContext"}...{"ts":"2024-07-26T18:45:18.036Z","level":"INFO","msg":"Start Jetty 0.0.0.0:4040 for SparkUI","context":{"host":"0.0.0.0","port":"4040","server_name":"SparkUI"},"logger":"JettyUtils"}{"ts":"2024-07-26T18:45:18.044Z","level":"INFO","msg":"jetty-11.0.20; built: 2024-01-29T21:04:22.394Z; git: 922f8dc188f7011e60d0361de585fd4ac4d63064; jvm 17.0.11+9-LTS","logger":"Server"}{"ts":"2024-07-26T18:45:18.054Z","level":"INFO","msg":"Started Server@22c75c01{STARTING}[11.0.20,sto=3] @1114ms","logger":"Server"} {code} This PR updates the default `log4j.properties.template` with the following improvements for console logging format: * Use PatternLayout for improved human readability * Color-code log levels to simplify logging output * Visually partition the threadName and contextInfo for easy interpretation > Improve readability of log4j console log output > --- > > Key: SPARK-49145 > URL: https://issues.apache.org/jira/browse/SPARK-49145 > Project: Spark > Issue Type: Task > Com
[jira] [Created] (SPARK-49145) Improve readability of log4j console log output
Amanda Liu created SPARK-49145: -- Summary: Improve readability of log4j console log output Key: SPARK-49145 URL: https://issues.apache.org/jira/browse/SPARK-49145 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu Prior to this update, the OSS Spark logs were difficult to interpret. The logs followed a JSON output format which is not optimal for human consumption: {code:java} {"ts":"2024-07-26T18:45:17.712Z","level":"INFO","msg":"Running Spark version 4.0.0-SNAPSHOT","context":{"spark_version":"4.0.0-SNAPSHOT"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.715Z","level":"INFO","msg":"OS info Mac OS X, 14.4.1, aarch64","context":{"os_arch":"aarch64","os_name":"Mac OS X","os_version":"14.4.1"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.716Z","level":"INFO","msg":"Java version 17.0.11","context":{"java_version":"17.0.11"},"logger":"SparkContext"}{"ts":"2024-07-26T18:45:17.761Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... using builtin-java classes where applicable","logger":"NativeCodeLoader"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"No custom resources configured for spark.driver.","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.783Z","level":"INFO","msg":"==","logger":"ResourceUtils"}{"ts":"2024-07-26T18:45:17.784Z","level":"INFO","msg":"Submitted application: Spark Pi","context":{"app_name":"Spark Pi"},"logger":"SparkContext"}...{"ts":"2024-07-26T18:45:18.036Z","level":"INFO","msg":"Start Jetty 0.0.0.0:4040 for SparkUI","context":{"host":"0.0.0.0","port":"4040","server_name":"SparkUI"},"logger":"JettyUtils"}{"ts":"2024-07-26T18:45:18.044Z","level":"INFO","msg":"jetty-11.0.20; built: 2024-01-29T21:04:22.394Z; git: 922f8dc188f7011e60d0361de585fd4ac4d63064; jvm 17.0.11+9-LTS","logger":"Server"}{"ts":"2024-07-26T18:45:18.054Z","level":"INFO","msg":"Started Server@22c75c01{STARTING}[11.0.20,sto=3] @1114ms","logger":"Server"} {code} This PR updates the default `log4j.properties.template` with the following improvements for console logging format: * Use PatternLayout for improved human readability * Color-code log levels to simplify logging output * Visually partition the threadName and contextInfo for easy interpretation -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-48993) Maximum number of maxRecursiveFieldDepth should be a spark conf
[ https://issues.apache.org/jira/browse/SPARK-48993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868457#comment-17868457 ] Wei Liu commented on SPARK-48993: - I'll follow up on this. > Maximum number of maxRecursiveFieldDepth should be a spark conf > --- > > Key: SPARK-48993 > URL: https://issues.apache.org/jira/browse/SPARK-48993 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > [https://github.com/apache/spark/pull/38922#discussion_r1051294998] > > There is no reason to hard code a 10 here -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48993) Maximum number of maxRecursiveFieldDepth should be a spark conf
Wei Liu created SPARK-48993: --- Summary: Maximum number of maxRecursiveFieldDepth should be a spark conf Key: SPARK-48993 URL: https://issues.apache.org/jira/browse/SPARK-48993 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Wei Liu [https://github.com/apache/spark/pull/38922#discussion_r1051294998] There is no reason to hard code a 10 here -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
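A minimal sketch of what promoting the hard-coded limit to a Spark conf could look like, written as it would appear inside Spark's internal SQLConf builder. The conf key, doc text, and default value below are assumptions for illustration; the actual name and placement chosen by the follow-up work may differ.
{code:scala}
// Hypothetical conf definition inside org.apache.spark.sql.internal.SQLConf
// (key name, doc text and default are assumptions, not the real change).
val PROTOBUF_MAX_RECURSIVE_FIELD_DEPTH =
  buildConf("spark.sql.protobuf.maxRecursiveFieldDepth")
    .doc("Upper bound allowed for the recursive.fields.max.depth option in the " +
      "ProtoBuf connector, instead of the hard-coded value of 10.")
    .version("4.0.0")
    .intConf
    .checkValue(_ >= 1, "The maximum recursive field depth must be positive.")
    .createWithDefault(10)
{code}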
[jira] [Commented] (SPARK-48964) Fix the discrepancy between implementation, comment and documentation of option recursive.fields.max.depth in ProtoBuf connector
[ https://issues.apache.org/jira/browse/SPARK-48964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867877#comment-17867877 ] Yuchen Liu commented on SPARK-48964: Thank you [~dongjoon]. > Fix the discrepancy between implementation, comment and documentation of > option recursive.fields.max.depth in ProtoBuf connector > > > Key: SPARK-48964 > URL: https://issues.apache.org/jira/browse/SPARK-48964 > Project: Spark > Issue Type: Documentation > Components: Connect >Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2, 3.5.3 >Reporter: Yuchen Liu >Priority: Major > > After the three PRs ([https://github.com/apache/spark/pull/38922,] > [https://github.com/apache/spark/pull/40011,] > [https://github.com/apache/spark/pull/40141]) working on the same option, > there are some legacy comments and documentation that has not been updated to > the latest implementation. This task should consolidate them. Below is the > correct description of the behavior. > The `recursive.fields.max.depth` parameter can be specified in the > from_protobuf options to control the maximum allowed recursion depth for a > field. Setting `recursive.fields.max.depth` to 1 drops all-recursive fields, > setting it to 2 allows it to be recursed once, and setting it to 3 allows it > to be recursed twice. Attempting to set the `recursive.fields.max.depth` to a > value greater than 10 is not allowed. If the `recursive.fields.max.depth` is > specified to a value smaller than 1, recursive fields are not permitted. The > default value of the option is -1. if a protobuf record has more depth for > recursive fields than the allowed value, it will be truncated and some fields > may be discarded. This check is based on the fully qualified field type. SQL > Schema for the protobuf message > {code:java} > message Person { string name = 1; Person bff = 2 }{code} > will vary based on the value of `recursive.fields.max.depth`. > {code:java} > 1: struct > 2: struct> > 3: struct>> ... > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48964) Fix the discrepancy between implementation, comment and documentation of option recursive.fields.max.depth in ProtoBuf connector
[ https://issues.apache.org/jira/browse/SPARK-48964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48964: --- Description: After the three PRs ([https://github.com/apache/spark/pull/38922,] [https://github.com/apache/spark/pull/40011,] [https://github.com/apache/spark/pull/40141]) working on the same option, there are some legacy comments and documentation that has not been updated to the latest implementation. This task should consolidate them. Below is the correct description of the behavior. The `recursive.fields.max.depth` parameter can be specified in the from_protobuf options to control the maximum allowed recursion depth for a field. Setting `recursive.fields.max.depth` to 1 drops all-recursive fields, setting it to 2 allows it to be recursed once, and setting it to 3 allows it to be recursed twice. Attempting to set the `recursive.fields.max.depth` to a value greater than 10 is not allowed. If the `recursive.fields.max.depth` is specified to a value smaller than 1, recursive fields are not permitted. The default value of the option is -1. if a protobuf record has more depth for recursive fields than the allowed value, it will be truncated and some fields may be discarded. This check is based on the fully qualified field type. SQL Schema for the protobuf message {code:java} message Person { string name = 1; Person bff = 2 }{code} will vary based on the value of `recursive.fields.max.depth`. {code:java} 1: struct 2: struct> 3: struct>> ... {code} was: After the three PRs ([https://github.com/apache/spark/pull/38922,] [https://github.com/apache/spark/pull/40011,] [https://github.com/apache/spark/pull/40141]) working on the same option, there are some legacy comments and documentation that has not been updated to the latest implementation. This task should consolidate them. Below is the correct description of the behavior. The `recursive.fields.max.depth` parameter can be specified in the from_protobuf options to control the maximum allowed recursion depth for a field. Setting `recursive.fields.max.depth` to 1 drops all-recursive fields, setting it to 2 allows it to be recursed once, and setting it to 3 allows it to be recursed twice. Attempting to set the `recursive.fields.max.depth` to a value greater than 10 is not allowed. If the `recursive.fields.max.depth` is specified to a value smaller than 1, recursive fields are not permitted. The default value of the option is -1. if a protobuf record has more depth for recursive fields than the allowed value, it will be truncated and some fields may be discarded. This check is based on the fully qualified field type. SQL Schema for the protobuf message {code:java} message Person { string name = 1; Person bff = 2 }{code} will vary based on the value of `recursive.fields.max.depth`. {code:java} 1: struct 2: struct> 3: struct>> ... 
{code} > Fix the discrepancy between implementation, comment and documentation of > option recursive.fields.max.depth in ProtoBuf connector > > > Key: SPARK-48964 > URL: https://issues.apache.org/jira/browse/SPARK-48964 > Project: Spark > Issue Type: Documentation > Components: Connect >Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2, 3.5.3 >Reporter: Yuchen Liu >Priority: Major > > After the three PRs ([https://github.com/apache/spark/pull/38922,] > [https://github.com/apache/spark/pull/40011,] > [https://github.com/apache/spark/pull/40141]) working on the same option, > there are some legacy comments and documentation that has not been updated to > the latest implementation. This task should consolidate them. Below is the > correct description of the behavior. > The `recursive.fields.max.depth` parameter can be specified in the > from_protobuf options to control the maximum allowed recursion depth for a > field. Setting `recursive.fields.max.depth` to 1 drops all-recursive fields, > setting it to 2 allows it to be recursed once, and setting it to 3 allows it > to be recursed twice. Attempting to set the `recursive.fields.max.depth` to a > value greater than 10 is not allowed. If the `recursive.fields.max.depth` is > specified to a value smaller than 1, recursive fields are not permitted. The > default value of the option is -1. if a protobuf record has more depth for > recursive fields than the allowed value, it will be truncated and some fields > may be discarded. This check is based on the fully qualified field type. SQL > Schema for the protobuf message > {code:java} > message Person { string name = 1; Person bff = 2 }{code} > will vary based on the value of `recursive.fields.max.depth`. > {code:java} > 1: struct > 2: struct> > 3: struct>> ... > {code} > -- This me
[jira] [Created] (SPARK-48964) Fix the discrepancy between implementation, comment and documentation of option recursive.fields.max.depth in ProtoBuf connector
Yuchen Liu created SPARK-48964: -- Summary: Fix the discrepancy between implementation, comment and documentation of option recursive.fields.max.depth in ProtoBuf connector Key: SPARK-48964 URL: https://issues.apache.org/jira/browse/SPARK-48964 Project: Spark Issue Type: Documentation Components: Connect Affects Versions: 3.5.1, 3.5.0, 4.0.0, 3.5.2, 3.5.3 Reporter: Yuchen Liu Fix For: 4.0.0, 3.5.2, 3.5.3, 3.5.1, 3.5.0 After the three PRs ([https://github.com/apache/spark/pull/38922,] [https://github.com/apache/spark/pull/40011,] [https://github.com/apache/spark/pull/40141]) working on the same option, there are some legacy comments and documentation that have not been updated to the latest implementation. This task should consolidate them. Below is the correct description of the behavior. The `recursive.fields.max.depth` parameter can be specified in the from_protobuf options to control the maximum allowed recursion depth for a field. Setting `recursive.fields.max.depth` to 1 drops all recursive fields, setting it to 2 allows it to be recursed once, and setting it to 3 allows it to be recursed twice. Attempting to set `recursive.fields.max.depth` to a value greater than 10 is not allowed. If `recursive.fields.max.depth` is set to a value smaller than 1, recursive fields are not permitted. The default value of the option is -1. If a protobuf record has more depth for recursive fields than the allowed value, it will be truncated and some fields may be discarded. This check is based on the fully qualified field type. The SQL schema for the protobuf message {code:java} message Person { string name = 1; Person bff = 2 }{code} will vary based on the value of `recursive.fields.max.depth`. {code:java} 1: struct 2: struct> 3: struct>> ... {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
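For illustration, a from_protobuf call that passes the option described above might look like the sketch below. The input DataFrame, the binary "value" column, and the descriptor file path are placeholders; only the option key and the general call shape come from the ticket.
{code:scala}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// inputDf, the "value" column and "/path/to/person.desc" are placeholders.
val options = Map("recursive.fields.max.depth" -> "2").asJava

val parsed = inputDf.select(
  // With a depth of 2, Person.bff is kept one level deep; deeper recursion is dropped.
  from_protobuf(col("value"), "Person", "/path/to/person.desc", options).as("person"))
{code}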
[jira] [Updated] (SPARK-48939) Support recursive reference of Avro schema
[ https://issues.apache.org/jira/browse/SPARK-48939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48939: --- Description: Recursive reference denotes the case that the type of a field can be defined before in the parent nodes. A simple example is: {code:java} { "type": "record", "name": "LongList", "fields" : [ {"name": "value", "type": "long"}, {"name": "next", "type": ["null", "LongList"]} ] } {code} This is written in Avro Schema DSL and represents a linked list data structure. Spark currently will throw an error on this schema. Many users used schema like this, so we should support it. was: We should support reading Avro message with recursive reference in schema. Recursive reference denotes the case that the type of a field can be defined before in the parent nodes. A simple example is: {code:java} { "type": "record", "name": "LongList", "fields" : [ {"name": "value", "type": "long"}, {"name": "next", "type": ["null", "LongList"]} ] } {code} This is written in Avro Schema DSL and represents a linked list data structure. > Support recursive reference of Avro schema > -- > > Key: SPARK-48939 > URL: https://issues.apache.org/jira/browse/SPARK-48939 > Project: Spark > Issue Type: New Feature > Components: Connect >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > > Recursive reference denotes the case that the type of a field can be defined > before in the parent nodes. A simple example is: > {code:java} > { > "type": "record", > "name": "LongList", > "fields" : [ > {"name": "value", "type": "long"}, > {"name": "next", "type": ["null", "LongList"]} > ] > } > {code} > This is written in Avro Schema DSL and represents a linked list data > structure. Spark currently will throw an error on this schema. Many users > used schema like this, so we should support it. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48939) Support recursive reference of Avro schema
Yuchen Liu created SPARK-48939: -- Summary: Support recursive reference of Avro schema Key: SPARK-48939 URL: https://issues.apache.org/jira/browse/SPARK-48939 Project: Spark Issue Type: New Feature Components: Connect Affects Versions: 4.0.0 Reporter: Yuchen Liu We should support reading Avro messages with recursive references in the schema. A recursive reference denotes the case where the type of a field has already been defined in one of its parent nodes. A simple example is: {code:java} { "type": "record", "name": "LongList", "fields" : [ {"name": "value", "type": "long"}, {"name": "next", "type": ["null", "LongList"]} ] } {code} This is written in the Avro Schema DSL and represents a linked list data structure. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
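As a rough illustration of what reading such a schema could look like once supported, the sketch below feeds the LongList schema to from_avro. The option name recursiveFieldMaxDepth is an assumption modeled on the ProtoBuf connector's depth option, and inputDf with its binary "value" column is a placeholder; the option actually introduced by this work may be named differently.
{code:scala}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.avro.functions.from_avro

// The recursive LongList schema from the ticket, as a JSON string.
val longListSchema =
  """{"type": "record", "name": "LongList", "fields": [
    |  {"name": "value", "type": "long"},
    |  {"name": "next", "type": ["null", "LongList"]}
    |]}""".stripMargin

// Option name is hypothetical; inputDf and the "value" column are placeholders.
val options = Map("recursiveFieldMaxDepth" -> "2").asJava
val parsed = inputDf.select(from_avro(col("value"), longListSchema, options).as("list"))
{code}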
[jira] [Created] (SPARK-48890) Add Streaming related fields to log4j ThreadContext
Wei Liu created SPARK-48890: --- Summary: Add Streaming related fields to log4j ThreadContext Key: SPARK-48890 URL: https://issues.apache.org/jira/browse/SPARK-48890 Project: Spark Issue Type: Sub-task Components: Spark Core, SS Affects Versions: 4.0.0 Reporter: Wei Liu There is some special information needed for Structured Streaming queries. Specifically, each query has a query_id and a run_id. Also, when using MicroBatchExecution (the default), there is a batch_id. A (query_id, run_id, batch_id) tuple identifies the microbatch a streaming query runs. Adding these fields to the ThreadContext would help, especially when there are multiple queries running. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
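Conceptually, the fields could be attached to the log4j ThreadContext around microbatch execution, along the lines of the sketch below. The field names match the description above, but the helper, its placement, and the parameter types are assumptions, not the actual Spark change.
{code:scala}
import org.apache.logging.log4j.CloseableThreadContext

// Hypothetical helper: queryId, runId and batchId are placeholders for the
// values tracked by a running streaming query.
def runBatchWithLoggingContext(queryId: String, runId: String, batchId: Long)(body: => Unit): Unit = {
  val ctx = CloseableThreadContext
    .put("query_id", queryId)
    .put("run_id", runId)
    .put("batch_id", batchId.toString)
  try body finally ctx.close() // the fields are removed once the microbatch finishes
}
{code}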
[jira] [Updated] (SPARK-48850) Add documentation for new options added to State Data Source
[ https://issues.apache.org/jira/browse/SPARK-48850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48850: --- Epic Link: SPARK-48588 Description: In [https://github.com/apache/spark/pull/46944] and [https://github.com/apache/spark/pull/47188], we introduced some new options to the State Data Source. This task aims to explain these new features in the documentation. (was: In https://issues.apache.org/jira/browse/SPARK-48589, we introduce some new options in the State Data Source. This task aims to explain these new features in the documentation.) Priority: Major (was: Minor) Summary: Add documentation for new options added to State Data Source (was: Add documentation for snapshot related options in State Data Source) > Add documentation for new options added to State Data Source > > > Key: SPARK-48850 > URL: https://issues.apache.org/jira/browse/SPARK-48850 > Project: Spark > Issue Type: Documentation > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > Labels: pull-request-available > > In [https://github.com/apache/spark/pull/46944] and > [https://github.com/apache/spark/pull/47188], we introduced some new options > to the State Data Source. This task aims to explain these new features in the > documentation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48859) Add documentation for change feed related options in State Data Source
Yuchen Liu created SPARK-48859: -- Summary: Add documentation for change feed related options in State Data Source Key: SPARK-48859 URL: https://issues.apache.org/jira/browse/SPARK-48859 Project: Spark Issue Type: Documentation Components: SQL, Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu In this PR: [https://github.com/apache/spark/pull/47188], we added some options which are used to read change feed of state store. This task is to reflect the latest changes in the documentation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48850) Add documentation for snapshot related options in State Data Source
[ https://issues.apache.org/jira/browse/SPARK-48850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48850: --- Description: In https://issues.apache.org/jira/browse/SPARK-48589, we introduce some new options in the State Data Source. This task aims to explain these new features in the documentation. (was: In https://issues.apache.org/jira/browse/SPARK-48589, we introduce some new options in the State Data Source. This task aims to introduce these new features in the documentation.) > Add documentation for snapshot related options in State Data Source > --- > > Key: SPARK-48850 > URL: https://issues.apache.org/jira/browse/SPARK-48850 > Project: Spark > Issue Type: Documentation > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Minor > > In https://issues.apache.org/jira/browse/SPARK-48589, we introduce some new > options in the State Data Source. This task aims to explain these new > features in the documentation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48850) Add documentation for snapshot related options in State Data Source
Yuchen Liu created SPARK-48850: -- Summary: Add documentation for snapshot related options in State Data Source Key: SPARK-48850 URL: https://issues.apache.org/jira/browse/SPARK-48850 Project: Spark Issue Type: Documentation Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu In https://issues.apache.org/jira/browse/SPARK-48589, we introduce some new options in the State Data Source. This task aims to introduce these new features in the documentation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48800) Deflake ClientStreamingQuerySuite
Wei Liu created SPARK-48800: --- Summary: Deflake ClientStreamingQuerySuite Key: SPARK-48800 URL: https://issues.apache.org/jira/browse/SPARK-48800 Project: Spark Issue Type: New Feature Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48628) Add task peak on/off heap execution memory metrics
[ https://issues.apache.org/jira/browse/SPARK-48628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-48628: - Description: Currently there are no task on/off heap execution memory metrics. There is a [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] metric; however, its semantics are a bit confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators. We can easily maintain the whole task-level peak memory in TaskMemoryManager, assuming *acquireExecutionMemory* is the only narrow waist for acquiring execution memory. Also, it would be nice to clean up/deprecate that poorly-named `peakExecutionMemory`. Creating two follow-up sub-tickets: * https://issues.apache.org/jira/browse/SPARK-48788 : accumulate task metrics in stage, and display in Spark UI * https://issues.apache.org/jira/browse/SPARK-48789 : deprecate `peakExecutionMemory` once we have a replacement for it. was: Currently there are no task on/off heap execution memory metrics. There is a [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] metric; however, its semantics are a bit confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators. We can easily maintain the whole task-level peak memory in TaskMemoryManager, assuming *acquireExecutionMemory* is the only narrow waist for acquiring execution memory. > Add task peak on/off heap execution memory metrics > -- > > Key: SPARK-48628 > URL: https://issues.apache.org/jira/browse/SPARK-48628 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Ziqi Liu >Priority: Major > > Currently there are no task on/off heap execution memory metrics. There is a > [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] > metric; however, its semantics are a bit confusing: it only covers the execution > memory used by shuffle/join/aggregate/sort, which is accumulated in specific > operators. > > We can easily maintain the whole task-level peak memory in TaskMemoryManager, > assuming *acquireExecutionMemory* is the only narrow waist for acquiring > execution memory. > > Also, it would be nice to clean up/deprecate that poorly-named `peakExecutionMemory`. > > Creating two follow-up sub-tickets: > * https://issues.apache.org/jira/browse/SPARK-48788 : accumulate task metrics > in stage, and display in Spark UI > * https://issues.apache.org/jira/browse/SPARK-48789 : deprecate > `peakExecutionMemory` once we have a replacement for it. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
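A minimal sketch of the idea of tracking the peak at the acquireExecutionMemory narrow waist, assuming that call is the single entry point. The class, field, and method names are illustrative only, not the actual TaskMemoryManager change.
{code:scala}
// Illustrative only: a tiny tracker mirroring what TaskMemoryManager could
// record when execution memory is acquired and released.
class PeakExecutionMemoryTracker {
  private var currentOnHeap = 0L
  private var currentOffHeap = 0L
  var peakOnHeap = 0L
  var peakOffHeap = 0L

  def acquired(bytes: Long, offHeap: Boolean): Unit = synchronized {
    if (offHeap) {
      currentOffHeap += bytes
      peakOffHeap = math.max(peakOffHeap, currentOffHeap)
    } else {
      currentOnHeap += bytes
      peakOnHeap = math.max(peakOnHeap, currentOnHeap)
    }
  }

  def released(bytes: Long, offHeap: Boolean): Unit = synchronized {
    if (offHeap) currentOffHeap -= bytes else currentOnHeap -= bytes
  }
}
{code}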
[jira] [Created] (SPARK-48788) Accumulate task-level execution memory in stage and display in Spark UI
Ziqi Liu created SPARK-48788: Summary: Accumulate task-level execution memory in stage and display in Spark UI Key: SPARK-48788 URL: https://issues.apache.org/jira/browse/SPARK-48788 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Ziqi Liu As a follow for https://issues.apache.org/jira/browse/SPARK-48628, after we have task-level onheap/offheap execution memory metrics, we should accumulate them in stage and make them available in Spark UI. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48772) State Data Source Read Change Feed
[ https://issues.apache.org/jira/browse/SPARK-48772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48772: --- Description: The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. This PR adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: {code:java} .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available {code} was: The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. This adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: {code:java} .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available {code} > State Data Source Read Change Feed > -- > > Key: SPARK-48772 > URL: https://issues.apache.org/jira/browse/SPARK-48772 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > > The current state reader can only return the entire state at a specific > version. If an error occurs related to state, knowing the change of state > across versions to find out at which version state starts to go wrong is > important for debugging purposes. This PR adds ability of showing the > evolution of state as Change Data Capture (CDC) format to state data source. > An example usage: > {code:java} > .format("statestore") > .option("readChangeFeed", true) > .option("changeStartBatchId", 5) #required > .option("changeEndBatchId", 10) #not required, default: latest batch Id > available > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48772) State Data Source Read Change Feed
[ https://issues.apache.org/jira/browse/SPARK-48772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48772: --- Description: The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. This adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: {code:java} .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available {code} was: The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. This adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: {code:java} .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available {code} > State Data Source Read Change Feed > -- > > Key: SPARK-48772 > URL: https://issues.apache.org/jira/browse/SPARK-48772 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > > The current state reader can only return the entire state at a specific > version. If an error occurs related to state, knowing the change of state > across versions to find out at which version state starts to go wrong is > important for debugging purposes. This adds ability of showing the evolution > of state as Change Data Capture (CDC) format to state data source. > An example usage: > {code:java} > .format("statestore") > .option("readChangeFeed", true) > .option("changeStartBatchId", 5) #required > .option("changeEndBatchId", 10) #not required, default: latest batch Id > available > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48772) State Data Source Read Change Feed
Yuchen Liu created SPARK-48772: -- Summary: State Data Source Read Change Feed Key: SPARK-48772 URL: https://issues.apache.org/jira/browse/SPARK-48772 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu The current state reader can only return the entire state at a specific version. If a state-related error occurs, knowing how the state changed across versions, to find out at which version the state starts to go wrong, is important for debugging. This adds the ability to show the evolution of state in Change Data Capture (CDC) format to the state data source. An example usage: {code:java} .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48759) Add migration doc for CREATE TABLE AS SELECT behavior change since Spark 3.4
[ https://issues.apache.org/jira/browse/SPARK-48759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-48759: --- Summary: Add migration doc for CREATE TABLE AS SELECT behavior change since Spark 3.4 (was: Add migration doc for CREATE TABLE behavior change since Spark 3.4) > Add migration doc for CREATE TABLE AS SELECT behavior change > since Spark 3.4 > > > Key: SPARK-48759 > URL: https://issues.apache.org/jira/browse/SPARK-48759 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Amanda Liu >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48759) Add migration doc for CREATE TABLE behavior change since Spark 3.4
Amanda Liu created SPARK-48759: -- Summary: Add migration doc for CREATE TABLE behavior change since Spark 3.4 Key: SPARK-48759 URL: https://issues.apache.org/jira/browse/SPARK-48759 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 3.4.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48743) MergingSessionIterator should catch when getStruct returns null
Wei Liu created SPARK-48743: --- Summary: MergingSessionIterator should catch when getStruct returns null Key: SPARK-48743 URL: https://issues.apache.org/jira/browse/SPARK-48743 Project: Spark Issue Type: New Feature Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
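Since this ticket has no description beyond the summary, the sketch below only illustrates the general shape of the guard it asks for: checking the result of getStruct before using it, so a malformed row fails with a descriptive error instead of a later NullPointerException. All names other than getStruct itself are hypothetical.
{code:scala}
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical guard; sessionOrdinal and numSessionFields are placeholders.
def getSessionStruct(row: InternalRow, sessionOrdinal: Int, numSessionFields: Int): InternalRow = {
  val session = row.getStruct(sessionOrdinal, numSessionFields)
  if (session == null) {
    throw new IllegalStateException(
      s"Session struct at ordinal $sessionOrdinal is null; the input row is malformed.")
  }
  session
}
{code}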
[jira] [Created] (SPARK-48740) Catch missing window specification error early
Amanda Liu created SPARK-48740: -- Summary: Catch missing window specification error early Key: SPARK-48740 URL: https://issues.apache.org/jira/browse/SPARK-48740 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu Before, aggregate queries containing a window function without a window specification (e.g. `PARTITION BY`) would return a non-descriptive internal error message: `org.apache.spark.sql.catalyst.analysis.UnresolvedException: [INTERNAL_ERROR] Invalid call to exprId on unresolved object SQLSTATE: XX000` This PR catches the user error early and returns a more accurate description of the issue: `Window specification is not defined in the WINDOW clause.` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
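For illustration, one shape of query that can hit this path is an aggregate that references a named window which is never defined in a WINDOW clause; the exact repro covered by the ticket may differ. The spark session, table, and column names below are placeholders.
{code:scala}
// Hypothetical repro: `w` is referenced but never defined in a WINDOW clause.
// Previously such a query could surface as the INTERNAL_ERROR above; with the
// fix it should report that the window specification is not defined.
spark.sql(
  """SELECT dept, SUM(salary) OVER w AS total
    |FROM VALUES ('eng', 10), ('eng', 20) AS t(dept, salary)
    |GROUP BY dept, salary""".stripMargin)
{code}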
[jira] [Created] (SPARK-48717) Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled and is running spark queries
Wei Liu created SPARK-48717: --- Summary: Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled and is running spark queries Key: SPARK-48717 URL: https://issues.apache.org/jira/browse/SPARK-48717 Project: Spark Issue Type: New Feature Components: PySpark, SS Affects Versions: 4.0.0 Reporter: Wei Liu Follow-up of https://issues.apache.org/jira/browse/SPARK-39218. That change only considered the case where InterruptedException is thrown while time.sleep(10) is interrupted. But when a Spark query is executing: {code:python} def func(batch_df, batch_id): batch_df.sparkSession.range(1000).write.saveAsTable("oops") print(batch_df.count()) {code} the actual error would be: {code:java} py4j.protocol.Py4JJavaError: An error occurred while calling o2141502.saveAsTable. : org.apache.spark.SparkException: Job aborted. ... at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:262) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:262) *Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1000)* at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1308) {code} We should also handle this scenario. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48676) Structured Logging Framework Scala Style Migration [Part 2]
Amanda Liu created SPARK-48676: -- Summary: Structured Logging Framework Scala Style Migration [Part 2] Key: SPARK-48676 URL: https://issues.apache.org/jira/browse/SPARK-48676 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-48632) Remove unused LogKeys
[ https://issues.apache.org/jira/browse/SPARK-48632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu resolved SPARK-48632. Resolution: Not A Problem > Remove unused LogKeys > - > > Key: SPARK-48632 > URL: https://issues.apache.org/jira/browse/SPARK-48632 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Major > Labels: pull-request-available > > Remove unused LogKey objects to clean up LogKey.scala -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48632) Remove unused LogKeys
Amanda Liu created SPARK-48632: -- Summary: Remove unused LogKeys Key: SPARK-48632 URL: https://issues.apache.org/jira/browse/SPARK-48632 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu Remove unused LogKey objects to clean up LogKey.scala -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48628) Add task peak on/off heap execution memory metrics
Ziqi Liu created SPARK-48628: Summary: Add task peak on/off heap execution memory metrics Key: SPARK-48628 URL: https://issues.apache.org/jira/browse/SPARK-48628 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Ziqi Liu Currently there are no task on/off heap execution memory metrics. There is a [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] metric; however, its semantics are a bit confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators. We can easily maintain the whole task-level peak memory in TaskMemoryManager, assuming *acquireExecutionMemory* is the only narrow waist for acquiring execution memory. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48623) Structured Logging Framework Scala Style Migration
Amanda Liu created SPARK-48623: -- Summary: Structured Logging Framework Scala Style Migration Key: SPARK-48623 URL: https://issues.apache.org/jira/browse/SPARK-48623 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48610) Remove ExplainUtils.processPlan synchronize
Ziqi Liu created SPARK-48610: Summary: Remove ExplainUtils.processPlan synchronize Key: SPARK-48610 URL: https://issues.apache.org/jira/browse/SPARK-48610 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu [https://github.com/apache/spark/pull/45282] introduced synchronization in `ExplainUtils.processPlan` to avoid a race condition when multiple queries refer to the same cached plan. The granularity of that lock is too coarse. We can try to fix the root cause of this concurrency issue by refactoring the usage of the mutable OP_ID_TAG, which is at odds with the immutable nature of SparkPlan. Instead, we can use an auxiliary id map with object identity as the key. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
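A small sketch of the auxiliary-map idea: operator ids kept in a side structure keyed by object identity, so the (logically immutable) plan nodes are never mutated. The names are illustrative and this is not the actual ExplainUtils change.
{code:scala}
import java.util.IdentityHashMap

// Illustrative only: ids live next to the traversal, keyed by object identity,
// so no tag has to be written onto the plan nodes themselves.
class OperatorIdMap[T <: AnyRef] {
  private val ids = new IdentityHashMap[T, Integer]()
  private var nextId = 0

  def idFor(node: T): Int = {
    val existing = ids.get(node)
    if (existing != null) {
      existing.intValue()
    } else {
      nextId += 1
      ids.put(node, nextId)
      nextId
    }
  }
}
{code}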
[jira] [Created] (SPARK-48592) Add scala style check for logging message inline variables
Amanda Liu created SPARK-48592: -- Summary: Add scala style check for logging message inline variables Key: SPARK-48592 URL: https://issues.apache.org/jira/browse/SPARK-48592 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu Ban logging messages using logInfo, logWarning, logError containing variables without {{MDC}} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
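For context, the style being enforced is the structured-logging pattern in which every inline variable goes through MDC with a LogKey. The sketch below illustrates the before/after shapes; the class name, the specific LogKeys entries, and the exact import paths are assumptions about the framework, not a quote from the style rule.
{code:scala}
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical class, assumed to mix in Spark's Logging trait.
class ExecutorAllocator extends Logging {
  def remove(executorId: String, reason: String): Unit = {
    // Accepted by the style check: each variable is wrapped in MDC with a LogKey.
    logInfo(log"Removing executor ${MDC(LogKeys.EXECUTOR_ID, executorId)} " +
      log"because of ${MDC(LogKeys.REASON, reason)}")
    // Flagged by the style check: plain interpolation of variables.
    // logInfo(s"Removing executor $executorId because of $reason")
  }
}
{code}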
[jira] [Created] (SPARK-48589) Add option snapshotStartBatchId and snapshotPartitionId to state data source
Yuchen Liu created SPARK-48589: -- Summary: Add option snapshotStartBatchId and snapshotPartitionId to state data source Key: SPARK-48589 URL: https://issues.apache.org/jira/browse/SPARK-48589 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu Define two new options, _snapshotStartBatchId_ and _snapshotPartitionId_, for the existing state reader. Both of them must be provided at the same time. # When there is no snapshot file at that batch (note there is an off-by-one issue between version and batch Id), throw an exception. # Otherwise, the reader should continue to rebuild the state by reading delta files only, ignoring all later snapshot files. # Note that if a batchId option is already specified, that batchId is the ending batch Id, so the reconstruction should stop at that batch. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
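A usage sketch combining these options with the existing reader, in the same option-chaining style as the other tickets in this series; the checkpoint path is a placeholder and the batchId option is optional.
{code:scala}
// Rebuild partition 0 of the state starting from the snapshot taken at batch 5,
// replaying delta files only, up to the ending batch 10.
val stateDf = spark.read
  .format("statestore")
  .option("snapshotStartBatchId", 5)
  .option("snapshotPartitionId", 0)
  .option("batchId", 10)
  .load("/path/to/checkpoint")
{code}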
[jira] [Created] (SPARK-48588) Fine-grained State Data Source
Yuchen Liu created SPARK-48588: -- Summary: Fine-grained State Data Source Key: SPARK-48588 URL: https://issues.apache.org/jira/browse/SPARK-48588 Project: Spark Issue Type: Epic Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu The current state reader API replays the state store rows from the latest snapshot and any newer delta files. The issue with this mechanism is that sometimes the snapshot files could be wrongly constructed, or users want to know how the state changes across batches. We need to improve the State Reader so that it can handle a variety of fine-grained requirements, for example reconstructing state from an arbitrary snapshot, or supporting a CDC mode for state evolution. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48567) Pyspark StreamingQuery lastProgress and friend should return actual StreamingQueryProgress
Wei Liu created SPARK-48567: --- Summary: Pyspark StreamingQuery lastProgress and friend should return actual StreamingQueryProgress Key: SPARK-48567 URL: https://issues.apache.org/jira/browse/SPARK-48567 Project: Spark Issue Type: New Feature Components: PySpark, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48542) Give snapshotStartBatchId and snapshotPartitionId to the state data source
Yuchen Liu created SPARK-48542: -- Summary: Give snapshotStartBatchId and snapshotPartitionId to the state data source Key: SPARK-48542 URL: https://issues.apache.org/jira/browse/SPARK-48542 Project: Spark Issue Type: New Feature Components: SQL, Structured Streaming Affects Versions: 4.0.0 Environment: This should work for both HDFS state store and RocksDB state store. Reporter: Yuchen Liu Right now, to read a version of the state data, the state source will try to find the first snapshot file before the given version and construct it using the delta files. In some debugging scenarios, users need more granular control on how to reconstruct the given state, for example they want to start from a specific snapshot instead of the closest one. One use case is to find whether a snapshot has been corrupted after committing. This task introduces two options {{snapshotStartBatchId}} and {{snapshotPartitionId}} to the state data source. By specifying them, users can control the starting batch id of the snapshot and partition id of the state. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org