[jira] [Commented] (SPARK-47952) Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn
[ https://issues.apache.org/jira/browse/SPARK-47952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848408#comment-17848408 ] Adam Binford commented on SPARK-47952: -- We have a similar setup, currently using Livy and Zeppelin as the means of externally interacting with our Yarn cluster. I'm very curious to see how other people are using Spark Connect with Yarn, as that doesn't seem to be a common use case yet. I've thought about doing something similar to what you are: creating an intermediary management server/API for launching Spark jobs and then reporting back the URL to connect to. Still just a thought in my head though, so I would definitely be interested in any tidbits or lessons learned you encounter along the way. > Support retrieving the real SparkConnectService GRPC address and port > programmatically when running on Yarn > --- > > Key: SPARK-47952 > URL: https://issues.apache.org/jira/browse/SPARK-47952 > Project: Spark > Issue Type: Story > Components: Connect >Affects Versions: 4.0.0 >Reporter: TakawaAkirayo >Priority: Minor > Labels: pull-request-available > > 1. User Story: > Our data analysts and data scientists use Jupyter notebooks provisioned on > Kubernetes (k8s) with limited CPU/memory resources to run Spark-shell/pyspark > in the terminal via Yarn Client mode. However, Yarn Client mode consumes > significant local memory if the job is heavy, and the total resource pool of > k8s for notebooks is limited. To leverage the abundant resources of our > Hadoop cluster for scalability purposes, we aim to utilize SparkConnect. This > runs the driver on Yarn with SparkConnectService started, and uses a > SparkConnect client to connect to the remote driver. > To provide a seamless experience with one-command startup for both server and > client, we've wrapped the following processes in one script: > 1) Start a local coordinator server (implemented by us, not in this PR) with > a specified port. > 2) Start SparkConnectServer by spark-submit via Yarn Cluster mode with > user-input Spark configurations and the local coordinator server's address > and port. An additional listener class is appended to the configuration so the > SparkConnectService can call back to the coordinator server with its actual > address and port on Yarn. > 3) Wait for the coordinator server to receive the address callback from the > SparkConnectService on Yarn and export the real address. > 4) Start the client (pyspark --remote) with the remote address. > Finally, a remote SparkConnect Server is started on Yarn with a local > SparkConnect client connected. Users no longer need to start the server > beforehand and then connect to it after manually looking up its address on Yarn. > 2. Problem statement of this change: > 1) The specified port for the SparkConnectService GRPC server might be > occupied on the node of the Hadoop Cluster. To increase the success rate of > startup, it needs to retry on conflicts rather than fail directly. > 2) Because the final binding port could be uncertain based on #1 and the > remote address is unpredictable on Yarn, we need to retrieve the address and > port programmatically and inject it automatically on the start of `pyspark > --remote`. The SparkConnectService needs to communicate its location back to > the launcher side. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
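A very rough sketch of the launcher flow described above, in Scala. Everything here is invented for illustration: the callback conf key, the callback protocol (a plain TCP write of "host:port"), and the argument list are assumptions, not the actual implementation; only the SparkConnectServer class name and the general spark-submit shape come from the description.

{code:scala}
import java.net.{InetAddress, ServerSocket}
import scala.io.Source
import scala.sys.process._

object ConnectLauncherSketch {
  def main(args: Array[String]): Unit = {
    // 1) Local coordinator: any listening socket will do for the sketch.
    val coordinator = new ServerSocket(0) // bind a free port
    val callback = s"${InetAddress.getLocalHost.getHostAddress}:${coordinator.getLocalPort}"

    // 2) Submit the Connect server in Yarn cluster mode, passing the
    //    coordinator's address so a listener on the driver can phone home.
    //    "spark.connect.callback.address" is a made-up key for this sketch.
    Process(Seq(
      "spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
      "--conf", s"spark.connect.callback.address=$callback",
      "--class", "org.apache.spark.sql.connect.service.SparkConnectServer",
      "spark-internal")).run()

    // 3) Block until the service on Yarn reports the real GRPC host:port.
    val fromYarn = coordinator.accept()
    val remote = Source.fromInputStream(fromYarn.getInputStream).mkString.trim

    // 4) Hand the real address to the client.
    println(s"Run: pyspark --remote sc://$remote")
  }
}
{code}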
[jira] [Created] (SPARK-48314) FileStreamSource shouldn't double cache files for availableNow
Adam Binford created SPARK-48314: Summary: FileStreamSource shouldn't double cache files for availableNow Key: SPARK-48314 URL: https://issues.apache.org/jira/browse/SPARK-48314 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.5.1 Reporter: Adam Binford FileStreamSource loads and saves all files at initialization for Trigger.AvailableNow. However, files will also be cached in unreadFiles, which is wasteful and causes the issues identified in https://issues.apache.org/jira/browse/SPARK-44924 for streams reading more than 10k files per batch. We should always skip the unreadFiles cache when the available-now trigger is used, as there is no need for it anyway. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
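For illustration, a minimal self-contained model of the proposed change. The names here (isTriggerAvailableNow, allFilesSnapshot, unreadFiles) are stand-ins for FileStreamSource's internals, not the actual fields:

{code:scala}
object AvailableNowCacheSketch {
  final case class FileEntry(path: String, timestamp: Long)

  def fetchAllFiles(
      isTriggerAvailableNow: Boolean,
      allFilesSnapshot: Seq[FileEntry],    // listed once at initialization
      unreadFiles: Option[Seq[FileEntry]], // rolling cache for other triggers
      listFromSource: () => Seq[FileEntry]): Seq[FileEntry] = {
    if (isTriggerAvailableNow) {
      // The full listing was already snapshotted at init; also maintaining
      // unreadFiles would just keep a second copy of the same state.
      allFilesSnapshot
    } else {
      unreadFiles.filter(_.nonEmpty).getOrElse(listFromSource())
    }
  }
}
{code}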
[jira] [Commented] (SPARK-33133) History server fails when loading invalid rolling event logs
[ https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793270#comment-17793270 ] Adam Binford commented on SPARK-33133: -- I never got around to fixing the issue that creates the invalid event logs, only preventing the history server from crashing if it encounters a bad log. > History server fails when loading invalid rolling event logs > > > Key: SPARK-33133 > URL: https://issues.apache.org/jira/browse/SPARK-33133 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Adam Binford >Priority: Major > > We have run into an issue where our history server fails to load new > applications, and when restarted, fails to load any applications at all. This > happens when it encounters invalid rolling event log files. We encounter this > with long-running streaming applications. There seem to be two issues here > that lead to problems: > * It looks like our long-running streaming applications' event log directories > are being cleaned up. The next time the application logs event data, it > recreates the event log directory but without recreating the "appstatus" > file. I don't know the full extent of this behavior or whether something "wrong" > is happening here. > * The history server then reads this new folder and throws an exception > because the "appstatus" file doesn't exist in the rolling event log folder. > This exception breaks the entire listing process, so no new applications will > be read, and if the server is restarted, no applications at all will be read. > There seem to be a couple of ways to go about fixing this, and I'm curious to hear > thoughts from anyone who knows more about how the history server works, > specifically with rolling event logs: > * Don't completely fail checking for new applications if one bad rolling > event log folder is encountered. This seems like the simplest fix and makes > sense to me; the code already checks for a few other errors and ignores them. It > doesn't necessarily fix the underlying issue that leads to this happening, > though. > * Figure out why the in-progress event log folder is being deleted and make > sure that doesn't happen. Maybe this is supposed to happen? Or maybe we don't > want to delete the top-level folder and only delete event log files within > the folders? Again, I don't know the exact current behavior here. > * When writing new event log data, make sure the folder and appstatus file > exist every time, creating them again if not. > Here's the stack trace we encounter when this happens, from 3.0.1 with a > couple of extra MRs backported that I hoped would fix the issue: > {{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in > checking for event log updates java.lang.IllegalArgumentException: requirement failed: Log directory > must contain an appstatus file!
at scala.Predef$.require(Predef.scala:281) at > org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214) > at > org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211) > at > org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221) > at > org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220) > at > org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272) > at > org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240) > at > org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524) > at > org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466) > at > scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255) at > scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249) at > scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) at > scala.collection.TraversableLike.filter(TraversableLike.scala:347) at > scala.collection.TraversableLike.filter$(TraversableLike.scala:347) at > scala.collection.AbstractTraversable.filter(Traversable.scala:108) at >
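A sketch of the first suggested fix (skip one bad folder rather than fail the whole scan), with illustrative names rather than the actual FsHistoryProvider code:

{code:scala}
object SkipBadLogDirSketch {
  // Returns true if the entry is listable; a malformed rolling event log
  // directory (e.g. missing its appstatus file) is logged and skipped so
  // the remaining applications are still read.
  def isReadableEntry(checkEntry: () => Unit, path: String): Boolean = {
    try {
      checkEntry() // e.g. reading fileSizeForLastIndex, which requires appstatus
      true
    } catch {
      case e: IllegalArgumentException =>
        System.err.println(s"Skipping malformed event log dir $path: ${e.getMessage}")
        false
    }
  }
}
{code}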
[jira] [Commented] (SPARK-22876) spark.yarn.am.attemptFailuresValidityInterval does not work correctly
[ https://issues.apache.org/jira/browse/SPARK-22876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756323#comment-17756323 ] Adam Binford commented on SPARK-22876: -- Attempting to finally add support for this: https://github.com/apache/spark/pull/42570 > spark.yarn.am.attemptFailuresValidityInterval does not work correctly > - > > Key: SPARK-22876 > URL: https://issues.apache.org/jira/browse/SPARK-22876 > Project: Spark > Issue Type: Bug > Components: Spark Core, YARN >Affects Versions: 2.2.0 > Environment: hadoop version 2.7.3 >Reporter: Jinhan Zhong >Priority: Minor > Labels: bulk-closed > > I assume we can use spark.yarn.maxAppAttempts together with > spark.yarn.am.attemptFailuresValidityInterval to make a long running > application avoid stopping after acceptable number of failures. > But after testing, I found that the application always stops after failing n > times ( n is minimum value of spark.yarn.maxAppAttempts and > yarn.resourcemanager.am.max-attempts from client yarn-site.xml) > for example, following setup will allow the application master to fail 20 > times. > * spark.yarn.am.attemptFailuresValidityInterval=1s > * spark.yarn.maxAppAttempts=20 > * yarn client: yarn.resourcemanager.am.max-attempts=20 > * yarn resource manager: yarn.resourcemanager.am.max-attempts=3 > And after checking the source code, I found in source file > ApplicationMaster.scala > https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L293 > there's a ShutdownHook that checks the attempt id against the maxAppAttempts, > if attempt id >= maxAppAttempts, it will try to unregister the application > and the application will finish. > is this a expected design or a bug? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
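A simplified model of the shutdown-hook check referenced above (illustrative, not the actual ApplicationMaster code), which shows why the validity interval has no effect: the hook compares the raw attempt id against the max, while YARN itself prunes failures that fall outside attemptFailuresValidityInterval.

{code:scala}
object AttemptCheckSketch {
  // The raw comparison unregisters the application once attemptId reaches
  // the configured max, even though YARN would have reset its own failure
  // count because old failures aged out of the validity interval.
  def shouldUnregister(attemptId: Int, maxAppAttempts: Option[Int]): Boolean =
    maxAppAttempts.exists(max => attemptId >= max)
}
{code}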
[jira] [Reopened] (SPARK-22876) spark.yarn.am.attemptFailuresValidityInterval does not work correctly
[ https://issues.apache.org/jira/browse/SPARK-22876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford reopened SPARK-22876: -- > spark.yarn.am.attemptFailuresValidityInterval does not work correctly > - > > Key: SPARK-22876 > URL: https://issues.apache.org/jira/browse/SPARK-22876 > Project: Spark > Issue Type: Bug > Components: Spark Core, YARN >Affects Versions: 2.2.0 > Environment: hadoop version 2.7.3 >Reporter: Jinhan Zhong >Priority: Minor > Labels: bulk-closed > > I assume we can use spark.yarn.maxAppAttempts together with > spark.yarn.am.attemptFailuresValidityInterval to make a long running > application avoid stopping after acceptable number of failures. > But after testing, I found that the application always stops after failing n > times ( n is minimum value of spark.yarn.maxAppAttempts and > yarn.resourcemanager.am.max-attempts from client yarn-site.xml) > for example, following setup will allow the application master to fail 20 > times. > * spark.yarn.am.attemptFailuresValidityInterval=1s > * spark.yarn.maxAppAttempts=20 > * yarn client: yarn.resourcemanager.am.max-attempts=20 > * yarn resource manager: yarn.resourcemanager.am.max-attempts=3 > And after checking the source code, I found in source file > ApplicationMaster.scala > https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L293 > there's a ShutdownHook that checks the attempt id against the maxAppAttempts, > if attempt id >= maxAppAttempts, it will try to unregister the application > and the application will finish. > is this a expected design or a bug? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44639) Add option to use Java tmp dir for RocksDB state store
Adam Binford created SPARK-44639: Summary: Add option to use Java tmp dir for RocksDB state store Key: SPARK-44639 URL: https://issues.apache.org/jira/browse/SPARK-44639 Project: Spark Issue Type: Improvement Components: SQL, Structured Streaming Affects Versions: 3.4.1 Reporter: Adam Binford Currently, local RocksDB state is stored in a local directory given by Utils.getLocalDir. On Yarn this is a directory created inside the root application folder, such as {{/tmp/nm-local-dir/usercache//appcache//}}. The problem with this is that if an executor crashes for some reason (like OOM) and the shutdown hooks don't get run, this directory will stay around until the application finishes, which can cause long-running jobs to slowly accumulate more and more temporary space until the node manager goes unhealthy. Because this data will only ever be accessed by the executor that created the directory, it would make sense to store it inside the container folder, which always gets cleaned up by the node manager when that Yarn container is cleaned up. Yarn sets `java.io.tmpdir` to a path inside this directory, such as {{/tmp/nm-local-dir/usercache//appcache///tmp/}}. I'm not sure of the behavior of other resource managers, so this could be an opt-in config. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
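A hypothetical shape for the opt-in; the config key below is invented for the example, not an actual Spark configuration:

{code:scala}
import java.io.File

object RocksDbLocalDirSketch {
  // When enabled, root the RocksDB working dir under java.io.tmpdir, which
  // Yarn points inside the container directory, so the node manager's
  // container cleanup removes it even if shutdown hooks never run.
  def localRoot(conf: Map[String, String], sparkLocalDir: String): File = {
    val useJavaTmp =
      conf.getOrElse("spark.sql.streaming.stateStore.rocksdb.useJavaTmpDir", "false").toBoolean
    val base =
      if (useJavaTmp) System.getProperty("java.io.tmpdir") // container-scoped on Yarn
      else sparkLocalDir                                    // e.g. from Utils.getLocalDir
    new File(base, "spark-rocksdb-state")
  }
}
{code}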
[jira] [Resolved] (SPARK-43951) RocksDB state store can become corrupt on task retries
[ https://issues.apache.org/jira/browse/SPARK-43951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford resolved SPARK-43951. -- Resolution: Fixed > RocksDB state store can become corrupt on task retries > -- > > Key: SPARK-43951 > URL: https://issues.apache.org/jira/browse/SPARK-43951 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Adam Binford >Priority: Major > > A couple of our streaming jobs have failed since upgrading to Spark 3.4 with > an error such as: > org.rocksdb.RocksDBException: Mismatch in unique ID on table file ###. > Expected: [###,###} Actual\{###,###} in file /MANIFEST- > This is due to the change from > [https://github.com/facebook/rocksdb/commit/6de7081cf37169989e289a4801187097f0c50fae] > that enabled unique ID checks by default, and I finally tracked down the > exact sequence of steps that leads to this failure in the way RocksDB state > store is used. > # A task fails after uploading the checkpoint to HDFS. Lets say it uploaded > 11.zip to version 11 of the table, but the task failed before it could finish > after successfully uploading the checkpoint. > # The same task is retried and goes back to load version 10 of the table as > expected. > # Cleanup/maintenance is called for this partition, which looks in HDFS for > persisted versions and sees up through version 11 since that zip file was > successfully uploaded on the previous task. > # As part of resolving what SST files are part of each table version, > versionToRocksDBFiles.put(version, newResolvedFiles) is called for version 11 > with its SST files that were uploaded in the first failed task. > # The second attempt at the task commits and goes to sync its checkpoint to > HDFS. > # versionToRocksDBFiles contains the SST files to upload from step 4, and > these files are considered "the same" as what's in the local working dir > because the name and file size match. > # No SST files are uploaded because they matched above, but in reality the > unique ID inside the SST files is different (presumably this is just randomly > generated and inserted into each SST file?), it just doesn't affect the size. > # A new METADATA file is uploaded which has the new unique IDs listed inside. > # When version 11 of the table is read during the next batch, the unique IDs > in the METADATA file don't match the unique IDS in the SST files, which > causes the exception. > > This is basically a ticking time bomb for anyone using RocksDB. Thoughts on > possible fixes would be: > * Disable unique ID verification. I don't currently see a binding for this > in the RocksDB java wrapper, so that would probably have to be added first. > * Disable checking if files are already uploaded with the same size, and > just always upload SST files no matter what. > * Update the "same file" check to also be able to do some kind of CRC > comparison or something like that. > * Update the mainteance/cleanup to not update the versionToRocksDBFiles map. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43951) RocksDB state store can become corrupt on task retries
[ https://issues.apache.org/jira/browse/SPARK-43951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728736#comment-17728736 ] Adam Binford commented on SPARK-43951: -- Of course, as soon as I finished figuring all this out, I found https://github.com/apache/spark/pull/41089 > RocksDB state store can become corrupt on task retries > -- > > Key: SPARK-43951 > URL: https://issues.apache.org/jira/browse/SPARK-43951 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Adam Binford >Priority: Major > > A couple of our streaming jobs have failed since upgrading to Spark 3.4 with > an error such as: > org.rocksdb.RocksDBException: Mismatch in unique ID on table file ###. > Expected: [###,###} Actual\{###,###} in file /MANIFEST- > This is due to the change from > [https://github.com/facebook/rocksdb/commit/6de7081cf37169989e289a4801187097f0c50fae] > that enabled unique ID checks by default, and I finally tracked down the > exact sequence of steps that leads to this failure in the way RocksDB state > store is used. > # A task fails after uploading the checkpoint to HDFS. Lets say it uploaded > 11.zip to version 11 of the table, but the task failed before it could finish > after successfully uploading the checkpoint. > # The same task is retried and goes back to load version 10 of the table as > expected. > # Cleanup/maintenance is called for this partition, which looks in HDFS for > persisted versions and sees up through version 11 since that zip file was > successfully uploaded on the previous task. > # As part of resolving what SST files are part of each table version, > versionToRocksDBFiles.put(version, newResolvedFiles) is called for version 11 > with its SST files that were uploaded in the first failed task. > # The second attempt at the task commits and goes to sync its checkpoint to > HDFS. > # versionToRocksDBFiles contains the SST files to upload from step 4, and > these files are considered "the same" as what's in the local working dir > because the name and file size match. > # No SST files are uploaded because they matched above, but in reality the > unique ID inside the SST files is different (presumably this is just randomly > generated and inserted into each SST file?), it just doesn't affect the size. > # A new METADATA file is uploaded which has the new unique IDs listed inside. > # When version 11 of the table is read during the next batch, the unique IDs > in the METADATA file don't match the unique IDS in the SST files, which > causes the exception. > > This is basically a ticking time bomb for anyone using RocksDB. Thoughts on > possible fixes would be: > * Disable unique ID verification. I don't currently see a binding for this > in the RocksDB java wrapper, so that would probably have to be added first. > * Disable checking if files are already uploaded with the same size, and > just always upload SST files no matter what. > * Update the "same file" check to also be able to do some kind of CRC > comparison or something like that. > * Update the mainteance/cleanup to not update the versionToRocksDBFiles map. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43951) RocksDB state store can become corrupt on task retries
Adam Binford created SPARK-43951: Summary: RocksDB state store can become corrupt on task retries Key: SPARK-43951 URL: https://issues.apache.org/jira/browse/SPARK-43951 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Adam Binford A couple of our streaming jobs have failed since upgrading to Spark 3.4 with an error such as: org.rocksdb.RocksDBException: Mismatch in unique ID on table file ###. Expected: {###,###} Actual: {###,###} in file /MANIFEST- This is due to the change from [https://github.com/facebook/rocksdb/commit/6de7081cf37169989e289a4801187097f0c50fae] that enabled unique ID checks by default, and I finally tracked down the exact sequence of steps that leads to this failure in the way the RocksDB state store is used. # A task fails after uploading the checkpoint to HDFS. Let's say it uploaded 11.zip for version 11 of the table; the task failed before it could finish, but after successfully uploading the checkpoint. # The same task is retried and goes back to load version 10 of the table as expected. # Cleanup/maintenance is called for this partition, which looks in HDFS for persisted versions and sees up through version 11, since that zip file was successfully uploaded by the previous task attempt. # As part of resolving what SST files are part of each table version, versionToRocksDBFiles.put(version, newResolvedFiles) is called for version 11 with its SST files that were uploaded in the first failed task. # The second attempt at the task commits and goes to sync its checkpoint to HDFS. # versionToRocksDBFiles contains the SST files to upload from step 4, and these files are considered "the same" as what's in the local working dir because the name and file size match. # No SST files are uploaded because they matched above, but in reality the unique ID inside the SST files is different (presumably this is just randomly generated and inserted into each SST file?); it just doesn't affect the size. # A new METADATA file is uploaded which has the new unique IDs listed inside. # When version 11 of the table is read during the next batch, the unique IDs in the METADATA file don't match the unique IDs in the SST files, which causes the exception. This is basically a ticking time bomb for anyone using RocksDB. Thoughts on possible fixes would be: * Disable unique ID verification. I don't currently see a binding for this in the RocksDB Java wrapper, so that would probably have to be added first. * Disable checking if files are already uploaded with the same size, and just always upload SST files no matter what. * Update the "same file" check to also be able to do some kind of CRC comparison or something like that. * Update the maintenance/cleanup to not update the versionToRocksDBFiles map. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
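A sketch of the CRC idea from the fix list above (illustrative, not Spark's actual RocksDBFileManager): treat a local SST file as already uploaded only when name, size, and content checksum all match, so a regenerated file with a new internal unique ID gets re-uploaded.

{code:scala}
import java.io.FileInputStream
import java.util.zip.CRC32

object SstMatchSketch {
  def crc32Of(path: String): Long = {
    val crc = new CRC32()
    val in = new FileInputStream(path)
    try {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n >= 0) { crc.update(buf, 0, n); n = in.read(buf) }
    } finally in.close()
    crc.getValue
  }

  // Name + size is what the current check uses; adding a checksum catches
  // same-size files whose embedded unique IDs differ.
  def alreadyUploaded(localPath: String, localSize: Long,
                      remoteSize: Long, remoteCrc: Long): Boolean =
    localSize == remoteSize && crc32Of(localPath) == remoteCrc
}
{code}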
[jira] [Updated] (SPARK-43802) unbase64 and unhex codegen are invalid with failOnError
[ https://issues.apache.org/jira/browse/SPARK-43802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-43802: - Description: to_binary with hex and base64 generate invalid codegen: {{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), "BASE64")').show()}} results in {{Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, Column 1: Unknown variable or type "BASE64"}} because this is the generated code: /* 107 */ if (!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1)) { /* 108 */ throw QueryExecutionErrors.invalidInputInConversionError( /* 109 */ ((org.apache.spark.sql.types.BinaryType$) references[1] /* to */), /* 110 */ project_value_1, /* 111 */ BASE64, /* 112 */ "try_to_binary"); /* 113 */ } was: to_binary with hex and base64 generate invalid codegen: {{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), "BASE64")').show()}} results in {{Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, Column 1: Unknown variable or type "BASE64"}} because this is the generated code: {{ }}{{{}/* 107 */ if (!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1)) {{}}}{{{}/* 108 */ throw QueryExecutionErrors.invalidInputInConversionError({}}}{{{}/* 109 */ ((org.apache.spark.sql.types.BinaryType$) references[1] /* to */),{}}}{{{}/* 110 */ project_value_1,{}}}{{{}/* 111 */ BASE64,{}}}{{{}/* 112 */ "try_to_binary");{}}}{{{}/* 113 */ }{}}} > unbase64 and unhex codegen are invalid with failOnError > --- > > Key: SPARK-43802 > URL: https://issues.apache.org/jira/browse/SPARK-43802 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.4.0 >Reporter: Adam Binford >Priority: Major > > to_binary with hex and base64 generate invalid codegen: > {{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), > "BASE64")').show()}} > results in > {{Caused by: org.codehaus.commons.compiler.CompileException: File > 'generated.java', Line 47, Column 1: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 47, Column 1: Unknown variable or type "BASE64"}} > because this is the generated code: > /* 107 */ if > (!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1)) > { > /* 108 */ throw QueryExecutionErrors.invalidInputInConversionError( > /* 109 */ ((org.apache.spark.sql.types.BinaryType$) references[1] > /* to */), > /* 110 */ project_value_1, > /* 111 */ BASE64, > /* 112 */ "try_to_binary"); > /* 113 */ } -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43802) unbase64 and unhex codegen are invalid with failOnError
Adam Binford created SPARK-43802: Summary: unbase64 and unhex codegen are invalid with failOnError Key: SPARK-43802 URL: https://issues.apache.org/jira/browse/SPARK-43802 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 3.4.0 Reporter: Adam Binford to_binary with hex and base64 generate invalid codegen: {{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), "BASE64")').show()}} results in {{Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, Column 1: Unknown variable or type "BASE64"}} because this is the generated code:

/* 107 */ if (!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1)) {
/* 108 */ throw QueryExecutionErrors.invalidInputInConversionError(
/* 109 */ ((org.apache.spark.sql.types.BinaryType$) references[1] /* to */),
/* 110 */ project_value_1,
/* 111 */ BASE64,
/* 112 */ "try_to_binary");
/* 113 */ }

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
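The bug class is easy to reproduce in miniature: splicing a format name into generated Java source without quoting it emits a bare token (BASE64) where a string literal was intended. A minimal illustration, not Spark's actual doGenCode:

{code:scala}
object CodegenQuotingSketch {
  def errorCall(format: String): String = {
    // A buggy template would interpolate $format bare, emitting `BASE64,`,
    // which Janino rejects as an unknown symbol. Escaped quotes around the
    // interpolation make it a proper Java string literal instead:
    s"""throw QueryExecutionErrors.invalidInputInConversionError(to, value, "$format", "try_to_binary");"""
  }
}

// CodegenQuotingSketch.errorCall("BASE64") now embeds "BASE64" as a
// quoted Java string literal in the generated source.
{code}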
[jira] [Resolved] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford resolved SPARK-43244. -- Resolution: Duplicate Will be addressed by https://issues.apache.org/jira/browse/SPARK-43311 > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage (this link was helpful: > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique keys so it makes > sense these block usages would be high. The problem is that, because as it is > now the underlying RocksDB instance stays open on an executor as long as that > executor is assigned that stateful partition (to be reused across batches). > So a single executor can accumulate a large number of RocksDB instances open > at once, each using a certain amount of native memory. In the worst case, a > single executor could need to keep open every single partitions' RocksDB > instance at once. > There are a couple ways you can control the amount of memory used, such as > limiting the max open files, or adding the option to use the block cache for > the indices and filters, but neither of these solve the underlying problem of > accumulating native memory from multiple partitions on an executor. > The real fix needs to be a mechanism and option to close the underlying > RocksDB instance at the end of each task, so you have the option to only ever > have one RocksDB instance open at a time, thus having predictable memory > usage no matter the size of your data or number of shuffle partitions. > We are running this on Spark 3.3, but just kicked off a test to see if things > are any different in Spark 3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717394#comment-17717394 ] Adam Binford edited comment on SPARK-43244 at 4/27/23 11:08 PM: Yeah I just saw that PR and was looking through that, I can see that making sense. The memory usage I was seeing that I couldn't do anything about was the filter/index cache. Trying to re-use the LRUCache across all DB instances is interesting and does seem like that would solve the issue. Do you know if that definitely actually reuses the same underlying cache, and isn't just a config that tells the native RocksDB instance how big to make the cache? Edit: does appear to be the actual cache, so I think that would solve my issues was (Author: kimahriman): Yeah I just saw that PR and was looking through that, I can see that making sense. The memory usage I was seeing that I couldn't do anything about was the filter/index cache. Trying to re-use the LRUCache across all DB instances is interesting and does seem like that would solve the issue. Do you know if that definitely actually reuses the same underlying cache, and isn't just a config that tells the native RocksDB instance how big to make the cache? > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage (this link was helpful: > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique keys so it makes > sense these block usages would be high. The problem is that, because as it is > now the underlying RocksDB instance stays open on an executor as long as that > executor is assigned that stateful partition (to be reused across batches). > So a single executor can accumulate a large number of RocksDB instances open > at once, each using a certain amount of native memory. In the worst case, a > single executor could need to keep open every single partitions' RocksDB > instance at once. > There are a couple ways you can control the amount of memory used, such as > limiting the max open files, or adding the option to use the block cache for > the indices and filters, but neither of these solve the underlying problem of > accumulating native memory from multiple partitions on an executor. > The real fix needs to be a mechanism and option to close the underlying > RocksDB instance at the end of each task, so you have the option to only ever > have one RocksDB instance open at a time, thus having predictable memory > usage no matter the size of your data or number of shuffle partitions. > We are running this on Spark 3.3, but just kicked off a test to see if things > are any different in Spark 3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717394#comment-17717394 ] Adam Binford commented on SPARK-43244: -- Yeah I just saw that PR and was looking through that, I can see that making sense. The memory usage I was seeing that I couldn't do anything about was the filter/index cache. Trying to re-use the LRUCache across all DB instances is interesting and does seem like that would solve the issue. Do you know if that definitely actually reuses the same underlying cache, and isn't just a config that tells the native RocksDB instance how big to make the cache? > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage (this link was helpful: > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique keys so it makes > sense these block usages would be high. The problem is that, because as it is > now the underlying RocksDB instance stays open on an executor as long as that > executor is assigned that stateful partition (to be reused across batches). > So a single executor can accumulate a large number of RocksDB instances open > at once, each using a certain amount of native memory. In the worst case, a > single executor could need to keep open every single partitions' RocksDB > instance at once. > There are a couple ways you can control the amount of memory used, such as > limiting the max open files, or adding the option to use the block cache for > the indices and filters, but neither of these solve the underlying problem of > accumulating native memory from multiple partitions on an executor. > The real fix needs to be a mechanism and option to close the underlying > RocksDB instance at the end of each task, so you have the option to only ever > have one RocksDB instance open at a time, thus having predictable memory > usage no matter the size of your data or number of shuffle partitions. > We are running this on Spark 3.3, but just kicked off a test to see if things > are any different in Spark 3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
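For reference, what sharing the cache looks like with the real RocksDB Java API (sizes arbitrary): every instance opened this way competes for one bounded pool, and index/filter blocks are charged to it as well.

{code:scala}
import org.rocksdb.{BlockBasedTableConfig, LRUCache, Options, RocksDB}

object SharedBlockCacheSketch {
  RocksDB.loadLibrary()

  // One process-wide cache object, passed by reference to every instance.
  private val sharedCache = new LRUCache(128L * 1024 * 1024)

  def openWithSharedCache(path: String): RocksDB = {
    val tableConf = new BlockBasedTableConfig()
      .setBlockCache(sharedCache)         // the same native cache, not a size hint
      .setCacheIndexAndFilterBlocks(true) // bound index/filter memory too
    val opts = new Options()
      .setCreateIfMissing(true)
      .setTableFormatConfig(tableConf)
    RocksDB.open(opts, path)
  }
}
{code}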
[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715525#comment-17715525 ] Adam Binford commented on SPARK-43244: -- Read through [https://github.com/apache/spark/pull/30344] for all the comments about close/abort. Adding a new method (like close) in addition to abort seems like the only real option here? > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage (this link was helpful: > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique keys so it makes > sense these block usages would be high. The problem is that, because as it is > now the underlying RocksDB instance stays open on an executor as long as that > executor is assigned that stateful partition (to be reused across batches). > So a single executor can accumulate a large number of RocksDB instances open > at once, each using a certain amount of native memory. In the worst case, a > single executor could need to keep open every single partitions' RocksDB > instance at once. > There are a couple ways you can control the amount of memory used, such as > limiting the max open files, or adding the option to use the block cache for > the indices and filters, but neither of these solve the underlying problem of > accumulating native memory from multiple partitions on an executor. > The real fix needs to be a mechanism and option to close the underlying > RocksDB instance at the end of each task, so you have the option to only ever > have one RocksDB instance open at a time, thus having predictable memory > usage no matter the size of your data or number of shuffle partitions. > We are running this on Spark 3.3, but just kicked off a test to see if things > are any different in Spark 3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
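A sketch of that idea: a hypothetical extension of the StateStore contract (taskCompleted is not an existing Spark API) giving stores a task-end hook distinct from abort(), so a RocksDB-backed store could close its native instance while keeping downloaded files for the next batch.

{code:scala}
trait StateStoreLifecycleSketch {
  def commit(): Long
  def abort(): Unit
  // Called once per task, after commit/abort and after any post-commit
  // iteration over the store's contents; default is a no-op so existing
  // implementations are unaffected.
  def taskCompleted(): Unit = ()
}
{code}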
[jira] [Updated] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-43244: - Description: We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing). After reading about RocksDB memory usage (this link was helpful: [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique keys so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor. The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4. was: We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing). After reading about RocksDB memory usage (this link was helpful: [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor. 
The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4. > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage (this link was helpful: > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique keys so it makes > sense these block usages would be high. The
[jira] [Updated] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-43244: - Description: We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing). After reading about RocksDB memory usage (this link was helpful: [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor. The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4. was: We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing). After reading about RocksDB memory usage ([this link was helpful]([https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor. 
The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4. > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage (this link was helpful: > [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique values so it > makes sense these block usages would be high.
[jira] [Updated] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-43244: - Description: We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing). After reading about RocksDB memory usage ([this link was helpful]([https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor. The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4. was: We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing). After reading about RocksDB memory usage ([this link was helpful|[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]),] I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor. 
The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4. > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage ([this link was > helpful]([https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique values so it > makes sense these block usages would be
[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715495#comment-17715495 ] Adam Binford commented on SPARK-43244: -- [~kabhwan] curious your thoughts on this. Seems somewhat straightforward to have an option to close the RocksDB instance after a task completes. Things like maintenance wouldn't be affected because it doesn't use the RocksDB instance to clean up old files. During the next batch I think it can reuse a lot of the files that have already been downloaded; it will just need to re-create the underlying RocksDB instance. The main question is how to actually do the closing. It can't be done at the end of a commit because some things iterate over the items after committing. Abort is only called in certain situations. So the options seem to be either: # Do it via the abort mechanism but change abort to be called every time, even for write stores. This seems like it would be fine for the two existing state stores, but could cause issues for any custom state stores people have? # Add a new function to the StateStore classes, like "finished" or "complete", that says the task is done, so the store can do any cleanup it wants that's not a full unload. > RocksDB State Store can accumulate unbounded native memory > -- > > Key: SPARK-43244 > URL: https://issues.apache.org/jira/browse/SPARK-43244 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > We noticed in one of our production stateful streaming jobs using RocksDB > that an executor with 20g of heap was using around 40g of resident memory. I > noticed a single RocksDB instance was using around 150 MiB of memory, and > only 5 MiB or so of this was from the write batch (which is now cleared after > committing). > After reading about RocksDB memory usage ([this link was > helpful|[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]),] > I realized a lot of this was likely the "Index and Filters" memory usage. > This job is doing a streaming duplicate with a lot of unique values so it > makes sense these block usages would be high. The problem is that, because as > it is now the underlying RocksDB instance stays open on an executor as long > as that executor is assigned that stateful partition (to be reused across > batches). So a single executor can accumulate a large number of RocksDB > instances open at once, each using a certain amount of native memory. In the > worst case, a single executor could need to keep open every single > partitions' RocksDB instance at once. > There are a couple ways you can control the amount of memory used, such as > limiting the max open files, or adding the option to use the block cache for > the indices and filters, but neither of these solve the underlying problem of > accumulating native memory from multiple partitions on an executor. > The real fix needs to be a mechanism and option to close the underlying > RocksDB instance at the end of each task, so you have the option to only ever > have one RocksDB instance open at a time, thus having predictable memory > usage no matter the size of your data or number of shuffle partitions. > We are running this on Spark 3.3, but just kicked off a test to see if things > are any different in Spark 3.4.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
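To make option 2 above concrete, here is a minimal sketch of what such a lifecycle hook could look like. Everything here is hypothetical, not the actual Spark StateStore API: the method name taskCompleted, the trait shape, and the NativeDb stand-in are invented for illustration.

{code:java}
// Hypothetical sketch of option 2: a lifecycle hook invoked once the task is
// done with the store, separate from commit/abort. The no-op default keeps
// existing custom state store implementations source-compatible.
trait StateStore {
  def commit(): Long
  def abort(): Unit
  def taskCompleted(): Unit = {} // called for both read and write stores
}

// Stand-in for the native RocksDB handle (placeholder, not the real class).
final class NativeDb extends AutoCloseable {
  override def close(): Unit = println("native RocksDB instance closed")
}

class RocksDbStore(db: NativeDb) extends StateStore {
  override def commit(): Long = 1L // commit the new version as today
  override def abort(): Unit = ()  // abort as today
  // Close the native instance to release index/filter memory; the next batch
  // re-opens it, reusing SST files already downloaded to local disk.
  override def taskCompleted(): Unit = db.close()
}
{code}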
[jira] [Created] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory
Adam Binford created SPARK-43244: Summary: RocksDB State Store can accumulate unbounded native memory Key: SPARK-43244 URL: https://issues.apache.org/jira/browse/SPARK-43244 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 3.3.2 Reporter: Adam Binford We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing). After reading about RocksDB memory usage ([this link was helpful|https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]), I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming deduplication with a lot of unique values so it makes sense these block usages would be high. The problem is that, as it stands now, the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partition's RocksDB instance at once. There are a couple of ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solves the underlying problem of accumulating native memory from multiple partitions on an executor. The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-42492) Add new function filter_value
[ https://issues.apache.org/jira/browse/SPARK-42492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-42492: - Description: Doing data validation in Spark can lead to a lot of extra evaluations of expressions. This is because conditionally evaluated expressions aren't candidates for subexpression elimination. For example a simple expression such as {{when(validate(col), col)}} to only keep col if it matches some condition, will lead to col being evaluated twice. And if col itself is made up of a series of expensive expressions, like regular expression checks, this can lead to a lot of wasted computation time. The initial attempt to resolve this was https://issues.apache.org/jira/browse/SPARK-35564, adding support for subexpression elimination to conditionally evaluated expressions. However I have not been able to get that merged, so this is an alternative (though I believe that is still useful on top of this). We can add a new higher order function "filter_value" that takes the column you want to validate as an argument, and a lambda expression that returns a boolean deciding whether to keep that column. It would have the same semantics as the above when expression, except it would guarantee to only evaluate the initial column once. An alternative would be to implement a real definition for the NullIf expression, but that would only support exact equals checks and not any generic condition. was: Doing data validation in Spark can lead to a lot of extra evaluations of expressions. This is because conditionally evaluated expressions aren't candidates for subexpression elimination. For example a simple expression such as {{when(validate(col), col)}} to only keep col if it matches some condition, will lead to col being evaluated twice. And if col itself is made up of a series of expensive expressions, like regular expression checks, this can lead to a lot of wasted computation time. The initial attempt to resolve this was https://issues.apache.org/jira/browse/SPARK-35564, adding support for subexpression elimination to conditionally evaluated expressions. However I have not been able to get that merged, so this is an alternative (though I believe that is still useful on top of this). We can add a new lambda function "filter_value" that takes the column you want to validate as an argument, and a lambda expression that returns a boolean deciding whether to keep that column. It would have the same semantics as the above when expression, except it would guarantee to only evaluate the initial column once. An alternative would be to implement a real definition for the NullIf expression, but that would only support exact equals checks and not any generic condition. > Add new function filter_value > - > > Key: SPARK-42492 > URL: https://issues.apache.org/jira/browse/SPARK-42492 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.3.2 >Reporter: Adam Binford >Priority: Major > > Doing data validation in Spark can lead to a lot of extra evaluations of > expressions. This is because conditionally evaluated expressions aren't > candidates for subexpression elimination. For example a simple expression > such as > {{when(validate(col), col)}} > to only keep col if it matches some condition, will lead to col being > evaluated twice.
And if col itself is made up of a series of expensive > expressions, like regular expression checks, this can lead to a lot of > wasted computation time. > The initial attempt to resolve this was > https://issues.apache.org/jira/browse/SPARK-35564, adding support for > subexpression elimination to conditionally evaluated expressions. However I > have not been able to get that merged, so this is an alternative (though I > believe that is still useful on top of this). > We can add a new higher order function "filter_value" that takes the column > you want to validate as an argument, and a lambda expression that returns a > boolean deciding whether to keep that column. It would have the same semantics > as the above when expression, except it would guarantee to only evaluate the > initial column once. > An alternative would be to implement a real definition for the NullIf > expression, but that would only support exact equals checks and not any > generic condition. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
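A sketch of the proposed semantics: {{filter_value}} does not exist in Spark, so the hypothetical call below is shown commented out, next to the double-evaluating expression that works today.

{code:java}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val expensive: Column = regexp_replace(col("_1"), "\\d", "")

// Today: `expensive` appears twice, so it may be evaluated twice per row.
val today: Column = when(length(expensive) > 0, expensive)

// Proposed (hypothetical API): evaluate the column once, then apply a
// lambda deciding whether to keep it; otherwise return null.
// val proposed: Column = filter_value(expensive, c => length(c) > 0)
{code}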
[jira] [Created] (SPARK-42492) Add new function filter_value
Adam Binford created SPARK-42492: Summary: Add new function filter_value Key: SPARK-42492 URL: https://issues.apache.org/jira/browse/SPARK-42492 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 3.3.2 Reporter: Adam Binford Doing data validation in Spark can lead to a lot of extra evaluations of expressions. This is because conditionally evaluated expressions aren't candidates for subexpression elimination. For example a simple expression such as {{when(validate(col), col)}} to only keep col if it matches some condition, will lead to col being evaluated twice. And if col itself is made up of a series of expensive expressions, like regular expression checks, this can lead to a lot of wasted computation time. The initial attempt to resolve this was https://issues.apache.org/jira/browse/SPARK-35564, adding support for subexpression elimination to conditionally evaluated expressions. However I have not been able to get that merged, so this is an alternative (though I believe that is still useful on top of this). We can add a new lambda function "filter_value" that takes the column you want to validate as an argument, and a lambda expression that returns a boolean deciding whether to keep that column. It would have the same semantics as the above when expression, except it would guarantee to only evaluate the initial column once. An alternative would be to implement a real definition for the NullIf expression, but that would only support exact equals checks and not any generic condition. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-42347) Arrow string and binary vectors only support 1 GiB
[ https://issues.apache.org/jira/browse/SPARK-42347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684212#comment-17684212 ] Adam Binford commented on SPARK-42347: -- [https://github.com/apache/spark/pull/39572] is a potential workaround to allow enabling the large variable width vectors when users hit this limit. > Arrow string and binary vectors only support 1 GiB > -- > > Key: SPARK-42347 > URL: https://issues.apache.org/jira/browse/SPARK-42347 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Adam Binford >Priority: Major > > Since Arrow 10.0.0, BaseVariableWidthVector (the parent for string and binary > vectors) only supports expanding up to 1 GiB through the safe interfaces, > which Spark uses, instead of 2 GiB previously. This is due to > [https://github.com/apache/arrow/pull/13815]. I added a comment in there but > haven't gotten any responses yet; I will make an issue in Arrow as well. > Basically whenever you try to add data beyond 1 GiB, the vector will try to > double itself to the next power of two, which would be {{2147483648}}, > which is greater than {{Integer.MAX_VALUE}} ({{2147483647}}), thus > throwing an {{OversizedAllocationException}}. > See [https://github.com/apache/spark/pull/39572#issuecomment-1383195213] and > the comment above for how I reproduced this to show that it is now the case. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-42347) Arrow string and binary vectors only support 1 GiB
Adam Binford created SPARK-42347: Summary: Arrow string and binary vectors only support 1 GiB Key: SPARK-42347 URL: https://issues.apache.org/jira/browse/SPARK-42347 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Adam Binford Since Arrow 10.0.0, BaseVariableWidthVector (the parent for string and binary vectors) only supports expanding up to 1 GiB through the safe interfaces, which Spark uses, instead of 2 GiB previously. This is due to [https://github.com/apache/arrow/pull/13815]. I added a comment in there but haven't gotten any responses yet; I will make an issue in Arrow as well. Basically whenever you try to add data beyond 1 GiB, the vector will try to double itself to the next power of two, which would be {{2147483648}}, which is greater than {{Integer.MAX_VALUE}} ({{2147483647}}), thus throwing an {{OversizedAllocationException}}. See [https://github.com/apache/spark/pull/39572#issuecomment-1383195213] and the comment above for how I reproduced this to show that it is now the case. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
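The overflow arithmetic itself is easy to check; the snippet below just illustrates the numbers involved and is not Arrow code.

{code:java}
// Growing past 1 GiB doubles the buffer to the next power of two,
// which no longer fits in a signed 32-bit int.
val oneGiB: Long = 1L << 30      // 1073741824 bytes
val doubled: Long = oneGiB * 2   // 2147483648 bytes
println(doubled > Int.MaxValue)  // true: 2147483648 > 2147483647
{code}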
[jira] [Commented] (SPARK-39979) IndexOutOfBoundsException on groupby + apply pandas grouped map udf function
[ https://issues.apache.org/jira/browse/SPARK-39979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676610#comment-17676610 ] Adam Binford commented on SPARK-39979: -- We have encountered this as well and I am working on a fix. This is a limitation of using the BaseVariableWidthVector based classes for string and binary and not the BaseLargeVariableWidthVector based types. A single string or binary vector cannot hold more than 2 GiB of data, because it stores offsets to each value and the offsets are stored as 4 byte integers. The large types use 8 bytes for the offset instead, thus removing the limitation (illustrated below). Options are: * Add a config you can set to use the large types instead of the normal types * Just use the large types for everything, at the expense of an additional 4 bytes per record in the vector for storing offsets (not sure if there are additional overheads) Let me know if there are any thoughts or opinions on which route to go. > IndexOutOfBoundsException on groupby + apply pandas grouped map udf function > > > Key: SPARK-39979 > URL: https://issues.apache.org/jira/browse/SPARK-39979 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 3.2.1 >Reporter: yaniv oren >Priority: Major > > I'm grouping on a relatively small number of groups, each of large size. > Working with pyarrow version 2.0.0; machine memory is 64 GiB. > I'm getting the following error: > {code:java} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 387 > in stage 162.0 failed 4 times, most recent failure: Lost task 387.3 in stage > 162.0 (TID 29957) (ip-172-21-129-187.eu-west-1.compute.internal executor 71): > java.lang.IndexOutOfBoundsException: index: 2147483628, length: 36 (expected: > range(0, 2147483648)) > at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699) > at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890) > at > org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087) > at > org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:251) > at > org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:130) > at > org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:95) > at > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:92) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474) > at > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:435) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2031) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:270) > {code} > Why do I hit this 2 GB limit? According to SPARK-34588 this is supported; > perhaps this is related to SPARK-34020. > Please assist. > Note: > Is it related to the usage of BaseVariableWidthVector and not > BaseLargeVariableWidthVector? > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
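For context on the two options, the capacity difference comes entirely from the offset width. A rough illustration of the numbers (not Arrow code):

{code:java}
// Offsets are stored one per value; 4-byte signed offsets cap a single
// vector's data buffer at Int.MaxValue (~2 GiB), 8-byte offsets do not.
val records = 100000000L
val intOffsetBytes = (records + 1) * 4  // ~400 MB of offsets
val longOffsetBytes = (records + 1) * 8 // ~800 MB: the 4 bytes/record cost
println(s"max data with Int offsets: ${Int.MaxValue} bytes")
{code}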
[jira] [Updated] (SPARK-41339) RocksDB state store WriteBatch doesn't clean up native memory
[ https://issues.apache.org/jira/browse/SPARK-41339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-41339: - Summary: RocksDB state store WriteBatch doesn't clean up native memory (was: RocksDB State Store WriteBatch doesn't cleanup native memory) > RocksDB state store WriteBatch doesn't clean up native memory > - > > Key: SPARK-41339 > URL: https://issues.apache.org/jira/browse/SPARK-41339 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 3.3.1 >Reporter: Adam Binford >Priority: Major > > The RocksDB state store uses a WriteBatch to hold updates that get written in > a single transaction to commit. Somewhat indirectly, abort is called after a > successful task, which calls writeBatch.clear(), but the data for a writeBatch > is stored in a std::string in the native code. Not sure why it's stored as a > string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · > GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491] > writeBatch.clear simply calls rep_.clear() and rep_.resize() > ([rocksdb/write_batch.cc at main · facebook/rocksdb · > GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]), > neither of which actually releases the memory built up by a std::string > instance. The only way to actually release this memory is to delete the > WriteBatch object itself. > Currently, all memory taken by all write batches will remain until the > RocksDB state store instance is closed, which never happens during the normal > course of operation as all partitions remain loaded on an executor after a > task completes. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
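A minimal sketch of the fix direction, using the RocksDB Java API directly (the state store wiring around it is omitted, and this is an assumption about the approach rather than the actual patch): instead of clearing the batch, delete the native object and allocate a fresh one, since deletion is the only way the accumulated std::string memory is returned.

{code:java}
import org.rocksdb.{RocksDB, WriteBatch}

RocksDB.loadLibrary() // load the native library before creating RocksDB objects

var writeBatch = new WriteBatch()

// clear() only clears/resizes the native std::string, keeping its capacity
// allocated; close() deletes the native object and actually frees it.
def resetBatch(): Unit = {
  writeBatch.close()            // free the accumulated native memory
  writeBatch = new WriteBatch() // fresh batch for the next transaction
}
{code}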
[jira] [Commented] (SPARK-41340) RocksDB State Store WriteBatch doesn't cleanup native memory
[ https://issues.apache.org/jira/browse/SPARK-41340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641442#comment-17641442 ] Adam Binford commented on SPARK-41340: -- Got added twice > RocksDB State Store WriteBatch doesn't cleanup native memory > > > Key: SPARK-41340 > URL: https://issues.apache.org/jira/browse/SPARK-41340 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 3.3.1 >Reporter: Adam Binford >Priority: Major > > The RocksDB state store uses a WriteBatch to hold updates that get written in > a single transaction to commit. Somewhat indirectly, abort is called after a > successful task, which calls writeBatch.clear(), but the data for a writeBatch > is stored in a std::string in the native code. Not sure why it's stored as a > string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · > GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491] > writeBatch.clear simply calls rep_.clear() and rep_.resize() > ([rocksdb/write_batch.cc at main · facebook/rocksdb · > GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]), > neither of which actually releases the memory built up by a std::string > instance. The only way to actually release this memory is to delete the > WriteBatch object itself. > Currently, all memory taken by all write batches will remain until the > RocksDB state store instance is closed, which never happens during the normal > course of operation as all partitions remain loaded on an executor after a > task completes. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-41340) RocksDB State Store WriteBatch doesn't cleanup native memory
[ https://issues.apache.org/jira/browse/SPARK-41340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford resolved SPARK-41340. -- Resolution: Duplicate > RocksDB State Store WriteBatch doesn't cleanup native memory > > > Key: SPARK-41340 > URL: https://issues.apache.org/jira/browse/SPARK-41340 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 3.3.1 >Reporter: Adam Binford >Priority: Major > > The RocksDB state store uses a WriteBatch to hold updates that get written in > a single transaction to commit. Somewhat indirectly, abort is called after a > successful task, which calls writeBatch.clear(), but the data for a writeBatch > is stored in a std::string in the native code. Not sure why it's stored as a > string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · > GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491] > writeBatch.clear simply calls rep_.clear() and rep_.resize() > ([rocksdb/write_batch.cc at main · facebook/rocksdb · > GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]), > neither of which actually releases the memory built up by a std::string > instance. The only way to actually release this memory is to delete the > WriteBatch object itself. > Currently, all memory taken by all write batches will remain until the > RocksDB state store instance is closed, which never happens during the normal > course of operation as all partitions remain loaded on an executor after a > task completes. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41340) RocksDB State Store WriteBatch doesn't cleanup native memory
Adam Binford created SPARK-41340: Summary: RocksDB State Store WriteBatch doesn't cleanup native memory Key: SPARK-41340 URL: https://issues.apache.org/jira/browse/SPARK-41340 Project: Spark Issue Type: Bug Components: SQL, Structured Streaming Affects Versions: 3.3.1 Reporter: Adam Binford The RocksDB state store uses a WriteBatch to hold updates that get written in a single transaction to commit. Somewhat indirectly, abort is called after a successful task, which calls writeBatch.clear(), but the data for a writeBatch is stored in a std::string in the native code. Not sure why it's stored as a string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491] writeBatch.clear simply calls rep_.clear() and rep_.resize() ([rocksdb/write_batch.cc at main · facebook/rocksdb · GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]), neither of which actually releases the memory built up by a std::string instance. The only way to actually release this memory is to delete the WriteBatch object itself. Currently, all memory taken by all write batches will remain until the RocksDB state store instance is closed, which never happens during the normal course of operation as all partitions remain loaded on an executor after a task completes. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41339) RocksDB State Store WriteBatch doesn't cleanup native memory
Adam Binford created SPARK-41339: Summary: RocksDB State Store WriteBatch doesn't cleanup native memory Key: SPARK-41339 URL: https://issues.apache.org/jira/browse/SPARK-41339 Project: Spark Issue Type: Bug Components: SQL, Structured Streaming Affects Versions: 3.3.1 Reporter: Adam Binford The RocksDB state store uses a WriteBatch to hold updates that get written in a single transaction to commit. Somewhat indirectly, abort is called after a successful task, which calls writeBatch.clear(), but the data for a writeBatch is stored in a std::string in the native code. Not sure why it's stored as a string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491] writeBatch.clear simply calls rep_.clear() and rep_.resize() ([rocksdb/write_batch.cc at main · facebook/rocksdb · GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]), neither of which actually releases the memory built up by a std::string instance. The only way to actually release this memory is to delete the WriteBatch object itself. Currently, all memory taken by all write batches will remain until the RocksDB state store instance is closed, which never happens during the normal course of operation as all partitions remain loaded on an executor after a task completes. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40775) V2 file scans have duplicative descriptions
Adam Binford created SPARK-40775: Summary: V2 file scans have duplicative descriptions Key: SPARK-40775 URL: https://issues.apache.org/jira/browse/SPARK-40775 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.0 Reporter: Adam Binford V2 file scans have duplication in the description. This is because FileScan uses the metadata to create the description, but each file type overrides both the metadata and the description, adding the same metadata twice. Example from a parquet agg pushdown explain: {{ *+- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct
[jira] [Created] (SPARK-40565) Non-deterministic filters shouldn't get pushed to V2 file sources
Adam Binford created SPARK-40565: Summary: Non-deterministic filters shouldn't get pushed to V2 file sources Key: SPARK-40565 URL: https://issues.apache.org/jira/browse/SPARK-40565 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.0 Reporter: Adam Binford Currently non-deterministic filters can be pushed down to V2 file sources, which is different from V1, which prevents non-deterministic filters from being pushed. Main consequences: * Things like doing a rand filter on a partition column will throw an exception (see the sketch below): ** {{IllegalArgumentException: requirement failed: Nondeterministic expression org.apache.spark.sql.catalyst.expressions.Rand should be initialized before eval.}} * Using a non-deterministic UDF to collect metrics via accumulators gets pushed down and gives the wrong metrics -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
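A hypothetical reproduction along the lines described (the exact plan shape matters, but a {{rand()}} predicate against a partition column with the V2 reader enabled should exercise the pushdown; the path is made up for illustration):

{code:java}
import org.apache.spark.sql.functions._
import spark.implicits._

// Force the V2 Parquet source; parquet is on the V1 list by default.
spark.conf.set("spark.sql.sources.useV1SourceList", "")

spark.range(10).withColumn("p", $"id" % 2)
  .write.partitionBy("p").mode("overwrite").parquet("/tmp/nondeterministic_demo")

// The non-deterministic predicate over the partition column gets pushed to
// the V2 scan and hits the uninitialized-Rand error; V1 would keep it out
// of the pushed filters and evaluate it after the scan.
spark.read.parquet("/tmp/nondeterministic_demo").filter($"p" > rand()).show()
{code}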
[jira] [Created] (SPARK-40314) Add inline Scala and Python bindings
Adam Binford created SPARK-40314: Summary: Add inline Scala and Python bindings Key: SPARK-40314 URL: https://issues.apache.org/jira/browse/SPARK-40314 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.0 Reporter: Adam Binford Add bindings for inline and inline_outer to Scala and Python function APIs -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40246) Logging isn't configurable via log4j2 with hadoop-provided profile
[ https://issues.apache.org/jira/browse/SPARK-40246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-40246: - Component/s: Build (was: Spark Core) > Logging isn't configurable via log4j2 with hadoop-provided profile > -- > > Key: SPARK-40246 > URL: https://issues.apache.org/jira/browse/SPARK-40246 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 3.3.0 >Reporter: Adam Binford >Priority: Major > > When building Spark with -Phadoop-provided (or using the 3.3.0 build without > Hadoop), there is no slf4j implementation provided for log4j2, so the default > log4j2 properties are ignored and logging isn't configurable via > SparkContext.setLogLevel. > Reproduction on a fresh Ubuntu container: > > {noformat} > apt-get update > apt-get install -y wget > wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz > wget > https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-without-hadoop.tgz > tar -xvf hadoop-3.3.4.tar.gz -C /opt > tar -xvf spark-3.3.0-bin-without-hadoop.tgz -C /opt > export HADOOP_HOME=/opt/hadoop-3.3.4/ > export SPARK_HOME=/opt/spark-3.3.0-bin-without-hadoop/ > apt install -y openjdk-11-jre-headless python3 > export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/ > export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) > $SPARK_HOME/bin/pyspark > {noformat} > The default log level starts at INFO and you can't change it with > sc.setLogLevel -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40246) Logging isn't configurable via log4j2 with hadoop-provided profile
Adam Binford created SPARK-40246: Summary: Logging isn't configurable via log4j2 with hadoop-provided profile Key: SPARK-40246 URL: https://issues.apache.org/jira/browse/SPARK-40246 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.3.0 Reporter: Adam Binford When building Spark with -Phadoop-provided (or using the 3.3.0 build without Hadoop), there is no slf4j implementation provided for log4j2, so the default log4j2 properties are ignored and logging isn't configurable via SparkContext.setLogLevel. Reproduction on a fresh Ubuntu container: {noformat} apt-get update apt-get install -y wget wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-without-hadoop.tgz tar -xvf hadoop-3.3.4.tar.gz -C /opt tar -xvf spark-3.3.0-bin-without-hadoop.tgz -C /opt export HADOOP_HOME=/opt/hadoop-3.3.4/ export SPARK_HOME=/opt/spark-3.3.0-bin-without-hadoop/ apt install -y openjdk-11-jre-headless python3 export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/ export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) $SPARK_HOME/bin/pyspark {noformat} The default log level starts at INFO and you can't change it with sc.setLogLevel -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39951) Support columnar batches with nested fields in Parquet V2
Adam Binford created SPARK-39951: Summary: Support columnar batches with nested fields in Parquet V2 Key: SPARK-39951 URL: https://issues.apache.org/jira/browse/SPARK-39951 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: Adam Binford Follow-up to https://issues.apache.org/jira/browse/SPARK-34863 to update `supportsColumnarReads` to account for nested fields -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38640) NPE with unpersisting memory-only RDD with RDD fetching from shuffle service enabled
Adam Binford created SPARK-38640: Summary: NPE with unpersisting memory-only RDD with RDD fetching from shuffle service enabled Key: SPARK-38640 URL: https://issues.apache.org/jira/browse/SPARK-38640 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.2.1 Reporter: Adam Binford If you have RDD fetching from shuffle service enabled, memory-only cached RDDs will fail to unpersist. ``` // spark.shuffle.service.fetch.rdd.enabled=true val df = spark.range(5) .persist(StorageLevel.MEMORY_ONLY) df.count() df.unpersist(true) ``` -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38640) NPE with unpersisting memory-only RDD with RDD fetching from shuffle service enabled
[ https://issues.apache.org/jira/browse/SPARK-38640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-38640: - Description: If you have RDD fetching from shuffle service enabled, memory-only cached RDDs will fail to unpersist. {code:java} // spark.shuffle.service.fetch.rdd.enabled=true val df = spark.range(5) .persist(StorageLevel.MEMORY_ONLY) df.count() df.unpersist(true) {code} was: If you have RDD fetching from shuffle service enabled, memory-only cached RDDs will fail to unpersist. ``` // spark.shuffle.service.fetch.rdd.enabled=true val df = spark.range(5) .persist(StorageLevel.MEMORY_ONLY) df.count() df.unpersist(true) ``` > NPE with unpersisting memory-only RDD with RDD fetching from shuffle service > enabled > > > Key: SPARK-38640 > URL: https://issues.apache.org/jira/browse/SPARK-38640 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.1 >Reporter: Adam Binford >Priority: Major > > If you have RDD fetching from shuffle service enabled, memory-only cached > RDDs will fail to unpersist. > > > {code:java} > // spark.shuffle.service.fetch.rdd.enabled=true > val df = spark.range(5) > .persist(StorageLevel.MEMORY_ONLY) > df.count() > df.unpersist(true) > {code} > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38194) Make Yarn memory overhead factor configurable
Adam Binford created SPARK-38194: Summary: Make Yarn memory overhead factor configurable Key: SPARK-38194 URL: https://issues.apache.org/jira/browse/SPARK-38194 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 3.2.1 Reporter: Adam Binford Currently if the memory overhead is not provided for a Yarn job, it defaults to 10% of the respective driver/executor memory. This 10% is hard-coded and the only way to increase memory overhead is to set the exact memory overhead. We have seen more than 10% memory being used, and it would be helpful to be able to set the default overhead factor so that the overhead doesn't need to be pre-calculated for any driver/executor memory size. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
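For illustration, the kind of knob being asked for. The config names below are what this ticket proposes to add; treat them as assumptions here, not an existing API at the affected version.

{code:java}
import org.apache.spark.SparkConf

// Assumed config: a default overhead *factor* instead of a fixed value.
val conf = new SparkConf()
  .set("spark.executor.memory", "20g")
  .set("spark.executor.memoryOverheadFactor", "0.2") // ask for 20%, not 10%
// The effective overhead request would be max(20g * 0.2, 384m) = 4g,
// scaling automatically with whatever executor memory a job chooses.
{code}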
[jira] [Commented] (SPARK-35570) Shuffle file leak with external shuffle service enable
[ https://issues.apache.org/jira/browse/SPARK-35570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463386#comment-17463386 ] Adam Binford commented on SPARK-35570: -- Just found this after making https://issues.apache.org/jira/browse/SPARK-37618. I'm working on an approach that makes use of the existing remove blocks call for RDDs served from the shuffle server. I can try to get some feedback once I have that working. > Shuffle file leak with external shuffle service enable > -- > > Key: SPARK-35570 > URL: https://issues.apache.org/jira/browse/SPARK-35570 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 3.1.2 >Reporter: ZiyueGuan >Priority: Major > > Unlike RDD blocks, the external shuffle service doesn't offer cleanup of > shuffle files. Cleaning up shuffle files mainly relies on live executors > responding to requests from the context cleaner. Once an executor exits, > its leftover shuffle files will not be cleaned until the application exits. > For streaming or other long-running applications, disk may run out. > I'm confused that shuffle files are left behind like this while the lifecycle > of RDD blocks is properly handled. Is there any difference between them? -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-37618) Support cleaning up shuffle blocks from external shuffle service
Adam Binford created SPARK-37618: Summary: Support cleaning up shuffle blocks from external shuffle service Key: SPARK-37618 URL: https://issues.apache.org/jira/browse/SPARK-37618 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 3.2.0 Reporter: Adam Binford Currently shuffle data is not cleaned up when an external shuffle service is used and the associated executor has been deallocated before the shuffle is cleaned up. Shuffle data is only cleaned up once the application ends. There have been various issues filed for this: https://issues.apache.org/jira/browse/SPARK-26020 https://issues.apache.org/jira/browse/SPARK-17233 https://issues.apache.org/jira/browse/SPARK-4236 But shuffle files will still stick around until an application completes. Dynamic allocation is commonly used for long running jobs (such as structured streaming), so any long running jobs with a large shuffle involved will eventually fill up local disk space. The shuffle service already supports cleaning up shuffle service persisted RDDs, so it should be able to support cleaning up shuffle blocks as well once the shuffle is removed by the ContextCleaner. The current alternative is to use shuffle tracking instead of an external shuffle service, but this is less optimal from a resource perspective as all executors must be kept alive until the shuffle has been fully consumed and cleaned up (and with the default GC interval being 30 minutes this can waste a lot of time with executors held onto but not doing anything). -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-37467) Consolidate whole stage and non-whole stage subexpression elimination
Adam Binford created SPARK-37467: Summary: Consolidate whole stage and non-whole stage subexpression elimination Key: SPARK-37467 URL: https://issues.apache.org/jira/browse/SPARK-37467 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: Adam Binford Currently there are separate subexpression elimination paths for whole stage and non-whole stage codegen. Consolidating these into a single code path would make it simpler to add further enhancements, such as supporting lambda functions (https://issues.apache.org/jira/browse/SPARK-37466). -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-37466) Support subexpression elimination in lambda functions
Adam Binford created SPARK-37466: Summary: Support subexpression elimination in lambda functions Key: SPARK-37466 URL: https://issues.apache.org/jira/browse/SPARK-37466 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: Adam Binford https://issues.apache.org/jira/browse/SPARK-37019 will add codegen support for higher order functions. However we can't support subexpression elimination inside of lambda functions because subexpressions are evaluated once per row at the beginning of the codegen. Common expressions inside lambda functions can easily result in performance degradation due to multiple evaluations of the same expression. Subexpression elimination inside of lambda functions needs to be handled specially to be evaluated once per function invocation. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
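An example of the duplication in question: in the lambda below the same regexp replacement appears twice, and because {{x}} is bound per array element, row-level subexpression elimination cannot hoist it; it would have to be eliminated once per lambda invocation instead. Sketch only, using the existing DataFrame API.

{code:java}
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(Tuple1(Seq("a1", "b2")))).toDF("arr")

// regexp_replace(x, "\\d", "") appears twice inside the lambda; without
// lambda-aware subexpression elimination it runs twice per element.
val cleaned = df.select(
  transform(col("arr"), x =>
    when(length(regexp_replace(x, "\\d", "")) > 0,
      regexp_replace(x, "\\d", ""))
  ).as("cleaned")
)
{code}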
[jira] [Updated] (SPARK-37019) Add Codegen support to array higher-order-functions
[ https://issues.apache.org/jira/browse/SPARK-37019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-37019: - Description: Currently all of the higher order functions use CodegenFallback. We can improve the performance of these by adding proper codegen support, so the function as well as all children can be codegen'd, and it can participate in WholeStageCodegen. This ticket is for adding support to array-based higher-order functions. was: Currently all of the higher order functions use CodegenFallback. We can improve the performance of these by adding proper codegen support, so the function as well as all children can be codegen'd, and it can participate in WholeStageCodegen. This ticket is for adding support to ArrayTransform as the first step. Summary: Add Codegen support to array higher-order-functions (was: Add Codegen support to ArrayTransform) > Add Codegen support to array higher-order-functions > --- > > Key: SPARK-37019 > URL: https://issues.apache.org/jira/browse/SPARK-37019 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Adam Binford >Priority: Major > > Currently all of the higher order functions use CodegenFallback. We can > improve the performance of these by adding proper codegen support, so the > function as well as all children can be codegen'd, and it can participate in > WholeStageCodegen. > This ticket is for adding support to array-based higher-order functions. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-37019) Add Codegen support to ArrayTransform
Adam Binford created SPARK-37019: Summary: Add Codegen support to ArrayTransform Key: SPARK-37019 URL: https://issues.apache.org/jira/browse/SPARK-37019 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Adam Binford Currently all of the higher order functions use CodegenFallback. We can improve the performance of these by adding proper codegen support, so the function as well as all children can be codegen'd, and it can participate in WholeStageCodegen. This ticket is for adding support to ArrayTransform as the first step. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36918) unionByName shouldn't consider types when comparing structs
Adam Binford created SPARK-36918: Summary: unionByName shouldn't consider types when comparing structs Key: SPARK-36918 URL: https://issues.apache.org/jira/browse/SPARK-36918 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Adam Binford Improvement/follow-on of https://issues.apache.org/jira/browse/SPARK-35290. We use StructType.sameType to see if we need to recreate the struct, but this can report a mismatch when the structure is the same and only the types differ, leading to needlessly creating a new struct that's effectively the same as the original. This can cause significant overhead when unioning multiple deeply nested nullable structs, as each time a struct is recreated it gets wrapped in an If(IsNull()). Only comparing the field names can lead to more efficient plans. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
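The distinction is easy to see directly on StructType. Note sameType itself is internal to Spark, so plain equality is used below as a stand-in for the type-sensitive comparison:

{code:java}
import org.apache.spark.sql.types._

val a = StructType(Seq(StructField("x", IntegerType), StructField("y", StringType)))
val b = StructType(Seq(StructField("x", LongType), StructField("y", StringType)))

// A type-sensitive comparison reports a mismatch because x differs in type,
// so the union logic rebuilds the struct; a name-only comparison would not.
println(a == b)                                  // false: types differ
println(a.fieldNames.sameElements(b.fieldNames)) // true: names line up
{code}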
[jira] [Created] (SPARK-35863) Upgrade Ivy to 2.5.0
Adam Binford created SPARK-35863: Summary: Upgrade Ivy to 2.5.0 Key: SPARK-35863 URL: https://issues.apache.org/jira/browse/SPARK-35863 Project: Spark Issue Type: Improvement Components: Spark Submit Affects Versions: 3.1.2 Reporter: Adam Binford Apache Ivy 2.5.0 was released nearly two years ago. The new bug fixes and features can be found here: [https://ant.apache.org/ivy/history/latest-milestone/release-notes.html] Most notably, the adding of ivy.maven.lookup.sources and ivy.maven.lookup.javadoc configs can significantly speed up module resolution time if these are turned off, especially behind a proxy. These could arguably be turned off by default, because when submitting jobs you probably don't care about the sources or javadoc jars. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358286#comment-17358286 ] Adam Binford commented on SPARK-35564: -- Is that documented somewhere? I know Boolean expressions aren't guaranteed to short-circuit, but I think most Spark users would assume multiple when clauses would short-circuit > Support subexpression elimination for non-common branches of conditional > expressions > > > Key: SPARK-35564 > URL: https://issues.apache.org/jira/browse/SPARK-35564 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.1 >Reporter: Adam Binford >Priority: Major > > https://issues.apache.org/jira/browse/SPARK-7 added support for pulling > subexpressions out of branches of conditional expressions for expressions > present in all branches. We should be able to take this a step further and > pull out subexpressions for any branch, as long as that expression will > definitely be evaluated at least once. > Consider a common data validation example: > {code:java} > from pyspark.sql.functions import * > df = spark.createDataFrame([['word'], ['1234']]) > col = regexp_replace('_1', r'\d', '') > df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code} > We only want to keep the value if it's non-empty with numbers removed, > otherwise we want it to be null. > Because we have no otherwise value, `col` is not a candidate for > subexpression elimination (you can see two regular expression replacements in > the codegen). But whenever the length is greater than 0, we will have to > execute the regular expression replacement twice. Since we know we will > always calculate `col` at least once, it makes sense to consider that as a > subexpression since we might need it again in the branch value. So we can > update the logic from: > Create a subexpression if an expression will always be evaluated at least > twice > To: > Create a subexpression if an expression will always be evaluated at least > once AND will either always or conditionally be evaluated at least twice. > The trade off is potentially another subexpression function call (for split > subexpressions) if the second evaluation doesn't happen, but this seems like > it would be worth it for when it is evaluated the second time. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358275#comment-17358275 ] Adam Binford edited comment on SPARK-35564 at 6/6/21, 10:50 PM: No the values are fine, it's the tail conditions that cause the issue. {code:java} spark.range(2).select(when($"id" >= 0, lit(1)).when(myUdf($"id") > 0, lit(2)), when($"id" > -1, lit(1)).when(myUdf($"id") > 0, lit(2))).show(){code} Here myUdf($"id") gets pulled out as a subexpression even though it never should be evaluated. was (Author: kimahriman): No the values are fine, it's the condition that cause the issue. {code:java} spark.range(2).select(when($"id" >= 0, lit(1)).when(myUdf($"id") > 0, lit(2)), when($"id" > -1, lit(1)).when(myUdf($"id") > 0, lit(2))).show(){code} Here myUdf($"id") gets pulled out as a subexpression even though it never should be evaluated. > Support subexpression elimination for non-common branches of conditional > expressions > > > Key: SPARK-35564 > URL: https://issues.apache.org/jira/browse/SPARK-35564 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.1 >Reporter: Adam Binford >Priority: Major > > https://issues.apache.org/jira/browse/SPARK-7 added support for pulling > subexpressions out of branches of conditional expressions for expressions > present in all branches. We should be able to take this a step further and > pull out subexpressions for any branch, as long as that expression will > definitely be evaluated at least once. > Consider a common data validation example: > {code:java} > from pyspark.sql.functions import * > df = spark.createDataFrame([['word'], ['1234']]) > col = regexp_replace('_1', r'\d', '') > df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code} > We only want to keep the value if it's non-empty with numbers removed, > otherwise we want it to be null. > Because we have no otherwise value, `col` is not a candidate for > subexpression elimination (you can see two regular expression replacements in > the codegen). But whenever the length is greater than 0, we will have to > execute the regular expression replacement twice. Since we know we will > always calculate `col` at least once, it makes sense to consider that as a > subexpression since we might need it again in the branch value. So we can > update the logic from: > Create a subexpression if an expression will always be evaluated at least > twice > To: > Create a subexpression if an expression will always be evaluated at least > once AND will either always or conditionally be evaluated at least twice. > The trade off is potentially another subexpression function call (for split > subexpressions) if the second evaluation doesn't happen, but this seems like > it would be worth it for when it is evaluated the second time. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358275#comment-17358275 ] Adam Binford commented on SPARK-35564: -- No the values are fine, it's the condition that cause the issue. {code:java} spark.range(2).select(when($"id" >= 0, lit(1)).when(myUdf($"id") > 0, lit(2)), when($"id" > -1, lit(1)).when(myUdf($"id") > 0, lit(2))).show(){code} Here myUdf($"id") gets pulled out as a subexpression even though it never should be evaluated. > Support subexpression elimination for non-common branches of conditional > expressions > > > Key: SPARK-35564 > URL: https://issues.apache.org/jira/browse/SPARK-35564 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.1 >Reporter: Adam Binford >Priority: Major > > https://issues.apache.org/jira/browse/SPARK-7 added support for pulling > subexpressions out of branches of conditional expressions for expressions > present in all branches. We should be able to take this a step further and > pull out subexpressions for any branch, as long as that expression will > definitely be evaluated at least once. > Consider a common data validation example: > {code:java} > from pyspark.sql.functions import * > df = spark.createDataFrame([['word'], ['1234']]) > col = regexp_replace('_1', r'\d', '') > df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code} > We only want to keep the value if it's non-empty with numbers removed, > otherwise we want it to be null. > Because we have no otherwise value, `col` is not a candidate for > subexpression elimination (you can see two regular expression replacements in > the codegen). But whenever the length is greater than 0, we will have to > execute the regular expression replacement twice. Since we know we will > always calculate `col` at least once, it makes sense to consider that as a > subexpression since we might need it again in the branch value. So we can > update the logic from: > Create a subexpression if an expression will always be evaluated at least > twice > To: > Create a subexpression if an expression will always be evaluated at least > once AND will either always or conditionally be evaluated at least twice. > The trade off is potentially another subexpression function call (for split > subexpressions) if the second evaluation doesn't happen, but this seems like > it would be worth it for when it is evaluated the second time. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358210#comment-17358210 ] Adam Binford commented on SPARK-35564: -- You can construct a similar CaseWhen that could lead to a similar problem; the coalesce was just simpler to demonstrate > Support subexpression elimination for non-common branches of conditional > expressions > > > Key: SPARK-35564 > URL: https://issues.apache.org/jira/browse/SPARK-35564 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.1 >Reporter: Adam Binford >Priority: Major > > https://issues.apache.org/jira/browse/SPARK-7 added support for pulling > subexpressions out of branches of conditional expressions for expressions > present in all branches. We should be able to take this a step further and > pull out subexpressions for any branch, as long as that expression will > definitely be evaluated at least once. > Consider a common data validation example: > {code:java} > from pyspark.sql.functions import * > df = spark.createDataFrame([['word'], ['1234']]) > col = regexp_replace('_1', r'\d', '') > df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code} > We only want to keep the value if it's non-empty with numbers removed, > otherwise we want it to be null. > Because we have no otherwise value, `col` is not a candidate for > subexpression elimination (you can see two regular expression replacements in > the codegen). But whenever the length is greater than 0, we will have to > execute the regular expression replacement twice. Since we know we will > always calculate `col` at least once, it makes sense to consider that as a > subexpression since we might need it again in the branch value. So we can > update the logic from: > Create a subexpression if an expression will always be evaluated at least > twice > To: > Create a subexpression if an expression will always be evaluated at least > once AND will either always or conditionally be evaluated at least twice. > The trade off is potentially another subexpression function call (for split > subexpressions) if the second evaluation doesn't happen, but this seems like > it would be worth it for when it is evaluated the second time. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358206#comment-17358206 ] Adam Binford commented on SPARK-35564: -- Yes that was an example of "will run at least once and maybe more than once" that I'm proposing to add more support for in this issue. An example of current behavior that would be considered a bug is: {code:java} spark.range(2).select(coalesce($"id", myUdf($"id")), coalesce($"id" + 1, myUdf($"id"))).show() {code} myUdf will be pulled out into a subexpression even though it is never executed. > Support subexpression elimination for non-common branches of conditional > expressions > > > Key: SPARK-35564 > URL: https://issues.apache.org/jira/browse/SPARK-35564 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.1 >Reporter: Adam Binford >Priority: Major > > https://issues.apache.org/jira/browse/SPARK-7 added support for pulling > subexpressions out of branches of conditional expressions for expressions > present in all branches. We should be able to take this a step further and > pull out subexpressions for any branch, as long as that expression will > definitely be evaluated at least once. > Consider a common data validation example: > {code:java} > from pyspark.sql.functions import * > df = spark.createDataFrame([['word'], ['1234']]) > col = regexp_replace('_1', r'\d', '') > df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code} > We only want to keep the value if it's non-empty with numbers removed, > otherwise we want it to be null. > Because we have no otherwise value, `col` is not a candidate for > subexpression elimination (you can see two regular expression replacements in > the codegen). But whenever the length is greater than 0, we will have to > execute the regular expression replacement twice. Since we know we will > always calculate `col` at least once, it makes sense to consider that as a > subexpression since we might need it again in the branch value. So we can > update the logic from: > Create a subexpression if an expression will always be evaluated at least > twice > To: > Create a subexpression if an expression will always be evaluated at least > once AND will either always or conditionally be evaluated at least twice. > The trade off is potentially another subexpression function call (for split > subexpressions) if the second evaluation doesn't happen, but this seems like > it would be worth it for when it is evaluated the second time. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358117#comment-17358117 ] Adam Binford commented on SPARK-35564: -- Turns out this is already happening for certain when and coalesce expressions. For example: {code:java} spark.range(2).select(myUdf($"id"), coalesce($"id", myUdf($"id"))) {code} myUdf gets pulled out as a subexpression even though it might only be executed once per row. This can be a correctness issue for very specific edge cases, similar to https://issues.apache.org/jira/browse/SPARK-35449, where myUdf could get executed for a row even though it doesn't pass certain conditional checks. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
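To confirm the extraction, the generated code can be inspected directly; a sketch under the assumption that myUdf is a simple identity-style UDF (the comment leaves it undefined):
{code:java}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.execution.debug._
import spark.implicits._

// Hypothetical stand-in for the undefined myUdf
val myUdf = udf((id: Long) => id)

val df = spark.range(2).select(myUdf($"id"), coalesce($"id", myUdf($"id")))
// Dumps the whole-stage generated Java; a subExpr function wrapping the
// UDF call indicates it was extracted as a subexpression.
df.debugCodegen()
{code}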
[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354637#comment-17354637 ] Adam Binford commented on SPARK-35564: -- A 2x gain would be pretty significant to us; I don't know about others. I'm planning to implement this in our fork, and if I get good results I'll put up a PR for further discussion. We could optionally add a config for this if it's workload dependent. Also, the only thing it could likely do to the generated code is reduce the overall size, albeit with more function calls in the worst cases. I don't know enough about Java to say whether smaller code size adds any value. >Oh, this is another issue. I noticed it last time when I worked on another PR >recently, but don't have time to look at it yet. I created https://issues.apache.org/jira/browse/SPARK-35580 to track what I've figured out so far. Not sure what the right fix is. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
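For workloads where the trade-off goes the wrong way, there is already a coarse escape hatch; this is the existing global toggle rather than the branch-level config floated above, and whether it helps is workload dependent:
{code:java}
// Disables subexpression elimination entirely for the current session.
// A finer-grained, branch-aware config, as suggested above, would
// presumably sit alongside this key but does not exist today.
spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")
{code}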
[jira] [Created] (SPARK-35580) Support subexpression elimination for higher order functions
Adam Binford created SPARK-35580: Summary: Support subexpression elimination for higher order functions Key: SPARK-35580 URL: https://issues.apache.org/jira/browse/SPARK-35580 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.1 Reporter: Adam Binford Currently higher order functions are not candidates for subexpression elimination. This is because all higher order functions have different semantic hashes, due to "exprId" and "value" in "NamedLambdaVariable". These are always unique, so the semanticHash of a NamedLambdaVariable is always unique. Also, [https://github.com/apache/spark/pull/32424] might throw a wrench in things some too; depending on how you define your expressions, the name could be different. {code:java} scala> var d = transform($"a", x => x + 1) d: org.apache.spark.sql.Column = transform(a, lambdafunction((x_2 + 1), x_2)) scala> var e = transform($"a", x => x + 1) e: org.apache.spark.sql.Column = transform(a, lambdafunction((x_3 + 1), x_3)) scala> struct(d.alias("1"), d.alias("2")).expr res9: org.apache.spark.sql.catalyst.expressions.Expression = struct(NamePlaceholder, transform('a, lambdafunction((lambda 'x_2 + 1), lambda 'x_2, false)) AS 1#4, NamePlaceholder, transform('a, lambdafunction((lambda 'x_2 + 1), lambda 'x_2, false)) AS 2#5) scala> struct(d.alias("1"), e.alias("2")).expr res10: org.apache.spark.sql.catalyst.expressions.Expression = struct(NamePlaceholder, transform('a, lambdafunction((lambda 'x_2 + 1), lambda 'x_2, false)) AS 1#6, NamePlaceholder, transform('a, lambdafunction((lambda 'x_3 + 1), lambda 'x_3, false)) AS 2#7) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
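The exprId behavior described above can be checked directly; a small sketch (the `false` result is the expected outcome given fresh NamedLambdaVariable exprIds per lambda, not an output copied from the ticket):
{code:java}
import org.apache.spark.sql.functions._
import spark.implicits._

val d = transform($"a", x => x + 1)
val e = transform($"a", x => x + 1)
// Semantically identical lambdas, but each transform allocates fresh
// NamedLambdaVariable exprIds, so the two expressions never compare as
// equal and are never deduplicated by subexpression elimination.
println(d.expr.semanticEquals(e.expr)) // expected: false
{code}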
[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
[ https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354447#comment-17354447 ] Adam Binford commented on SPARK-35564: -- >Do you mean "Create a subexpression if an expression will always be evaluated >at least once AND will be evaluated at least once in conditional expression"? Yeah, you can think of it that way in terms of adding to existing functionality. I was trying to word it in a way that encompassed the existing functionality as well. >And this looks like a corner case, so I'm not sure if it is worth to do this. I don't really think this is much of a corner case, but a common case of using a when expression for data validation. Most of our ETL process comes down to normalizing, cleaning, and validating strings, which at the end of the day usually looks like: {code:java} column = normalize_value(col('my_raw_value')) result = when(column != '', column){code} where "normalize_value" usually involves some combination of regexp_replace's, lower/upper, and trim. And things get worse when you are dealing with arrays of strings and want to minimize your data: {code:java} column = filter(transform(col('my_raw_array_value'), lambda x: normalize_value(x)), lambda x: x != '') result = when(size(column) > 0, column){code} though currently higher order functions are always semantically different, so they don't get subexpressions regardless, I think. That's something I plan to look into as a follow-up. It's natural for users to think that these expressions only get evaluated once, not that they are doubling their runtime trying to clean their data. To me, the edge case is a subexpression here actually decreasing throughput: it would require a very large percentage of the rows to not pass the conditional check, since the additional calculation is much more expensive than the additional function call. I'm playing around with an implementation, so we'll see how far I can get with it. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
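A concrete Scala rendering of the validation pattern above; normalize_value is the author's placeholder, so the regex and cleaning steps here are assumptions for illustration only:
{code:java}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Hypothetical normalizer standing in for the real cleaning logic
def normalizeValue(c: Column): Column =
  trim(lower(regexp_replace(c, "[^a-zA-Z ]", "")))

val column = normalizeValue(col("my_raw_value"))
// Without the proposed change, normalizeValue's expression tree is
// evaluated twice whenever the condition passes: once in the predicate
// and once as the branch value.
val result = when(column =!= "", column)
{code}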
[jira] [Created] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions
Adam Binford created SPARK-35564: Summary: Support subexpression elimination for non-common branches of conditional expressions Key: SPARK-35564 URL: https://issues.apache.org/jira/browse/SPARK-35564 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.1 Reporter: Adam Binford https://issues.apache.org/jira/browse/SPARK-7 added support for pulling subexpressions out of branches of conditional expressions for expressions present in all branches. We should be able to take this a step further and pull out subexpressions for any branch, as long as that expression will definitely be evaluated at least once. Consider a common data validation example: {code:java} from pyspark.sql.functions import * df = spark.createDataFrame([['word'], ['1234']]) col = regexp_replace('_1', r'\d', '') df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code} We only want to keep the value if it's non-empty with numbers removed, otherwise we want it to be null. Because we have no otherwise value, `col` is not a candidate for subexpression elimination (you can see two regular expression replacements in the codegen). But whenever the length is greater than 0, we will have to execute the regular expression replacement twice. Since we know we will always calculate `col` at least once, it makes sense to consider that as a subexpression since we might need it again in the branch value. So we can update the logic from: Create a subexpression if an expression will always be evaluated at least twice To: Create a subexpression if an expression will always be evaluated at least once AND will either always or conditionally be evaluated at least twice. The trade off is potentially another subexpression function call (for split subexpressions) if the second evaluation doesn't happen, but this seems like it would be worth it for when it is evaluated the second time. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
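To make the proposed rule change concrete, a hypothetical predicate sketch; these names are illustrative only, and the real logic lives in EquivalentExpressions and is structured differently:
{code:java}
// Evaluation counts for one candidate expression within a projection
case class EvalCounts(guaranteed: Int, conditional: Int)

// Old rule: extract only when at least two evaluations are guaranteed
def isCandidateOld(c: EvalCounts): Boolean = c.guaranteed >= 2

// Proposed rule: one guaranteed evaluation plus at least one more
// guaranteed-or-conditional evaluation is enough
def isCandidateNew(c: EvalCounts): Boolean =
  c.guaranteed >= 1 && c.guaranteed + c.conditional >= 2
{code}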
[jira] [Updated] (SPARK-35449) Should not extract common expressions from value expressions when elseValue is empty in CaseWhen
[ https://issues.apache.org/jira/browse/SPARK-35449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-35449: - Description: [https://github.com/apache/spark/pull/30245] added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue. This can lead to a subexpression being created and run for branches of a conditional that don't pass. This can cause issues, especially with a UDF in a branch that gets executed assuming the condition is true. For example: {code:java} val col = when($"id" < 0, myUdf($"id")) spark.range(1).select(when(col > 0, col)).show() {code} myUdf($"id") gets extracted as a subexpression and executed even though both conditions don't pass and it should never be executed. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
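A runnable sketch that makes the premature evaluation observable; the guard-checking myUdf is an assumption (the ticket leaves myUdf undefined), chosen so the bug surfaces as an exception rather than silently:
{code:java}
import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical UDF that is only safe to call when its guard holds
val myUdf = udf((id: Long) => {
  require(id < 0, s"guard violated for id=$id")
  -id
})

val col = when($"id" < 0, myUdf($"id"))
// For id = 0 both conditions are false, so myUdf should never run. On an
// affected build the extracted subexpression still evaluates it, and this
// query can fail with the require() error; on a fixed build it just shows
// a null row.
spark.range(1).select(when(col > 0, col)).show()
{code}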
[jira] [Comment Edited] (SPARK-35448) Subexpression elimination enhancements
[ https://issues.apache.org/jira/browse/SPARK-35448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347976#comment-17347976 ] Adam Binford edited comment on SPARK-35448 at 5/19/21, 11:21 PM: - Just spitballing after looking through some of the subexpression elimination code. Does it make any sense to treat the output of EquivalentExpressions as "candidates" for subexpressions, and then it's the codegen's job to figure out which subexpressions are actually needed and used and include the code for them? was (Author: kimahriman): Just spitballing after looking through some of the subexpression elimination code. Does it make more sense to treat the output of EquivalentExpressions as "candidates" for subexpressions, and then it's the codegen's job to figure out which subexpressions are actually needed and used and include the code for them? > Subexpression elimination enhancements > -- > > Key: SPARK-35448 > URL: https://issues.apache.org/jira/browse/SPARK-35448 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35448) Subexpression elimination enhancements
[ https://issues.apache.org/jira/browse/SPARK-35448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347976#comment-17347976 ] Adam Binford commented on SPARK-35448: -- Just spitballing after looking through some of the subexpression elimination code. Does it make more sense to treat the output of EquivalentExpressions as "candidates" for subexpressions, and then it's the codegen's job to figure out which subexpressions are actually needed and used and include the code for them? > Subexpression elimination enhancements > -- > > Key: SPARK-35448 > URL: https://issues.apache.org/jira/browse/SPARK-35448 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35256) Subexpression elimination leading to a performance regression
[ https://issues.apache.org/jira/browse/SPARK-35256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347974#comment-17347974 ] Adam Binford commented on SPARK-35256: -- See https://issues.apache.org/jira/browse/SPARK-35448 for a few related subexpression elimination issues being worked on. Specifically, https://issues.apache.org/jira/browse/SPARK-35410 is probably the main issue you're facing, where `when` expressions can cause a lot of extra, unused subexpressions to be included in the codegen. > Subexpression elimination leading to a performance regression > - > > Key: SPARK-35256 > URL: https://issues.apache.org/jira/browse/SPARK-35256 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Ondrej Kokes >Priority: Minor > Attachments: bisect_log.txt, bisect_timing.csv > > > I'm seeing almost double the runtime between 3.0.1 and 3.1.1 in my pipeline > that does mostly str_to_map, split and a few other operations - all > projections, no joins or aggregations (it's here only to trigger the > pipeline). I cut it down to the simplest reproducible example I could - > anything I remove from this changes the runtime difference quite > dramatically. (even moving those two expressions from f.when to standalone > columns makes the difference disappear) > {code:java} > import time > import os > import pyspark > from pyspark.sql import SparkSession > import pyspark.sql.functions as f > if __name__ == '__main__': > print(pyspark.__version__) > spark = SparkSession.builder.getOrCreate() > filename = 'regression.csv' > if not os.path.isfile(filename): > with open(filename, 'wt') as fw: > fw.write('foo\n') > for _ in range(10_000_000): > fw.write('foo=bar=bak=f,o,1:2:3\n') > df = spark.read.option('header', True).csv(filename) > t = time.time() > dd = (df > .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")')) > .withColumn('extracted', > # without this top level split it is only 50% > slower, with it > # the runtime almost doubles > f.split(f.split(f.col("my_map")["bar"], ",")[2], > ":")[0] >) > .select( > f.when( > f.col("extracted").startswith("foo"), f.col("extracted") > ).otherwise( > f.concat(f.lit("foo"), f.col("extracted")) > ).alias("foo") > ) > ) > # dd.explain(True) > _ = dd.groupby("foo").count().count() > print("elapsed", time.time() - t) > {code} > Running this in 3.0.1 and 3.1.1 respectively (both installed from PyPI, on my > local macOS) > {code:java} > 3.0.1 > elapsed 21.262351036071777 > 3.1.1 > elapsed 40.26582884788513 > {code} > (Meaning the transformation took 21 seconds in 3.0.1 and 40 seconds in 3.1.1) > Feel free to make the CSV smaller to get a quicker feedback loop - it scales > linearly (I developed this with 2M rows). > It might be related to my previous issue - SPARK-32989 - there are similar > operations, nesting etc. (splitting on the original column, not on a map, > makes the difference disappear) > I tried dissecting the queries in SparkUI and via explain, but both 3.0.1 and > 3.1.1 produced identical plans. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35410) Unused subexpressions leftover in WholeStageCodegen subexpression elimination
[ https://issues.apache.org/jira/browse/SPARK-35410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345044#comment-17345044 ] Adam Binford commented on SPARK-35410: -- [~viirya], thanks for adding all these great new features! Sorry I keep finding weird corner cases with them, but it's because we're heavily making use of them. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35410) Unused subexpressions leftover in WholeStageCodegen subexpression elimination
[ https://issues.apache.org/jira/browse/SPARK-35410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-35410: - Attachment: codegen.txt -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35410) Unused subexpressions leftover in WholeStageCodegen subexpression elimination
Adam Binford created SPARK-35410: Summary: Unused subexpressions leftover in WholeStageCodegen subexpression elimination Key: SPARK-35410 URL: https://issues.apache.org/jira/browse/SPARK-35410 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.1 Reporter: Adam Binford Trying to understand and debug the performance of some of our jobs, I started digging into what the whole stage codegen code was doing. We use a lot of case when statements, and I found that there were a lot of unused subexpressions left over after the subexpression elimination, and it gets worse the more expressions you have chained. The simple example: {code:java} import org.apache.spark.sql.functions._ import spark.implicits._ val myUdf = udf((s: String) => { println("In UDF") s.toUpperCase }) spark.range(5).select(when(length(myUdf($"id")) > 0, length(myUdf($"id")))).show() {code} Running the code, you'll see "In UDF" printed out 10 times. And if you change both to log(length(myUdf($"id"))), "In UDF" will print out 20 times (one more for a cast from int to double and one more for the actual log calculation, I think). In the codegen for this (without the log), there are these initial subexpressions: {code:java} /* 076 */ UTF8String project_subExprValue_0 = project_subExpr_0(project_expr_0_0); /* 077 */ int project_subExprValue_1 = project_subExpr_1(project_expr_0_0); /* 078 */ UTF8String project_subExprValue_2 = project_subExpr_2(project_expr_0_0); {code} project_subExprValue_0 and project_subExprValue_2 are never actually used, so it's properly resolving the two expressions and sharing the result of project_subExprValue_1, but it doesn't seem to remove the other subexpression calls. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35309) array transform should respect alias in expression
[ https://issues.apache.org/jira/browse/SPARK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338966#comment-17338966 ] Adam Binford commented on SPARK-35309: -- You want "x" to be "struct_alias"? That should be {code:java} struct(col.alias("struct_alias")){code} > array transform should respect alias in expression > -- > > Key: SPARK-35309 > URL: https://issues.apache.org/jira/browse/SPARK-35309 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0 >Reporter: Derk Crezee >Priority: Major > > When an array is transformed using the transform function in combination with > an alias, I get an unexpected column name. > {code:scala} > // code placeholder > val data = Seq((Array(1, 2, 3))) > val df = sc.parallelize(data).toDF("a") > val dfRes = df.select( > transform( > $"a", > (col: Column) => struct(col).alias("struct_alias") > ).alias("a") > ) > dfRes.printSchema > // root > // |-- a: array (nullable = true) > // ||-- element: struct (containsNull = false) > // |||-- x: integer (nullable = false) > {code} > I expected the inner element to have the name 'struct_alias'. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
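Expanding the answer above into a full sketch against the original reproduction: aliasing the column inside the struct names the element field. The schema in the comment is what I would expect, not output copied from the ticket:
{code:java}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(Seq(1, 2, 3)).toDF("a")
val dfRes = df.select(
  // Alias the struct field itself rather than the struct
  transform($"a", (c: Column) => struct(c.alias("struct_alias"))).alias("a")
)
dfRes.printSchema()
// root
//  |-- a: array (nullable = true)
//  |    |-- element: struct (containsNull = false)
//  |    |    |-- struct_alias: integer (nullable = false)
{code}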
[jira] [Commented] (SPARK-35290) unionByName with null filling fails for some nested structs
[ https://issues.apache.org/jira/browse/SPARK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338921#comment-17338921 ] Adam Binford commented on SPARK-35290: -- Running into some issues trying to get case insensitivity working correctly. I was hoping to be able to leave everything on both sides of the union in its existing casing, and just get things in the right order, but I'm running into this issue: {code:java} >>> from pyspark.sql.functions import * >>> df1 = spark.range(1).withColumn('top', struct(lit('A').alias('A'))) >>> df2 = spark.range(1).withColumn('top', struct(lit('a').alias('a'))) >>> spark.conf.set('spark.sql.caseSensitive', 'true') ... pyspark.sql.utils.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:string> <> struct<a:string> at the second column of the second table; >>> spark.conf.set('spark.sql.caseSensitive', 'false') >>> df1.union(df2) DataFrame[id: bigint, top: struct<A:string>] >>> df1.unionByName(df2) DataFrame[id: bigint, top: struct<A:string>] >>> df1.unionByName(df2, True) DataFrame[id: bigint, top: struct<A:string,a:string>] {code} With case sensitivity enabled, it errors out as expected because the two structs are different types. However, when case sensitivity is disabled, the union is happy because it sees them as the same type, but when the schemas are merged, it treats them as two separate fields. I assume it's related to the StructType.merge method, but I don't exactly know where that gets called in the context of a Union. I don't see anything in that merge function that handles case insensitivity. Is that a bug in itself or a feature? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35290) unionByName with null filling fails for some nested structs
[ https://issues.apache.org/jira/browse/SPARK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338526#comment-17338526 ] Adam Binford commented on SPARK-35290: -- I've also been playing around with rewriting some of the logic to just directly recursively create a named struct, taking out the need for the UpdateField/WithField logic. I've gotten all existing tests to pass (including a new one for this case), without the 12-15 second overhead mentioned in the description for the deeply nested case, but I think there's still some case insensitivity handling I might need to take care of. I can put up a PR soon with what that looks like. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35290) unionByName with null filling fails for some nested structs
[ https://issues.apache.org/jira/browse/SPARK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-35290: - Description: We've encountered a few weird edge cases that seem to fail the new null filling unionByName (which has been a great addition!). It seems to stem from the fields being sorted by name and corrupted along the way. The simple reproduction is: {code:java} df = spark.createDataFrame([[]]) df1 = (df .withColumn('top', F.struct( F.struct( F.lit('ba').alias('ba') ).alias('b') )) ) df2 = (df .withColumn('top', F.struct( F.struct( F.lit('aa').alias('aa') ).alias('a'), F.struct( F.lit('bb').alias('bb') ).alias('b'), )) ) df1.unionByName(df2, True).printSchema() {code} This results in the exception: {code:java} pyspark.sql.utils.AnalysisException: Union can only be performed on tables with the compatible column types. struct<a:struct<aa:string>,b:struct<ba:string,bb:string>> <> struct<a:struct<aa:string>,b:struct<aa:string,bb:string>> at the first column of the second table; {code} You can see in the second schema that it has {code:java} b:struct<aa:string,bb:string> {code} when it should be {code:java} b:struct<ba:string,bb:string> {code} It seems to happen somewhere during [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala#L73,] as everything seems correct up to that point from my testing. It's either modifying one expression during the transformUp that then corrupts other expressions that are then modified, or the ExtractValue before the addFieldsInto is remembering the ordinal position in the struct that is then changing and causing issues. I found that simply using sortStructFields instead of sortStructFieldsInWithFields gets things working correctly, but it definitely has a performance impact. The deep expr unionByName test takes ~1-2 seconds normally but ~12-15 seconds with this change. I assume that's because the original method tried to rewrite existing expressions, whereas sortStructFields just adds expressions on top of existing ones to project the new order. I'm not sure if it makes sense to take the slower but works-in-the-edge-cases method (assuming it doesn't break other cases; all existing tests pass), or if there's a way to fix the existing method for cases like this.
[jira] [Created] (SPARK-35290) unionByName with null filling fails for some nested structs
Adam Binford created SPARK-35290: Summary: unionByName with null filling fails for some nested structs Key: SPARK-35290 URL: https://issues.apache.org/jira/browse/SPARK-35290 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.1 Reporter: Adam Binford We've encountered a few weird edge cases that seem to fail the new null filling unionByName (which has been a great addition!). It seems to stem from the fields being sorted by name and corrupted along the way. The simple reproduction is: {code:java} df = spark.createDataFrame([[]]) df1 = (df .withColumn('top', F.struct( F.struct( F.lit('ba').alias('ba') ).alias('b') )) ) df2 = (df .withColumn('top', F.struct( F.struct( F.lit('aa').alias('aa') ).alias('a'), F.struct( F.lit('bb').alias('bb') ).alias('b'), )) ) df1.unionByName(df2, True).printSchema() {code} This results in the exception: {code:java} pyspark.sql.utils.AnalysisException: Union can only be performed on tables with the compatible column types. struct<a:struct<aa:string>,b:struct<ba:string,bb:string>> <> struct<a:struct<aa:string>,b:struct<aa:string,bb:string>> at the first column of the second table; {code} You can see in the second schema that it has {code:java} b:struct<aa:string,bb:string> {code} when it should be {code:java} b:struct<ba:string,bb:string> {code} It seems to happen somewhere during [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala#L73,] as everything seems correct up to that point from my testing. It's either modifying one expression during the transformUp that then corrupts other expressions that are then modified, or the ExtractValue before the addFieldsInto is remembering the ordinal position in the struct that is then changing and causing issues. I found that simply using sortStructFields instead of sortStructFieldsInWithFields gets things working correctly, but it definitely has a performance impact. The deep expr unionByName test takes ~1-2 seconds normally but ~12-15 seconds with this change. I assume that's because the original method tried to rewrite existing expressions, whereas sortStructFields just adds expressions on top of existing ones to project the new order. I'm not sure if it makes sense to take the slower but works-in-the-edge-cases method (assuming it doesn't break other cases; all existing tests pass), or if there's a way to fix the existing method for cases like this. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35213) Corrupt DataFrame for certain withField patterns
Adam Binford created SPARK-35213: Summary: Corrupt DataFrame for certain withField patterns Key: SPARK-35213 URL: https://issues.apache.org/jira/browse/SPARK-35213 Project: Spark Issue Type: Bug Components: Optimizer, SQL Affects Versions: 3.1.1 Reporter: Adam Binford We encountered a very weird bug heavily using withField in production with Spark 3.1.1. Jobs were dying with a lot of very weird JVM crashing errors (like jshort_disjoint_arraycopy during a copyMemory call), and occasional NegativeArraySizeException errors. We finally found a workaround by ordering our withField calls in a certain way, and I was finally able to create some minimal examples to reproduce similar weird/broken behavior. It seems to stem from the optimizations added in [https://github.com/apache/spark/pull/29812.] Because the same new optimization was also added as an analyzer rule, there seem to be two different ways this issue can crop up: once at analysis time and once at runtime. While these examples might seem odd, they represent how we've created a helper function that can create columns in arbitrary nested fields even if the intermediate fields don't exist yet. Example of what I assume is an issue during analysis: {code:java} import pyspark.sql.functions as F df = spark.range(1).withColumn('data', F.struct() .withField('a', F.struct()) .withField('b', F.struct()) .withField('a.aa', F.lit('aa')) .withField('b.ba', F.lit('ba')) .withField('a.ab', F.lit('ab')) .withField('b.bb', F.lit('bb')) .withField('a.ac', F.lit('ac')) ) df.printSchema(){code} Output schema: {code:java} root |-- id: long (nullable = false) |-- data: struct (nullable = false) | |-- b: struct (nullable = false) | | |-- aa: string (nullable = false) | | |-- ab: string (nullable = false) | | |-- bb: string (nullable = false) | |-- a: struct (nullable = false) | | |-- aa: string (nullable = false) | | |-- ab: string (nullable = false) | | |-- ac: string (nullable = false){code} And an example of a runtime data issue: {code:java} df = (spark.range(1) .withColumn('data', F.struct() .withField('a', F.struct().withField('aa', F.lit('aa'))) .withField('b', F.struct().withField('ba', F.lit('ba'))) ) .withColumn('data', F.col('data').withField('b.bb', F.lit('bb'))) .withColumn('data', F.col('data').withField('a.ab', F.lit('ab'))) ) df.printSchema() df.groupBy('data.a.aa', 'data.a.ab', 'data.b.ba', 'data.b.bb').count().show() {code} Output: {code:java} root |-- id: long (nullable = false) |-- data: struct (nullable = false) | |-- a: struct (nullable = false) | | |-- aa: string (nullable = false) | | |-- ab: string (nullable = false) | |-- b: struct (nullable = false) | | |-- ba: string (nullable = false) | | |-- bb: string (nullable = false) +---+---+---+---+-+ | aa| ab| ba| bb|count| +---+---+---+---+-+ | ba| bb| aa| ab|1| +---+---+---+---+-+ {code} The columns have the wrong data in them, even though the schema is correct. 
Additionally, if you add another column you get an exception: {code:java} df = (spark.range(1) .withColumn('data', F.struct() .withField('a', F.struct().withField('aa', F.lit('aa'))) .withField('b', F.struct().withField('ba', F.lit('ba'))) ) .withColumn('data', F.col('data').withField('a.ab', F.lit('ab'))) .withColumn('data', F.col('data').withField('b.bb', F.lit('bb'))) .withColumn('data', F.col('data').withField('a.ac', F.lit('ac'))) ) df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show() java.lang.ArrayIndexOutOfBoundsException: 2 at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) {code} But if you reorder the withField expressions, you get correct behavior: {code:java} df = (spark.range(1) .withColumn('data', F.struct() .withField('a', F.struct().withField('aa', F.lit('aa'))) .withField('b', F.struct().withField('ba', F.lit('ba'))) ) .withColumn('data', F.col('data').withField('a.ab', F.lit('ab'))) .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
[jira] [Created] (SPARK-35117) UI progress bar no longer highlights in progress tasks
Adam Binford created SPARK-35117: Summary: UI progress bar no longer highlights in progress tasks Key: SPARK-35117 URL: https://issues.apache.org/jira/browse/SPARK-35117 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 3.1.1 Reporter: Adam Binford The Spark UI was updated to Bootstrap 4, and during the update the progress bar in the UI was updated to highlight the whole bar once any tasks were in progress, versus highlighting just the number of tasks that were in progress. That was a great visual cue for seeing what percentage of the stage/job was currently being worked on, and it'd be great to get that functionality back. The change can be found here: https://github.com/apache/spark/pull/27370/files#diff-809c93c57cc59e5fe3c3eb54a24aa96a38147d02323f3e690ae6b5309a3284d2L448 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33133) History server fails when loading invalid rolling event logs
[ https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217656#comment-17217656 ] Adam Binford commented on SPARK-33133: -- Now that the error on load is fixed, the next question is why invalid rolling log directories get created in the first place. I've seen two main types of occurrences of this: * The event log folder is just completely empty. It seems to be some way that an application fails to start up and creates the directory but never makes it to the point of creating the actual file. Not sure the exact circumstances to recreate. * The event log folder for a long-running active job gets cleaned up, and the next time the event log is rolled, it recreates the folder automatically (because HDFS creates intermediate directories if they don't exist), with the wrong directory permissions, and without an appstatus file. I'm going to focus on the second case here. I'm not sure if, with the way the cleanup logic is done, something about the rolling file causes it to not think there are updates. Part of the check involves "fileSizeForLastIndex", so I'm not sure if a new rolled log file causes issues with this if the log size doesn't strictly increase? I can't fully follow the logic in "shouldReloadLog". Regardless of how it happens, one potential fix is, instead of only creating the directory with the correct permissions and the appstatus file in the "start" method, to make sure both exist every time "rollEventLogFile" is called.
[jira] [Commented] (SPARK-33133) History server fails when loading invalid rolling event logs
[ https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213832#comment-17213832 ] Adam Binford commented on SPARK-33133: -- I created [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-33146] to track the failure-to-load issue. I'm not sure if we want to create a separate issue for the logs becoming invalid in the first place or just make this issue pertain to that.
> History server fails when loading invalid rolling event logs
>
> Key: SPARK-33133
> URL: https://issues.apache.org/jira/browse/SPARK-33133
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.1
> Reporter: Adam Binford
> Priority: Major
>
> We have run into an issue where our history server fails to load new applications and, when restarted, fails to load any applications at all. This happens when it encounters invalid rolling event log files, which we see with long-running streaming applications. There seem to be two issues here that lead to problems:
> * It looks like our long-running streaming applications' event log directory is being cleaned up. The next time the application logs event data, it recreates the event log directory but without recreating the "appstatus" file. I don't know the full extent of this behavior or whether something "wrong" is happening here.
> * The history server then reads this new folder and throws an exception because the "appstatus" file doesn't exist in the rolling event log folder. This exception breaks the entire listing process, so no new applications will be read, and after a restart no applications at all will be read.
> There seem to be a couple of ways to fix this, and I'm curious to hear thoughts from anyone who knows more about how the history server works, specifically with rolling event logs:
> * Don't completely fail the check for new applications when one bad rolling event log folder is encountered. This seems like the simplest fix and makes sense to me; the check already ignores a few other kinds of errors. It doesn't necessarily fix the underlying issue that leads to this happening, though.
> * Figure out why the in-progress event log folder is being deleted and make sure that doesn't happen. Maybe this is supposed to happen? Or maybe we should only delete event log files within the folders rather than the top-level folder? Again, I don't know the exact current behavior here.
> * When writing new event log data, make sure the folder and appstatus file exist every time, recreating them if not.
> Here's the stack trace we encounter when this happens, from 3.0.1 with a couple of extra MRs backported that I hoped would fix the issue:
> {{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in checking for event log updates
> java.lang.IllegalArgumentException: requirement failed: Log directory must contain an appstatus file!
> at scala.Predef$.require(Predef.scala:281)
> at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
> at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
> at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
> at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
> at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272)
> at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240)
> at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524)
> at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466)
> at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
> at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
> at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
> at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
> at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
[jira] [Created] (SPARK-33146) Encountering an invalid rolling event log folder prevents loading other applications in SHS
Adam Binford created SPARK-33146: Summary: Encountering an invalid rolling event log folder prevents loading other applications in SHS Key: SPARK-33146 URL: https://issues.apache.org/jira/browse/SPARK-33146 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.1 Reporter: Adam Binford A follow-on issue from https://issues.apache.org/jira/browse/SPARK-33133. If an invalid rolling event log folder is encountered by the Spark History Server upon startup, it crashes the whole loading process and prevents any valid applications from loading. We should simply catch the error, log it, and continue loading other applications. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33133) History server fails when loading invalid rolling event logs
[ https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Binford updated SPARK-33133: - Description: We have run into an issue where our history server fails to load new applications and, when restarted, fails to load any applications at all. This happens when it encounters invalid rolling event log files, which we see with long-running streaming applications. There seem to be two issues here that lead to problems:
* It looks like our long-running streaming applications' event log directory is being cleaned up. The next time the application logs event data, it recreates the event log directory but without recreating the "appstatus" file. I don't know the full extent of this behavior or whether something "wrong" is happening here.
* The history server then reads this new folder and throws an exception because the "appstatus" file doesn't exist in the rolling event log folder. This exception breaks the entire listing process, so no new applications will be read, and after a restart no applications at all will be read.
There seem to be a couple of ways to fix this, and I'm curious to hear thoughts from anyone who knows more about how the history server works, specifically with rolling event logs:
* Don't completely fail the check for new applications when one bad rolling event log folder is encountered. This seems like the simplest fix and makes sense to me; the check already ignores a few other kinds of errors. It doesn't necessarily fix the underlying issue that leads to this happening, though.
* Figure out why the in-progress event log folder is being deleted and make sure that doesn't happen. Maybe this is supposed to happen? Or maybe we should only delete event log files within the folders rather than the top-level folder? Again, I don't know the exact current behavior here.
* When writing new event log data, make sure the folder and appstatus file exist every time, recreating them if not.
Here's the stack trace we encounter when this happens, from 3.0.1 with a couple of extra MRs backported that I hoped would fix the issue:
{{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in checking for event log updates
java.lang.IllegalArgumentException: requirement failed: Log directory must contain an appstatus file!
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466)
at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.deploy.history.FsHistoryProvider.checkForLogs(FsHistoryProvider.scala:466)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$startPolling$3(FsHistoryProvider.scala:287)
at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1302)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$getRunner$1(FsHistoryProvider.scala:210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[jira] [Created] (SPARK-33133) History server fails when loading invalid rolling event logs
Adam Binford created SPARK-33133: Summary: History server fails when loading invalid rolling event logs Key: SPARK-33133 URL: https://issues.apache.org/jira/browse/SPARK-33133 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.1 Reporter: Adam Binford We have run into an issue where our history server fails to load new applications and, when restarted, fails to load any applications at all. This happens when it encounters invalid rolling event log files, which we see with long-running streaming applications. There seem to be two issues here that lead to problems:
* It looks like our long-running streaming applications' event log directory is being cleaned up. The next time the application logs event data, it recreates the event log directory but without recreating the "appstatus" file. I don't know the full extent of this behavior or whether something "wrong" is happening here.
* The history server then reads this new folder and throws an exception because the "appstatus" file doesn't exist in the rolling event log folder. This exception breaks the entire listing process, so no new applications will be read, and after a restart no applications at all will be read.
There seem to be a couple of ways to fix this, and I'm curious to hear thoughts from anyone who knows more about how the history server works, specifically with rolling event logs:
* Don't completely fail the check for new applications when one bad rolling event log folder is encountered. This seems like the simplest fix and makes sense to me; the check already ignores a few other kinds of errors.
* Figure out why the in-progress event log folder is being deleted and make sure that doesn't happen. Maybe this is supposed to happen? Or maybe we should only delete event log files within the folders rather than the top-level folder? Again, I don't know the exact current behavior here.
* When writing new event log data, make sure the folder and appstatus file exist every time, recreating them if not.
Here's the stack trace we encounter when this happens, from 3.0.1 with a couple of extra MRs backported that I hoped would fix the issue:
{{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in checking for event log updates
java.lang.IllegalArgumentException: requirement failed: Log directory must contain an appstatus file!
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272)
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466)
at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.deploy.history.FsHistoryProvider.checkForLogs(FsHistoryProvider.scala:466)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$startPolling$3(FsHistoryProvider.scala:287)
at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1302)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$getRunner$1(FsHistoryProvider.scala:210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[jira] [Commented] (SPARK-32835) Add withField to PySpark Column class
[ https://issues.apache.org/jira/browse/SPARK-32835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193216#comment-17193216 ] Adam Binford commented on SPARK-32835: -- I think I already have a working version of this; I'll try to put together a PR soon.
> Add withField to PySpark Column class
> -
>
> Key: SPARK-32835
> URL: https://issues.apache.org/jira/browse/SPARK-32835
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 3.1.0
> Reporter: Adam Binford
> Priority: Major
>
> The withField functionality was added to the Scala API; we simply need a Python wrapper to call it.
> We'll need to do the same for dropField once it gets added back.
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32835) Add withField to PySpark Column class
Adam Binford created SPARK-32835: Summary: Add withField to PySpark Column class Key: SPARK-32835 URL: https://issues.apache.org/jira/browse/SPARK-32835 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 3.1.0 Reporter: Adam Binford The withField functionality was added to the Scala API; we simply need a Python wrapper to call it. We'll need to do the same for dropField once it gets added back. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
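As a rough illustration of the shape such a wrapper could take, here is a minimal sketch assuming the py4j delegation pattern used by other Column methods; the type checks and the monkey-patch at the end are illustrative, not the actual implementation:
{code:python}
from pyspark.sql.column import Column


def withField(self, fieldName, col):
    """Sketch: return a new Column with a field added to or replaced in
    a StructType column, delegating to the JVM-side Column.withField."""
    if not isinstance(fieldName, str):
        raise TypeError("fieldName should be a string")
    if not isinstance(col, Column):
        raise TypeError("col should be a Column")
    # _jc is the py4j handle to the underlying Scala Column
    return Column(self._jc.withField(fieldName, col._jc))


# Illustrative only: attach the wrapper so it can be called like a method
Column.withField = withField
{code}
With something like that in place, df.withColumn('struct', df['struct'].withField('b', lit(3))) would pass straight through to the Scala implementation.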
[jira] [Created] (SPARK-32589) NoSuchElementException: None.get for needsUnsafeRowConversion
Adam Binford created SPARK-32589: Summary: NoSuchElementException: None.get for needsUnsafeRowConversion Key: SPARK-32589 URL: https://issues.apache.org/jira/browse/SPARK-32589 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Adam Binford I have run into an error, somewhat non-deterministically, where a query fails with {{NoSuchElementException: None.get}}. This happens at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L182], where getActiveSession is apparently returning None. I only use PySpark, and I think this is a threading issue, since the active session comes from an InheritableThreadLocal. I encounter this both when I manually use threading to run multiple jobs at the same time and occasionally when I have multiple streams active at the same time. I tried using the PYSPARK_PIN_THREAD flag, but it didn't seem to help. For the former case I hacked around it in my manual threading code by doing {{spark._jvm.SparkSession.setActiveSession(spark._jvm.SparkSession.builder().getOrCreate())}} at the start of each new thread, though even this doesn't always work reliably. I see this was mentioned in this [issue|https://issues.apache.org/jira/browse/SPARK-21418?focusedCommentId=16174642=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16174642]. I'm not sure if the problem/solution has something to do with Python threads, or with adding a default value or some other way of updating this function. One other note: I started encountering this when using Delta Lake OSS, which reads parquet files as part of the transaction log, and that is when this error always happens. It doesn't seem like that library is doing anything specific or incorrect that would cause this issue, though. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
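To make the threading scenario concrete, here is a minimal sketch of the workaround described in the ticket above; the jobs and thread structure are illustrative, and as noted this pokes at pyspark internals and doesn't always work reliably:
{code:python}
import threading

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def run_job(n):
    # Workaround sketch: re-set the JVM-side active session at the start
    # of each new Python thread, since getActiveSession can come back as
    # None on threads other than the one the session was created on.
    spark._jvm.SparkSession.setActiveSession(
        spark._jvm.SparkSession.builder().getOrCreate())
    spark.range(n).count()


threads = [threading.Thread(target=run_job, args=(n,)) for n in (10, 20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
{code}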
[jira] [Commented] (SPARK-31836) input_file_name() gives wrong value following Python UDF usage
[ https://issues.apache.org/jira/browse/SPARK-31836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17120375#comment-17120375 ] Adam Binford commented on SPARK-31836: -- Confirmed this is also an issue on 2.4.5. I could also recreate it with just two files, without streaming, using
{code:java}
spark.sql.files.openCostInBytes 0{code}
to make sure both files ended up in a single partition. The behavior seems to be that after a Python UDF, all rows in a partition have the input_file_name of the last row in the partition, but that's an assumption based on a tiny test. Doing
{code:java}
df = (df
    .withColumn('before', input_file_name())
    .withColumn('during', udf(lambda x: x)(input_file_name()))
    .withColumn('after', input_file_name())
)
{code}
before and during are correct, while after is incorrect (all values are the last file in the partition).
> input_file_name() gives wrong value following Python UDF usage
> --
>
> Key: SPARK-31836
> URL: https://issues.apache.org/jira/browse/SPARK-31836
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming
> Affects Versions: 3.0.0
> Reporter: Wesley Hildebrandt
> Priority: Major
>
> I'm using PySpark for Spark 3.0.0 RC1 with Python 3.6.8.
> The following commands demonstrate that the input_file_name() function sometimes returns the wrong filename following usage of a Python UDF:
> $ for i in `seq 5`; do echo $i > /tmp/test-file-$i; done
> $ pyspark
> >>> import pyspark.sql.functions as F
> >>> spark.readStream.text('file:///tmp/test-file-*', wholetext=True).withColumn('file1', F.input_file_name()).withColumn('udf', F.udf(lambda x:x)('value')).withColumn('file2', F.input_file_name()).writeStream.trigger(once=True).foreachBatch(lambda df,_: df.select('file1','file2').show(truncate=False, vertical=True)).start().awaitTermination()
> A few notes about this bug:
> * It happens with many different files, so it's not related to the file contents
> * It also happens loading files from HDFS, so storage location is not a factor
> * It also happens using .csv() to read the files instead of .text(), so input format is not a factor
> * I have not been able to cause the error without using readStream, so it seems to be related to streaming
> * The bug also happens using spark-submit to send a job to my cluster
> * I haven't tested an older version, but it's possible that Spark pulls 24958 and 25321 ([https://github.com/apache/spark/pull/24958], [https://github.com/apache/spark/pull/25321]), merged to fix issue 28153 (https://issues.apache.org/jira/browse/SPARK-28153), introduced this bug?
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
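A sketch of that non-streaming reproduction, under the stated assumptions: two small local text files (paths are illustrative) and openCostInBytes set to 0 so both land in one partition:
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, udf

spark = (SparkSession.builder
         # Zero open cost packs both small files into a single partition,
         # which is what makes the wrong-filename behavior observable.
         .config("spark.sql.files.openCostInBytes", "0")
         .getOrCreate())

df = (spark.read.text(["/tmp/test-file-1", "/tmp/test-file-2"])
      .withColumn("before", input_file_name())
      .withColumn("during", udf(lambda x: x)(input_file_name()))
      .withColumn("after", input_file_name()))

# Per the comment above: 'before' and 'during' come out correct, while
# 'after' reportedly shows the partition's last file for every row.
df.select("before", "during", "after").show(truncate=False)
{code}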
[jira] [Comment Edited] (SPARK-31376) Non-global sort support for structured streaming
[ https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078219#comment-17078219 ] Adam Binford edited comment on SPARK-31376 at 4/8/20, 12:40 PM: I tried multiple times to add myself to the dev@ mailing list but was unsuccessful, which is why I ended up just posting a Jira ticket. It looks like it finally worked using the subscribe link on the Spark community page (subscribing from the mailing list page doesn't seem to work). Taking the discussion there now that it finally worked. was (Author: kimahriman): I tried multiple times to add myself to the dev@ mailing list but was unsuccessful, which is why I ended up just posting a Jira ticket. It looks like it finally worked using the subscribe link on the Spark community page (subscribing from the mailing list page doesn't seem to work).
> Non-global sort support for structured streaming
>
> Key: SPARK-31376
> URL: https://issues.apache.org/jira/browse/SPARK-31376
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.1.0
> Reporter: Adam Binford
> Priority: Minor
>
> Currently, all sorting is disallowed in structured streaming queries. Not allowing global sorting makes sense, but could non-global sorting (i.e. sortWithinPartitions) be allowed? I'm running into this with an external source I'm using, but I'm not sure if it would be useful for file sources as well. Right now I have to use foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
> * Does a local sort cause issues with any of the exactly-once guarantees streaming queries provide? I can't say I know or understand how those semantics work. Or are there other issues this could cause that I can't think of?
> * Is the change as simple as changing the unsupported-operations check to only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only disallow global sorts, and it seems to be working. Is there anything I'm missing, or is it this simple?
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31376) Non-global sort support for structured streaming
[ https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078219#comment-17078219 ] Adam Binford commented on SPARK-31376: -- I tried multiple times to add myself to the dev@ mailing list but was unsuccessful, which is why I ended up just posting a Jira ticket. It looks like it finally worked using the subscribe link on the Spark community page (subscribing from the mailing list page doesn't seem to work).
> Non-global sort support for structured streaming
>
> Key: SPARK-31376
> URL: https://issues.apache.org/jira/browse/SPARK-31376
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.1.0
> Reporter: Adam Binford
> Priority: Minor
>
> Currently, all sorting is disallowed in structured streaming queries. Not allowing global sorting makes sense, but could non-global sorting (i.e. sortWithinPartitions) be allowed? I'm running into this with an external source I'm using, but I'm not sure if it would be useful for file sources as well. Right now I have to use foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
> * Does a local sort cause issues with any of the exactly-once guarantees streaming queries provide? I can't say I know or understand how those semantics work. Or are there other issues this could cause that I can't think of?
> * Is the change as simple as changing the unsupported-operations check to only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only disallow global sorts, and it seems to be working. Is there anything I'm missing, or is it this simple?
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31376) Non-global sort support for structured streaming
[ https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077803#comment-17077803 ] Adam Binford commented on SPARK-31376: -- You can't do a global sort because you can't sort an infinite list. However, regardless of batch vs. streaming, a partition is a fairly well-defined unit of work, I would think. I'm only speaking to the straightforward file-based cases; I don't know if there are other reasons this wouldn't make sense, and I'm ignoring continuous mode for now. Say you had 5 input files you process as a batch. Your input size results in 8 partitions, resulting in 8 output files. Then say you processed those in a streaming fashion: the first micro-batch is the first 3 files, which ends up being 5 partitions, and the second micro-batch is the last 2 files, resulting in 4 partitions. I won't work out exactly what math would make that the case, but I'm sure it's a possibility. So you would end up with 9 total partitions/output files. You already have a "different result" than you would have achieved via batch processing, and that doesn't even involve any kind of sorting within partitions. Add in sorting and you get the same different number of partitions, just potentially better optimized for your use case. So it's not that one "makes sense"; it's that a global sort is not well defined on a stream while partition-based sorting is. In theory you could make "global sort" mean sort within the micro-batch, which would be well defined, but I'm not sure what benefit that would provide. One question I have (this isn't the use case I care about, but it seems useful): can you order by a column in a parquet file without using foreachBatch? This seems like a very useful, well-defined optimization for streaming output. Also, I don't quite understand how repartitioning is valid on a streaming query; I was surprised when that worked. That seems more questionable than local sorting. Again, I don't know if there's a non-file-based reason this would cause issues, but the file-based case seems straightforward and well defined.
> Non-global sort support for structured streaming
>
> Key: SPARK-31376
> URL: https://issues.apache.org/jira/browse/SPARK-31376
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.1.0
> Reporter: Adam Binford
> Priority: Minor
>
> Currently, all sorting is disallowed in structured streaming queries. Not allowing global sorting makes sense, but could non-global sorting (i.e. sortWithinPartitions) be allowed? I'm running into this with an external source I'm using, but I'm not sure if it would be useful for file sources as well. Right now I have to use foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
> * Does a local sort cause issues with any of the exactly-once guarantees streaming queries provide? I can't say I know or understand how those semantics work. Or are there other issues this could cause that I can't think of?
> * Is the change as simple as changing the unsupported-operations check to only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only disallow global sorts, and it seems to be working. Is there anything I'm missing, or is it this simple?
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31376) Non-global sort support for structured streaming
Adam Binford created SPARK-31376: Summary: Non-global sort support for structured streaming Key: SPARK-31376 URL: https://issues.apache.org/jira/browse/SPARK-31376 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Adam Binford Currently, all sorting is disallowed in structured streaming queries. Not allowing global sorting makes sense, but could non-global sorting (i.e. sortWithinPartitions) be allowed? I'm running into this with an external source I'm using, but I'm not sure if it would be useful for file sources as well. Right now I have to use foreachBatch so that I can do a sortWithinPartitions, as sketched below. Two main questions:
* Does a local sort cause issues with any of the exactly-once guarantees streaming queries provide? I can't say I know or understand how those semantics work. Or are there other issues this could cause that I can't think of?
* Is the change as simple as changing the unsupported-operations check to only look for global sorts instead of all sorts?
I have built a version that simply changes the unsupported check to only disallow global sorts, and it seems to be working. Is there anything I'm missing, or is it this simple? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
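For concreteness, a minimal sketch of that foreachBatch workaround; the rate source, sink path, checkpoint location, and sort column are all illustrative:
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative source: the built-in rate source emits (timestamp, value).
stream_df = spark.readStream.format("rate").load()


def write_sorted(batch_df, epoch_id):
    # Inside foreachBatch we get a plain batch DataFrame, so
    # sortWithinPartitions is allowed even though the query is streaming.
    (batch_df.sortWithinPartitions("timestamp")
             .write.mode("append").parquet("/tmp/sorted-output"))


query = (stream_df.writeStream
         .option("checkpointLocation", "/tmp/sorted-output-ckpt")
         .trigger(once=True)
         .foreachBatch(write_sorted)
         .start())
query.awaitTermination()
{code}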
[jira] [Commented] (SPARK-30860) Different behavior between rolling and non-rolling event log
[ https://issues.apache.org/jira/browse/SPARK-30860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038761#comment-17038761 ] Adam Binford commented on SPARK-30860: -- Yeah, I'll see if I can manage to update it. I'm not the best Java/Scala person, but I'm sure I can figure it out.
> Different behavior between rolling and non-rolling event log
>
> Key: SPARK-30860
> URL: https://issues.apache.org/jira/browse/SPARK-30860
> Project: Spark
> Issue Type: Bug
> Components: Deploy
> Affects Versions: 3.0.0
> Reporter: Adam Binford
> Priority: Minor
>
> When creating a rolling event log, the application directory is created with a call to FileSystem.mkdirs with file permission 770. The default behavior of HDFS is to set the permission of a file created with FileSystem.create or FileSystem.mkdirs to (P & ~umask), where P is the permission in the API call and umask is a system value set by fs.permissions.umask-mode, defaulting to 0022. This means that, with default settings, any mkdirs call can yield at most 755 permissions, so rolling event log directories end up created with 750 permissions. That leaves the history server unable to prune old applications that were not run by the same user running the history server.
> This is not a problem for non-rolling logs, because they use SparkHadoopUtils.createFile for Hadoop 2 backward compatibility and then call FileSystem.setPermission with 770 after the file has been created. setPermission doesn't have the umask applied to it, so this works fine.
> Obviously this could be fixed by changing fs.permissions.umask-mode, but I'm not sure why it's set that way in the first place or whether changing it would hurt anything else. The main issue is that rolling and non-rolling event logs behave differently, and this repo might want to make them consistent with each other.
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-30860) Different behavior between rolling and non-rolling event log
Adam Binford created SPARK-30860: Summary: Different behavior between rolling and non-rolling event log Key: SPARK-30860 URL: https://issues.apache.org/jira/browse/SPARK-30860 Project: Spark Issue Type: Bug Components: Deploy Affects Versions: 3.0.0 Reporter: Adam Binford When creating a rolling event log, the application directory is created with a call to FileSystem.mkdirs with file permission 770. The default behavior of HDFS is to set the permission of a file created with FileSystem.create or FileSystem.mkdirs to (P & ~umask), where P is the permission in the API call and umask is a system value set by fs.permissions.umask-mode, defaulting to 0022. This means that, with default settings, any mkdirs call can yield at most 755 permissions, so rolling event log directories end up created with 750 permissions. That leaves the history server unable to prune old applications that were not run by the same user running the history server. This is not a problem for non-rolling logs, because they use SparkHadoopUtils.createFile for Hadoop 2 backward compatibility and then call FileSystem.setPermission with 770 after the file has been created. setPermission doesn't have the umask applied to it, so this works fine. Obviously this could be fixed by changing fs.permissions.umask-mode, but I'm not sure why it's set that way in the first place or whether changing it would hurt anything else. The main issue is that rolling and non-rolling event logs behave differently, and this repo might want to make them consistent with each other. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
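The umask arithmetic described above, worked through in a couple of lines (values taken from the description):
{code:python}
# HDFS applies (P & ~umask) when creating files or directories.
requested = 0o770   # permission passed to FileSystem.mkdirs
umask = 0o022       # default fs.permissions.umask-mode
effective = requested & ~umask
print(oct(effective))  # 0o750: group write is stripped, so a history
                       # server running as a different user can't prune
{code}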