[jira] [Commented] (SPARK-47952) Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn

2024-05-21 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848408#comment-17848408
 ] 

Adam Binford commented on SPARK-47952:
--

We have a similar setup, currently using Livy and Zeppelin as the means of 
externally interacting with our Yarn cluster. I'm very curious to see how 
people are using Spark Connect with Yarn, as that doesn't seem to be a common 
use case yet. I've thought about doing something similar to what you describe: 
creating an intermediary management server/API for launching Spark jobs and 
then reporting back the URL to connect to. It's still just a thought in my head, 
though, so I would definitely be interested in any tidbits or lessons learned you 
encounter along the way.

> Support retrieving the real SparkConnectService GRPC address and port 
> programmatically when running on Yarn
> ---
>
> Key: SPARK-47952
> URL: https://issues.apache.org/jira/browse/SPARK-47952
> Project: Spark
>  Issue Type: Story
>  Components: Connect
>Affects Versions: 4.0.0
>Reporter: TakawaAkirayo
>Priority: Minor
>  Labels: pull-request-available
>
> 1.User Story:
> Our data analysts and data scientists use Jupyter notebooks provisioned on 
> Kubernetes (k8s) with limited CPU/memory resources to run Spark-shell/pyspark 
> in the terminal via Yarn Client mode. However, Yarn Client mode consumes 
> significant local memory if the job is heavy, and the total resource pool of 
> k8s for notebooks is limited. To leverage the abundant resources of our 
> Hadoop cluster for scalability purposes, we aim to utilize Spark Connect. This 
> runs the driver on Yarn with the SparkConnectService started, and a Spark 
> Connect client connects to that remote driver.
> To provide a seamless experience with one command startup for both server and 
> client, we've wrapped the following processes in one script:
> 1) Start a local coordinator server (implemented by us, not in this PR) with 
> a specified port.
> 2) Start SparkConnectServer by spark-submit via Yarn Cluster mode with 
> user-input Spark configurations and the local coordinator server's address 
> and port. Append an additional listener class in the configuration for 
> SparkConnectService callback with the actual address and port on Yarn to the 
> coordinator server.
> 3) Wait for the coordinator server to receive the address callback from the 
> SparkConnectService on Yarn and export the real address.
> 4) Start the client (pyspark --remote) with the remote address.
> Finally, a remote SparkConnect Server is started on Yarn with a local 
> SparkConnect client connected. Users no longer need to start the server 
> beforehand and then connect to the remote server after manually looking up its 
> address on Yarn.
> 2.Problem statement of this change:
> 1) The specified port for the SparkConnectService GRPC server might be 
> occupied on the node of the Hadoop Cluster. To increase the success rate of 
> startup, it needs to retry on conflicts rather than fail directly.
> 2) Because the final binding port could be uncertain based on #1 and the 
> remote address is unpredictable on Yarn, we need to retrieve the address and 
> port programmatically and inject it automatically on the start of `pyspark 
> --remote`. The SparkConnectService needs to communicate its location back to 
> the launcher side.
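
To make the flow above concrete, here is a minimal, hypothetical sketch of the launcher side: a tiny coordinator that blocks until the SparkConnectService on Yarn reports its real gRPC address, then connects a local Spark Connect client to it. The HTTP callback, port, and payload format are illustrative assumptions, not the actual listener or coordinator from the PR.

# Hypothetical coordinator sketch: wait for one "host:port" callback from the
# listener running alongside the SparkConnectService on Yarn.
from http.server import BaseHTTPRequestHandler, HTTPServer

reported = {}

class CallbackHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        reported["address"] = self.rfile.read(length).decode().strip()
        self.send_response(200)
        self.end_headers()

server = HTTPServer(("0.0.0.0", 18080), CallbackHandler)  # illustrative port
while "address" not in reported:
    server.handle_request()  # block until the driver reports back

# Start the local client against the reported remote address (step 4 above).
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote(f"sc://{reported['address']}").getOrCreate()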



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48314) FileStreamSource shouldn't double cache files for availableNow

2024-05-16 Thread Adam Binford (Jira)
Adam Binford created SPARK-48314:


 Summary: FileStreamSource shouldn't double cache files for 
availableNow
 Key: SPARK-48314
 URL: https://issues.apache.org/jira/browse/SPARK-48314
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.5.1
Reporter: Adam Binford


FileStreamSource loads and saves all files at initialization for 
Trigger.AvailableNow. However, files will also be cached in unreadFiles, which 
is wasteful and causes the issues identified in 
https://issues.apache.org/jira/browse/SPARK-44924 for streams that are reading 
more than 10k files per batch. We should always skip using the unreadFiles 
cache when using the available-now trigger, as there is no need for it anyway.
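
For reference, this is roughly the kind of stream that hits that path (a minimal sketch; the text format and paths are illustrative). With the available-now trigger the source lists every matching file up front, so caching them again in unreadFiles buys nothing.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# File-based source; with availableNow=True all files present at start-up are
# planned into batches, so the unreadFiles cache is redundant for this run.
df = spark.readStream.format("text").load("/data/incoming")  # illustrative path

query = (
    df.writeStream
    .format("parquet")
    .option("checkpointLocation", "/checkpoints/incoming")  # illustrative
    .option("path", "/data/out")
    .trigger(availableNow=True)
    .start()
)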



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33133) History server fails when loading invalid rolling event logs

2023-12-05 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793270#comment-17793270
 ] 

Adam Binford commented on SPARK-33133:
--

I never got around to fixing the issue with creating the event logs, only preventing 
the history server from crashing if it encounters a bad log.

> History server fails when loading invalid rolling event logs
> 
>
> Key: SPARK-33133
> URL: https://issues.apache.org/jira/browse/SPARK-33133
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Adam Binford
>Priority: Major
>
> We have run into an issue where our history server fails to load new 
> applications, and when restarted, fails to load any applications at all. This 
> happens when it encounters invalid rolling event log files. We encounter this 
> with long running streaming applications. There seem to be two issues here 
> that lead to problems:
>  * It looks like our long running streaming applications' event log directory 
> is being cleaned up. The next time the application logs event data, it 
> recreates the event log directory but without recreating the "appstatus" 
> file. I don't know the full extent of this behavior or if something "wrong" 
> is happening here.
>  * The history server then reads this new folder, and throws an exception 
> because the "appstatus" file doesn't exist in the rolling event log folder. 
> This exception breaks the entire listing process, so no new applications will 
> be read, and if restarted no applications at all will be read.
> There seem to be a couple of ways to go about fixing this, and I'm curious to 
> hear the thoughts of anyone who knows more about how the history server works, 
> specifically with rolling event logs:
>  * Don't completely fail checking for new applications if one bad rolling 
> event log folder is encountered. This seems like the simplest fix and makes 
> sense to me, since it already checks for a few other errors and ignores them. It 
> doesn't necessarily fix the underlying issue that leads to this happening 
> though.
>  * Figure out why the in progress event log folder is being deleted and make 
> sure that doesn't happen. Maybe this is supposed to happen? Or maybe we don't 
> want to delete the top level folder and only delete event log files within 
> the folders? Again, I don't know the exact current behavior here.
>  * When writing new event log data, make sure the folder and appstatus file 
> exist every time, creating them again if not.
> Here's the stack trace we encounter when this happens, from 3.0.1 with a 
> couple extra MRs backported that I hoped would fix the issue:
> 2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in 
> checking for event log updates
> java.lang.IllegalArgumentException: requirement failed: Log directory must 
> contain an appstatus file!
>  at scala.Predef$.require(Predef.scala:281)
>  at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
>  at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
>  at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
>  at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
>  at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272)
>  at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240)
>  at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524)
>  at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466)
>  at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
>  at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
>  at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
>  at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
>  at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
>  at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
>  at 
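
For reference, the rolling event logs read by RollingEventLogFilesFileReader above are enabled with settings along these lines (a minimal sketch; the log directory and size are illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///spark-logs")  # illustrative
    # Rolling logs write a per-application directory that is expected to
    # contain the "appstatus" marker file discussed above.
    .config("spark.eventLog.rolling.enabled", "true")
    .config("spark.eventLog.rolling.maxFileSize", "128m")
    .getOrCreate()
)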

[jira] [Commented] (SPARK-22876) spark.yarn.am.attemptFailuresValidityInterval does not work correctly

2023-08-19 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756323#comment-17756323
 ] 

Adam Binford commented on SPARK-22876:
--

Attempting to finally add support for this: 
https://github.com/apache/spark/pull/42570

> spark.yarn.am.attemptFailuresValidityInterval does not work correctly
> -
>
> Key: SPARK-22876
> URL: https://issues.apache.org/jira/browse/SPARK-22876
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, YARN
>Affects Versions: 2.2.0
> Environment: hadoop version 2.7.3
>Reporter: Jinhan Zhong
>Priority: Minor
>  Labels: bulk-closed
>
> I assume we can use spark.yarn.maxAppAttempts together with 
> spark.yarn.am.attemptFailuresValidityInterval to make a long running 
> application avoid stopping after an acceptable number of failures.
> But after testing, I found that the application always stops after failing n 
> times (n is the minimum of spark.yarn.maxAppAttempts and 
> yarn.resourcemanager.am.max-attempts from the client yarn-site.xml).
> For example, the following setup should allow the application master to fail 20 
> times:
> * spark.yarn.am.attemptFailuresValidityInterval=1s
> * spark.yarn.maxAppAttempts=20
> * yarn client: yarn.resourcemanager.am.max-attempts=20
> * yarn resource manager: yarn.resourcemanager.am.max-attempts=3
> And after checking the source code, I found in source file 
> ApplicationMaster.scala 
> https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L293
> there's a ShutdownHook that checks the attempt id against maxAppAttempts; 
> if attempt id >= maxAppAttempts, it will try to unregister the application 
> and the application will finish.
> Is this an expected design or a bug?
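
For reference, a minimal sketch of passing the settings from the example above together in client mode (the spark.yarn.* values are read at submission time, so in cluster mode they would instead be passed to spark-submit via --conf; the values are the ones from the report):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("yarn")
    # Failures older than this interval should not count toward the limit.
    .config("spark.yarn.am.attemptFailuresValidityInterval", "1s")
    .config("spark.yarn.maxAppAttempts", "20")
    .getOrCreate()
)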



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-22876) spark.yarn.am.attemptFailuresValidityInterval does not work correctly

2023-08-19 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-22876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford reopened SPARK-22876:
--

> spark.yarn.am.attemptFailuresValidityInterval does not work correctly
> -
>
> Key: SPARK-22876
> URL: https://issues.apache.org/jira/browse/SPARK-22876
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, YARN
>Affects Versions: 2.2.0
> Environment: hadoop version 2.7.3
>Reporter: Jinhan Zhong
>Priority: Minor
>  Labels: bulk-closed
>
> I assume we can use spark.yarn.maxAppAttempts together with 
> spark.yarn.am.attemptFailuresValidityInterval to make a long running 
> application avoid stopping after an acceptable number of failures.
> But after testing, I found that the application always stops after failing n 
> times (n is the minimum of spark.yarn.maxAppAttempts and 
> yarn.resourcemanager.am.max-attempts from the client yarn-site.xml).
> For example, the following setup should allow the application master to fail 20 
> times:
> * spark.yarn.am.attemptFailuresValidityInterval=1s
> * spark.yarn.maxAppAttempts=20
> * yarn client: yarn.resourcemanager.am.max-attempts=20
> * yarn resource manager: yarn.resourcemanager.am.max-attempts=3
> And after checking the source code, I found in source file 
> ApplicationMaster.scala 
> https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L293
> there's a ShutdownHook that checks the attempt id against maxAppAttempts; 
> if attempt id >= maxAppAttempts, it will try to unregister the application 
> and the application will finish.
> Is this an expected design or a bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44639) Add option to use Java tmp dir for RocksDB state store

2023-08-02 Thread Adam Binford (Jira)
Adam Binford created SPARK-44639:


 Summary: Add option to use Java tmp dir for RocksDB state store
 Key: SPARK-44639
 URL: https://issues.apache.org/jira/browse/SPARK-44639
 Project: Spark
  Issue Type: Improvement
  Components: SQL, Structured Streaming
Affects Versions: 3.4.1
Reporter: Adam Binford


Currently, local RocksDB state is stored in a local directory given by 
Utils.getLocalDir. On Yarn this is a directory created inside the root 
application folder, such as

{{/tmp/nm-local-dir/usercache//appcache//}}

The problem with this is that if an executor crashes for some reason (like OOM) 
and the shutdown hooks don't get run, this directory will stay around forever 
until the application finishes, which can cause jobs to slowly accumulate more 
and more temporary space until finally the node manager goes unhealthy.

Because this data will only ever be accessed by the executor that created this 
directory, it would make sense to store the data inside the container folder, 
which will always get cleaned up by the node manager when that yarn container 
gets cleaned up. Yarn sets the `java.io.tmpdir` to a path inside this 
directory, such as

{{/tmp/nm-local-dir/usercache//appcache///tmp/}}

I'm not sure of the behavior for other resource managers, so this could be an 
opt-in config that can be specified.
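
For context, the RocksDB state store whose local working directory is described above is enabled with the provider config below (a minimal sketch; the proposed opt-in config for using java.io.tmpdir is not shown since it does not exist yet):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # RocksDB-backed state store; its local working files end up under the
    # Utils.getLocalDir location described above when running on Yarn.
    .config(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    )
    .getOrCreate()
)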



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43951) RocksDB state store can become corrupt on task retries

2023-06-02 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford resolved SPARK-43951.
--
Resolution: Fixed

> RocksDB state store can become corrupt on task retries
> --
>
> Key: SPARK-43951
> URL: https://issues.apache.org/jira/browse/SPARK-43951
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Adam Binford
>Priority: Major
>
> A couple of our streaming jobs have failed since upgrading to Spark 3.4 with 
> an error such as:
> org.rocksdb.RocksDBException: Mismatch in unique ID on table file ###. 
> Expected: {###,###} Actual: {###,###} in file /MANIFEST-
> This is due to the change from 
> [https://github.com/facebook/rocksdb/commit/6de7081cf37169989e289a4801187097f0c50fae]
>  that enabled unique ID checks by default, and I finally tracked down the 
> exact sequence of steps that leads to this failure in the way RocksDB state 
> store is used.
>  # A task fails after uploading the checkpoint to HDFS. Let's say it uploaded 
> 11.zip to version 11 of the table, but the task failed before it could finish 
> after successfully uploading the checkpoint.
>  # The same task is retried and goes back to load version 10 of the table as 
> expected.
>  # Cleanup/maintenance is called for this partition, which looks in HDFS for 
> persisted versions and sees up through version 11 since that zip file was 
> successfully uploaded on the previous task.
>  # As part of resolving what SST files are part of each table version, 
> versionToRocksDBFiles.put(version, newResolvedFiles) is called for version 11 
> with its SST files that were uploaded in the first failed task.
>  # The second attempt at the task commits and goes to sync its checkpoint to 
> HDFS.
>  # versionToRocksDBFiles contains the SST files to upload from step 4, and 
> these files are considered "the same" as what's in the local working dir 
> because the name and file size match.
>  # No SST files are uploaded because they matched above, but in reality the 
> unique ID inside the SST files is different (presumably this is just randomly 
> generated and inserted into each SST file?), it just doesn't affect the size.
>  # A new METADATA file is uploaded which has the new unique IDs listed inside.
>  # When version 11 of the table is read during the next batch, the unique IDs 
> in the METADATA file don't match the unique IDs in the SST files, which 
> causes the exception.
>  
> This is basically a ticking time bomb for anyone using RocksDB. Thoughts on 
> possible fixes would be:
>  * Disable unique ID verification. I don't currently see a binding for this 
> in the RocksDB java wrapper, so that would probably have to be added first.
>  * Disable checking if files are already uploaded with the same size, and 
> just always upload SST files no matter what.
>  * Update the "same file" check to also be able to do some kind of CRC 
> comparison or something like that.
>  * Update the maintenance/cleanup to not update the versionToRocksDBFiles map.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43951) RocksDB state store can become corrupt on task retries

2023-06-02 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728736#comment-17728736
 ] 

Adam Binford commented on SPARK-43951:
--

Of course as soon as I finish figuring all this out I found 
https://github.com/apache/spark/pull/41089

> RocksDB state store can become corrupt on task retries
> --
>
> Key: SPARK-43951
> URL: https://issues.apache.org/jira/browse/SPARK-43951
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Adam Binford
>Priority: Major
>
> A couple of our streaming jobs have failed since upgrading to Spark 3.4 with 
> an error such as:
> org.rocksdb.RocksDBException: Mismatch in unique ID on table file ###. 
> Expected: {###,###} Actual: {###,###} in file /MANIFEST-
> This is due to the change from 
> [https://github.com/facebook/rocksdb/commit/6de7081cf37169989e289a4801187097f0c50fae]
>  that enabled unique ID checks by default, and I finally tracked down the 
> exact sequence of steps that leads to this failure in the way RocksDB state 
> store is used.
>  # A task fails after uploading the checkpoint to HDFS. Let's say it uploaded 
> 11.zip to version 11 of the table, but the task failed before it could finish 
> after successfully uploading the checkpoint.
>  # The same task is retried and goes back to load version 10 of the table as 
> expected.
>  # Cleanup/maintenance is called for this partition, which looks in HDFS for 
> persisted versions and sees up through version 11 since that zip file was 
> successfully uploaded on the previous task.
>  # As part of resolving what SST files are part of each table version, 
> versionToRocksDBFiles.put(version, newResolvedFiles) is called for version 11 
> with its SST files that were uploaded in the first failed task.
>  # The second attempt at the task commits and goes to sync its checkpoint to 
> HDFS.
>  # versionToRocksDBFiles contains the SST files to upload from step 4, and 
> these files are considered "the same" as what's in the local working dir 
> because the name and file size match.
>  # No SST files are uploaded because they matched above, but in reality the 
> unique ID inside the SST files is different (presumably this is just randomly 
> generated and inserted into each SST file?), it just doesn't affect the size.
>  # A new METADATA file is uploaded which has the new unique IDs listed inside.
>  # When version 11 of the table is read during the next batch, the unique IDs 
> in the METADATA file don't match the unique IDs in the SST files, which 
> causes the exception.
>  
> This is basically a ticking time bomb for anyone using RocksDB. Thoughts on 
> possible fixes would be:
>  * Disable unique ID verification. I don't currently see a binding for this 
> in the RocksDB java wrapper, so that would probably have to be added first.
>  * Disable checking if files are already uploaded with the same size, and 
> just always upload SST files no matter what.
>  * Update the "same file" check to also be able to do some kind of CRC 
> comparison or something like that.
>  * Update the maintenance/cleanup to not update the versionToRocksDBFiles map.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43951) RocksDB state store can become corrupt on task retries

2023-06-02 Thread Adam Binford (Jira)
Adam Binford created SPARK-43951:


 Summary: RocksDB state store can become corrupt on task retries
 Key: SPARK-43951
 URL: https://issues.apache.org/jira/browse/SPARK-43951
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.0
Reporter: Adam Binford


A couple of our streaming jobs have failed since upgrading to Spark 3.4 with an 
error such as:

org.rocksdb.RocksDBException: Mismatch in unique ID on table file ###. 
Expected: {###,###} Actual: {###,###} in file /MANIFEST-

This is due to the change from 
[https://github.com/facebook/rocksdb/commit/6de7081cf37169989e289a4801187097f0c50fae]
 that enabled unique ID checks by default, and I finally tracked down the exact 
sequence of steps that leads to this failure in the way RocksDB state store is 
used.
 # A task fails after uploading the checkpoint to HDFS. Let's say it uploaded 
11.zip to version 11 of the table, but the task failed before it could finish 
after successfully uploading the checkpoint.
 # The same task is retried and goes back to load version 10 of the table as 
expected.
 # Cleanup/maintenance is called for this partition, which looks in HDFS for 
persisted versions and sees up through version 11 since that zip file was 
successfully uploaded on the previous task.
 # As part of resolving what SST files are part of each table version, 
versionToRocksDBFiles.put(version, newResolvedFiles) is called for version 11 
with its SST files that were uploaded in the first failed task.
 # The second attempt at the task commits and goes to sync its checkpoint to 
HDFS.
 # versionToRocksDBFiles contains the SST files to upload from step 4, and 
these files are considered "the same" as what's in the local working dir 
because the name and file size match.
 # No SST files are uploaded because they matched above, but in reality the 
unique ID inside the SST files is different (presumably this is just randomly 
generated and inserted into each SST file?), it just doesn't affect the size.
 # A new METADATA file is uploaded which has the new unique IDs listed inside.
 # When version 11 of the table is read during the next batch, the unique IDs 
in the METADATA file don't match the unique IDs in the SST files, which causes 
the exception.

 

This is basically a ticking time bomb for anyone using RocksDB. Thoughts on 
possible fixes would be:
 * Disable unique ID verification. I don't currently see a binding for this in 
the RocksDB java wrapper, so that would probably have to be added first.
 * Disable checking if files are already uploaded with the same size, and just 
always upload SST files no matter what.
 * Update the "same file" check to also be able to do some kind of CRC 
comparison or something like that.
 * Update the maintenance/cleanup to not update the versionToRocksDBFiles map.
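
To illustrate why the name-and-size check in step 6 can be fooled, here is a small standalone sketch (plain Python, not Spark code): two files that match on size but differ in content. A checksum comparison, one of the fixes floated above, would catch the difference that a size check misses.

import hashlib
import os
import tempfile

def same_by_size(a, b):
    return os.path.getsize(a) == os.path.getsize(b)

def same_by_checksum(a, b):
    def digest(path):
        h = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
        return h.hexdigest()
    return digest(a) == digest(b)

d = tempfile.mkdtemp()
f1 = os.path.join(d, "a.sst")
f2 = os.path.join(d, "b.sst")
# Same length, different bytes: mimics two SST uploads whose embedded unique
# IDs differ even though their names and sizes match.
with open(f1, "wb") as f:
    f.write(b"A" * 64)
with open(f2, "wb") as f:
    f.write(b"B" * 64)

print(same_by_size(f1, f2))      # True: the check that currently skips the upload
print(same_by_checksum(f1, f2))  # False: a content check would flag the mismatch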



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43802) unbase64 and unhex codegen are invalid with failOnError

2023-05-25 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-43802:
-
Description: 
to_binary with hex and base64 generate invalid codegen:

{{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), 
"BASE64")').show()}}

results in

{{Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 47, Column 1: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, 
Column 1: Unknown variable or type "BASE64"}}

because this is the generated code:



/* 107 */         if 
(!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1))
 {

/* 108 */           throw QueryExecutionErrors.invalidInputInConversionError(

/* 109 */             ((org.apache.spark.sql.types.BinaryType$) references[1] 
/* to */),

/* 110 */             project_value_1,

/* 111 */             BASE64,

/* 112 */             "try_to_binary");

/* 113 */         }

  was:
to_binary with hex and base64 generate invalid codegen:

{{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), 
"BASE64")').show()}}

results in

{{Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 47, Column 1: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, 
Column 1: Unknown variable or type "BASE64"}}

because this is the generated code:

{{ }}{{{}/* 107 */        if 
(!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1))
 {{}}}{{{}/* 108 */           throw 
QueryExecutionErrors.invalidInputInConversionError({}}}{{{}/* 109 */            
 ((org.apache.spark.sql.types.BinaryType$) references[1] /* to */),{}}}{{{}/* 
110 */             project_value_1,{}}}{{{}/* 111 */             
BASE64,{}}}{{{}/* 112 */             "try_to_binary");{}}}{{{}/* 113 */         
}{}}}


> unbase64 and unhex codegen are invalid with failOnError
> ---
>
> Key: SPARK-43802
> URL: https://issues.apache.org/jira/browse/SPARK-43802
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Adam Binford
>Priority: Major
>
> to_binary with hex and base64 generate invalid codegen:
> {{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), 
> "BASE64")').show()}}
> results in
> {{Caused by: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 47, Column 1: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 47, Column 1: Unknown variable or type "BASE64"}}
> because this is the generated code:
> /* 107 */         if 
> (!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1))
>  {
> /* 108 */           throw QueryExecutionErrors.invalidInputInConversionError(
> /* 109 */             ((org.apache.spark.sql.types.BinaryType$) references[1] 
> /* to */),
> /* 110 */             project_value_1,
> /* 111 */             BASE64,
> /* 112 */             "try_to_binary");
> /* 113 */         }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43802) unbase64 and unhex codegen are invalid with failOnError

2023-05-25 Thread Adam Binford (Jira)
Adam Binford created SPARK-43802:


 Summary: unbase64 and unhex codegen are invalid with failOnError
 Key: SPARK-43802
 URL: https://issues.apache.org/jira/browse/SPARK-43802
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 3.4.0
Reporter: Adam Binford


to_binary with hex and base64 generate invalid codegen:

{{spark.range(5).selectExpr('to_binary(base64(cast(id as binary)), 
"BASE64")').show()}}

results in

{{Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 47, Column 1: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 47, 
Column 1: Unknown variable or type "BASE64"}}

because this is the generated code:

/* 107 */         if 
(!org.apache.spark.sql.catalyst.expressions.UnBase64.isValidBase64(project_value_1))
 {
/* 108 */           throw QueryExecutionErrors.invalidInputInConversionError(
/* 109 */             ((org.apache.spark.sql.types.BinaryType$) references[1] /* to */),
/* 110 */             project_value_1,
/* 111 */             BASE64,
/* 112 */             "try_to_binary");
/* 113 */         }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-29 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford resolved SPARK-43244.
--
Resolution: Duplicate

Will be addressed by https://issues.apache.org/jira/browse/SPARK-43311

> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming deduplication with a lot of unique keys so it makes 
> sense these block usages would be high. The problem is that, as it is now, the 
> underlying RocksDB instance stays open on an executor as long as that 
> executor is assigned that stateful partition (so it can be reused across batches). 
> So a single executor can accumulate a large number of RocksDB instances open 
> at once, each using a certain amount of native memory. In the worst case, a 
> single executor could need to keep open every single partitions' RocksDB 
> instance at once. 
> There are a couple ways you can control the amount of memory used, such as 
> limiting the max open files, or adding the option to use the block cache for 
> the indices and filters, but neither of these solve the underlying problem of 
> accumulating native memory from multiple partitions on an executor.
> The real fix needs to be a mechanism and option to close the underlying 
> RocksDB instance at the end of each task, so you have the option to only ever 
> have one RocksDB instance open at a time, thus having predictable memory 
> usage no matter the size of your data or number of shuffle partitions. 
> We are running this on Spark 3.3, but just kicked off a test to see if things 
> are any different in Spark 3.4.
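
For context, the workload described above is roughly a streaming deduplication over a file source with the RocksDB provider; a minimal sketch (schema, paths, and the key column are illustrative). Each stateful shuffle partition gets its own RocksDB instance on whichever executor runs it, which is where the native index and filter memory accumulates.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    )
    .getOrCreate()
)

events = (
    spark.readStream
    .schema("event_id STRING, value STRING")   # illustrative schema
    .format("json")
    .load("/data/events")                      # illustrative path
)

# Many unique keys means many index/filter blocks held by each partition's
# RocksDB instance between batches.
deduped = events.dropDuplicates(["event_id"])

query = (
    deduped.writeStream
    .format("parquet")
    .option("checkpointLocation", "/checkpoints/dedupe")
    .option("path", "/data/deduped")
    .start()
)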



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-27 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717394#comment-17717394
 ] 

Adam Binford edited comment on SPARK-43244 at 4/27/23 11:08 PM:


Yeah I just saw that PR and was looking through that, I can see that making 
sense. The memory usage I was seeing that I couldn't do anything about was the 
filter/index cache. Trying to re-use the LRUCache across all DB instances is 
interesting and does seem like that would solve the issue. Do you know if that 
definitely actually reuses the same underlying cache, and isn't just a config 
that tells the native RocksDB instance how big to make the cache?

Edit: it does appear to be the actual cache, so I think that would solve my issues.


was (Author: kimahriman):
Yeah I just saw that PR and was looking through that, I can see that making 
sense. The memory usage I was seeing that I couldn't do anything about was the 
filter/index cache. Trying to re-use the LRUCache across all DB instances is 
interesting and does seem like that would solve the issue. Do you know if that 
definitely actually reuses the same underlying cache, and isn't just a config 
that tells the native RocksDB instance how big to make the cache?

> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming deduplication with a lot of unique keys so it makes 
> sense these block usages would be high. The problem is that, as it is now, the 
> underlying RocksDB instance stays open on an executor as long as that 
> executor is assigned that stateful partition (so it can be reused across batches). 
> So a single executor can accumulate a large number of RocksDB instances open 
> at once, each using a certain amount of native memory. In the worst case, a 
> single executor could need to keep open every single partitions' RocksDB 
> instance at once. 
> There are a couple ways you can control the amount of memory used, such as 
> limiting the max open files, or adding the option to use the block cache for 
> the indices and filters, but neither of these solve the underlying problem of 
> accumulating native memory from multiple partitions on an executor.
> The real fix needs to be a mechanism and option to close the underlying 
> RocksDB instance at the end of each task, so you have the option to only ever 
> have one RocksDB instance open at a time, thus having predictable memory 
> usage no matter the size of your data or number of shuffle partitions. 
> We are running this on Spark 3.3, but just kicked off a test to see if things 
> are any different in Spark 3.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-27 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717394#comment-17717394
 ] 

Adam Binford commented on SPARK-43244:
--

Yeah I just saw that PR and was looking through that, I can see that making 
sense. The memory usage I was seeing that I couldn't do anything about was the 
filter/index cache. Trying to re-use the LRUCache across all DB instances is 
interesting and does seem like that would solve the issue. Do you know if that 
definitely actually reuses the same underlying cache, and isn't just a config 
that tells the native RocksDB instance how big to make the cache?

> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming deduplication with a lot of unique keys so it makes 
> sense these block usages would be high. The problem is that, as it is now, the 
> underlying RocksDB instance stays open on an executor as long as that 
> executor is assigned that stateful partition (so it can be reused across batches). 
> So a single executor can accumulate a large number of RocksDB instances open 
> at once, each using a certain amount of native memory. In the worst case, a 
> single executor could need to keep open every single partitions' RocksDB 
> instance at once. 
> There are a couple ways you can control the amount of memory used, such as 
> limiting the max open files, or adding the option to use the block cache for 
> the indices and filters, but neither of these solve the underlying problem of 
> accumulating native memory from multiple partitions on an executor.
> The real fix needs to be a mechanism and option to close the underlying 
> RocksDB instance at the end of each task, so you have the option to only ever 
> have one RocksDB instance open at a time, thus having predictable memory 
> usage no matter the size of your data or number of shuffle partitions. 
> We are running this on Spark 3.3, but just kicked off a test to see if things 
> are any different in Spark 3.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-23 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715525#comment-17715525
 ] 

Adam Binford commented on SPARK-43244:
--

Read through [https://github.com/apache/spark/pull/30344] for all the comments 
about close/abort. Adding a new method (like close) in addition to abort seems 
like the only real option here?

> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming deduplication with a lot of unique keys so it makes 
> sense these block usages would be high. The problem is that, as it is now, the 
> underlying RocksDB instance stays open on an executor as long as that 
> executor is assigned that stateful partition (so it can be reused across batches). 
> So a single executor can accumulate a large number of RocksDB instances open 
> at once, each using a certain amount of native memory. In the worst case, a 
> single executor could need to keep open every single partitions' RocksDB 
> instance at once. 
> There are a couple ways you can control the amount of memory used, such as 
> limiting the max open files, or adding the option to use the block cache for 
> the indices and filters, but neither of these solve the underlying problem of 
> accumulating native memory from multiple partitions on an executor.
> The real fix needs to be a mechanism and option to close the underlying 
> RocksDB instance at the end of each task, so you have the option to only ever 
> have one RocksDB instance open at a time, thus having predictable memory 
> usage no matter the size of your data or number of shuffle partitions. 
> We are running this on Spark 3.3, but just kicked off a test to see if things 
> are any different in Spark 3.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-23 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-43244:
-
Description: 
We noticed in one of our production stateful streaming jobs using RocksDB that 
an executor with 20g of heap was using around 40g of resident memory. I noticed 
a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or 
so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage (this link was helpful: 
[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
 I realized a lot of this was likely the "Index and Filters" memory usage. This 
job is doing a streaming duplicate with a lot of unique keys so it makes sense 
these block usages would be high. The problem is that, because as it is now the 
underlying RocksDB instance stays open on an executor as long as that executor 
is assigned that stateful partition (to be reused across batches). So a single 
executor can accumulate a large number of RocksDB instances open at once, each 
using a certain amount of native memory. In the worst case, a single executor 
could need to keep open every single partitions' RocksDB instance at once. 

There are a couple ways you can control the amount of memory used, such as 
limiting the max open files, or adding the option to use the block cache for 
the indices and filters, but neither of these solve the underlying problem of 
accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB 
instance at the end of each task, so you have the option to only ever have one 
RocksDB instance open at a time, thus having predictable memory usage no matter 
the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things 
are any different in Spark 3.4.

  was:
We noticed in one of our production stateful streaming jobs using RocksDB that 
an executor with 20g of heap was using around 40g of resident memory. I noticed 
a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or 
so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage (this link was helpful: 
[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
 I realized a lot of this was likely the "Index and Filters" memory usage. This 
job is doing a streaming duplicate with a lot of unique values so it makes 
sense these block usages would be high. The problem is that, because as it is 
now the underlying RocksDB instance stays open on an executor as long as that 
executor is assigned that stateful partition (to be reused across batches). So 
a single executor can accumulate a large number of RocksDB instances open at 
once, each using a certain amount of native memory. In the worst case, a single 
executor could need to keep open every single partitions' RocksDB instance at 
once. 

There are a couple ways you can control the amount of memory used, such as 
limiting the max open files, or adding the option to use the block cache for 
the indices and filters, but neither of these solve the underlying problem of 
accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB 
instance at the end of each task, so you have the option to only ever have one 
RocksDB instance open at a time, thus having predictable memory usage no matter 
the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things 
are any different in Spark 3.4.


> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming duplicate with a lot of unique keys so it makes 
> sense these block usages would be high. The 

[jira] [Updated] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-23 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-43244:
-
Description: 
We noticed in one of our production stateful streaming jobs using RocksDB that 
an executor with 20g of heap was using around 40g of resident memory. I noticed 
a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or 
so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage (this link was helpful: 
[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
 I realized a lot of this was likely the "Index and Filters" memory usage. This 
job is doing a streaming duplicate with a lot of unique values so it makes 
sense these block usages would be high. The problem is that, because as it is 
now the underlying RocksDB instance stays open on an executor as long as that 
executor is assigned that stateful partition (to be reused across batches). So 
a single executor can accumulate a large number of RocksDB instances open at 
once, each using a certain amount of native memory. In the worst case, a single 
executor could need to keep open every single partitions' RocksDB instance at 
once. 

There are a couple ways you can control the amount of memory used, such as 
limiting the max open files, or adding the option to use the block cache for 
the indices and filters, but neither of these solve the underlying problem of 
accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB 
instance at the end of each task, so you have the option to only ever have one 
RocksDB instance open at a time, thus having predictable memory usage no matter 
the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things 
are any different in Spark 3.4.

  was:
We noticed in one of our production stateful streaming jobs using RocksDB that 
an executor with 20g of heap was using around 40g of resident memory. I noticed 
a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or 
so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage ([this link was 
helpful]([https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
 I realized a lot of this was likely the "Index and Filters" memory usage. This 
job is doing a streaming duplicate with a lot of unique values so it makes 
sense these block usages would be high. The problem is that, because as it is 
now the underlying RocksDB instance stays open on an executor as long as that 
executor is assigned that stateful partition (to be reused across batches). So 
a single executor can accumulate a large number of RocksDB instances open at 
once, each using a certain amount of native memory. In the worst case, a single 
executor could need to keep open every single partitions' RocksDB instance at 
once. 

There are a couple ways you can control the amount of memory used, such as 
limiting the max open files, or adding the option to use the block cache for 
the indices and filters, but neither of these solve the underlying problem of 
accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB 
instance at the end of each task, so you have the option to only ever have one 
RocksDB instance open at a time, thus having predictable memory usage no matter 
the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things 
are any different in Spark 3.4.


> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming duplicate with a lot of unique values so it 
> makes sense these block usages would be high. 

[jira] [Updated] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-23 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-43244:
-
Description: 
We noticed in one of our production stateful streaming jobs using RocksDB that 
an executor with 20g of heap was using around 40g of resident memory. I noticed 
a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or 
so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage ([this link was 
helpful]([https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
 I realized a lot of this was likely the "Index and Filters" memory usage. This 
job is doing a streaming duplicate with a lot of unique values so it makes 
sense these block usages would be high. The problem is that, because as it is 
now the underlying RocksDB instance stays open on an executor as long as that 
executor is assigned that stateful partition (to be reused across batches). So 
a single executor can accumulate a large number of RocksDB instances open at 
once, each using a certain amount of native memory. In the worst case, a single 
executor could need to keep open every single partitions' RocksDB instance at 
once. 

There are a couple ways you can control the amount of memory used, such as 
limiting the max open files, or adding the option to use the block cache for 
the indices and filters, but neither of these solve the underlying problem of 
accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB 
instance at the end of each task, so you have the option to only ever have one 
RocksDB instance open at a time, thus having predictable memory usage no matter 
the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things 
are any different in Spark 3.4.

  was:
We noticed in one of our production stateful streaming jobs using RocksDB that 
an executor with 20g of heap was using around 40g of resident memory. I noticed 
a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or 
so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage ([this link was 
helpful|[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]),]
 I realized a lot of this was likely the "Index and Filters" memory usage. This 
job is doing a streaming duplicate with a lot of unique values so it makes 
sense these block usages would be high. The problem is that, because as it is 
now the underlying RocksDB instance stays open on an executor as long as that 
executor is assigned that stateful partition (to be reused across batches). So 
a single executor can accumulate a large number of RocksDB instances open at 
once, each using a certain amount of native memory. In the worst case, a single 
executor could need to keep open every single partitions' RocksDB instance at 
once. 

There are a couple ways you can control the amount of memory used, such as 
limiting the max open files, or adding the option to use the block cache for 
the indices and filters, but neither of these solve the underlying problem of 
accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB 
instance at the end of each task, so you have the option to only ever have one 
RocksDB instance open at a time, thus having predictable memory usage no matter 
the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things 
are any different in Spark 3.4.


> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage ([this link was 
> helpful]([https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming duplicate with a lot of unique values so it 
> makes sense these block usages would be 

[jira] [Commented] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-23 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715495#comment-17715495
 ] 

Adam Binford commented on SPARK-43244:
--

[~kabhwan] curious about your thoughts on this. It seems somewhat straightforward 
to have an option to close the RocksDB instance after a task completes. Things 
like maintenance wouldn't be affected because maintenance doesn't use the RocksDB 
instance to clean up old files. During the next batch I think it can reuse a 
lot of the files that have already been downloaded; it will just need to 
re-create the underlying RocksDB instance.

The main question is how to actually do the closing. It can't be done at the 
end of a commit because some things iterate over the items after committing, and 
abort is only called in certain situations. So the options seem to be either:
 # Do it via the abort mechanism, but change abort to be called every time, even 
for write stores. This seems like it would be fine for the two existing state 
stores, but could it cause issues for any custom state stores people have?
 # Add a new function to the StateStore classes, like "finished" or "complete", 
that signals the task is done so the store can do any cleanup it wants that 
isn't a full unload (a rough sketch is below).
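
For illustration, option 2 might look roughly like the following; the trait, 
method name, and class here are a hypothetical sketch, not the actual Spark 
StateStore API:
{code:java}
// Rough sketch of option 2 (names and simplified signatures are hypothetical,
// not the real org.apache.spark.sql.execution.streaming.state.StateStore API).
trait StateStore {
  def commit(): Long
  def abort(): Unit
  // New hook: called once the task that loaded this store finishes, whether or
  // not commit/abort ran, so the store can release resources without unloading.
  def onTaskCompletion(): Unit = {}
}

class RocksDBStateStore(db: AutoCloseable) extends StateStore {
  override def commit(): Long = 0L   // placeholder
  override def abort(): Unit = {}    // placeholder
  // Close the native RocksDB instance so its index/filter/block memory is freed
  // instead of accumulating on the executor across stateful partitions.
  override def onTaskCompletion(): Unit = db.close()
}
{code}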

> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage ([this link was 
> helpful|[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]),]
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming duplicate with a lot of unique values so it 
> makes sense these block usages would be high. The problem is that, because as 
> it is now the underlying RocksDB instance stays open on an executor as long 
> as that executor is assigned that stateful partition (to be reused across 
> batches). So a single executor can accumulate a large number of RocksDB 
> instances open at once, each using a certain amount of native memory. In the 
> worst case, a single executor could need to keep open every single 
> partitions' RocksDB instance at once. 
> There are a couple ways you can control the amount of memory used, such as 
> limiting the max open files, or adding the option to use the block cache for 
> the indices and filters, but neither of these solve the underlying problem of 
> accumulating native memory from multiple partitions on an executor.
> The real fix needs to be a mechanism and option to close the underlying 
> RocksDB instance at the end of each task, so you have the option to only ever 
> have one RocksDB instance open at a time, thus having predictable memory 
> usage no matter the size of your data or number of shuffle partitions. 
> We are running this on Spark 3.3, but just kicked off a test to see if things 
> are any different in Spark 3.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-23 Thread Adam Binford (Jira)
Adam Binford created SPARK-43244:


 Summary: RocksDB State Store can accumulate unbounded native memory
 Key: SPARK-43244
 URL: https://issues.apache.org/jira/browse/SPARK-43244
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 3.3.2
Reporter: Adam Binford


We noticed in one of our production stateful streaming jobs using RocksDB that 
an executor with 20g of heap was using around 40g of resident memory. I noticed 
a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or 
so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage ([this link was 
helpful|[https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]),]
 I realized a lot of this was likely the "Index and Filters" memory usage. This 
job is doing a streaming duplicate with a lot of unique values so it makes 
sense these block usages would be high. The problem is that, because as it is 
now the underlying RocksDB instance stays open on an executor as long as that 
executor is assigned that stateful partition (to be reused across batches). So 
a single executor can accumulate a large number of RocksDB instances open at 
once, each using a certain amount of native memory. In the worst case, a single 
executor could need to keep open every single partitions' RocksDB instance at 
once. 

There are a couple ways you can control the amount of memory used, such as 
limiting the max open files, or adding the option to use the block cache for 
the indices and filters, but neither of these solve the underlying problem of 
accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB 
instance at the end of each task, so you have the option to only ever have one 
RocksDB instance open at a time, thus having predictable memory usage no matter 
the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things 
are any different in Spark 3.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42492) Add new function filter_value

2023-02-19 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-42492:
-
Description: 
Doing data validation in Spark can lead to a lot of extra evaluations of 
expressions. This is because conditionally evaluated expressions aren't 
candidates for subexpression elimination. For example, a simple expression such 
as 

{{when(validate(col), col)}}

which only keeps col if it matches some condition, will lead to col being evaluated 
twice. And if col itself is made up of a series of expensive expressions, like 
regular expression checks, this can lead to a lot of wasted computation time.

The initial attempt to resolve this was 
https://issues.apache.org/jira/browse/SPARK-35564, adding support for 
subexpression elimination to conditionally evaluated expressions. However I 
have not been able to get that merged, so this is an alternative (though I 
believe that is still useful on top of this).

We can add a new higher order function "filter_value" that takes the column you 
want to validate as its first argument and a lambda function returning a boolean 
that decides whether to keep that value or not. It would have the same semantics 
as the above when expression, except it would guarantee to evaluate the initial 
column only once.

An alternative would be to implement a real definition for the NullIf 
expression, but that would only support exact equals checks and not any generic 
condition.
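
To illustrate the proposed semantics (filter_value does not exist yet; the call 
below is the proposal, and df is assumed to be the DataFrame from the example 
above):
{code:java}
import org.apache.spark.sql.functions._

// Today: the expression appears twice in the plan, so it is evaluated twice per row.
df.select(when(length(regexp_replace(col("_1"), "\\d", "")) > 0,
               regexp_replace(col("_1"), "\\d", "")))

// Proposed: filter_value evaluates the column once and passes it to the predicate.
// This function does not exist yet; it is the API being proposed here.
df.select(filter_value(regexp_replace(col("_1"), "\\d", ""), v => length(v) > 0))
{code}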

  was:
Doing data validation in Spark can lead to a lot of extra evaluations of 
expressions. This is because conditionally evaluated expressions aren't 
candidates for subexpression elimination. For example a simple expression such 
as 

{{when(validate(col), col)}}

to only keep col if it matches some condition, will lead to col being evaluated 
twice. And if call itself is made up of a series of expensive expressions 
itself, like regular expression checks, this can lead to a lot of wasted 
computation time.

The initial attempt to resolve this was 
https://issues.apache.org/jira/browse/SPARK-35564, adding support for 
subexpression elimination to conditionally evaluated expressions. However I 
have not been able to get that merged, so this is an alternative (though I 
believe that is still useful on top of this).

We can add a new lambda function "filter_value" that takes the column you want 
to validate as an argument, and then a function that runs a lambda expression 
returning a boolean on whether to keep that column or not. It would have the 
same semantics as the above when expression, except it would guarantee to only 
evaluate the initial column once.

An alternative would be to implement a real definition for the NullIf 
expression, but that would only support exact equals checks and not any generic 
condition.


> Add new function filter_value
> -
>
> Key: SPARK-42492
> URL: https://issues.apache.org/jira/browse/SPARK-42492
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> Doing data validation in Spark can lead to a lot of extra evaluations of 
> expressions. This is because conditionally evaluated expressions aren't 
> candidates for subexpression elimination. For example a simple expression 
> such as 
> {{when(validate(col), col)}}
> to only keep col if it matches some condition, will lead to col being 
> evaluated twice. And if call itself is made up of a series of expensive 
> expressions itself, like regular expression checks, this can lead to a lot of 
> wasted computation time.
> The initial attempt to resolve this was 
> https://issues.apache.org/jira/browse/SPARK-35564, adding support for 
> subexpression elimination to conditionally evaluated expressions. However I 
> have not been able to get that merged, so this is an alternative (though I 
> believe that is still useful on top of this).
> We can add a new higher order function "filter_value" that takes the column 
> you want to validate as an argument, and then a function that runs a lambda 
> expression returning a boolean on whether to keep that column or not. It 
> would have the same semantics as the above when expression, except it would 
> guarantee to only evaluate the initial column once.
> An alternative would be to implement a real definition for the NullIf 
> expression, but that would only support exact equals checks and not any 
> generic condition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42492) Add new function filter_value

2023-02-19 Thread Adam Binford (Jira)
Adam Binford created SPARK-42492:


 Summary: Add new function filter_value
 Key: SPARK-42492
 URL: https://issues.apache.org/jira/browse/SPARK-42492
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 3.3.2
Reporter: Adam Binford


Doing data validation in Spark can lead to a lot of extra evaluations of 
expressions. This is because conditionally evaluated expressions aren't 
candidates for subexpression elimination. For example a simple expression such 
as 

{{when(validate(col), col)}}

to only keep col if it matches some condition, will lead to col being evaluated 
twice. And if col itself is made up of a series of expensive expressions, like 
regular expression checks, this can lead to a lot of wasted computation time.

The initial attempt to resolve this was 
https://issues.apache.org/jira/browse/SPARK-35564, adding support for 
subexpression elimination to conditionally evaluated expressions. However I 
have not been able to get that merged, so this is an alternative (though I 
believe that is still useful on top of this).

We can add a new lambda function "filter_value" that takes the column you want 
to validate as an argument, and then a function that runs a lambda expression 
returning a boolean on whether to keep that column or not. It would have the 
same semantics as the above when expression, except it would guarantee to only 
evaluate the initial column once.

An alternative would be to implement a real definition for the NullIf 
expression, but that would only support exact equals checks and not any generic 
condition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-42347) Arrow string and binary vectors only support 1 GiB

2023-02-04 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684212#comment-17684212
 ] 

Adam Binford commented on SPARK-42347:
--

[https://github.com/apache/spark/pull/39572] is a potential workaround to allow 
enabling the large variable width vectors when users hit this limit.

> Arrow string and binary vectors only support 1 GiB
> --
>
> Key: SPARK-42347
> URL: https://issues.apache.org/jira/browse/SPARK-42347
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Adam Binford
>Priority: Major
>
> Since Arrow 10.0.0, BaseVariableWidthVector (the parent for string and binary 
> vectors), only supports expanding up to 1 GiB through the safe interfaces, 
> which Spark uses, instead of 2 GiB previously. This is due to 
> [https://github.com/apache/arrow/pull/13815.] I added a comment in there but 
> haven't got any responses yet, will make an issue in Arrow as well.
> Basically whenever you try to add data beyond 1 GiB, the vector will try to 
> double itself to the next power of two, which would be {{{}2147483648{}}}, 
> which is greater than {{Integer.MAX_VALUE}} which is {{{}2147483647{}}}, thus 
> throwing a {{{}OversizedAllocationException{}}}.
> See [https://github.com/apache/spark/pull/39572#issuecomment-1383195213] and 
> the comment above for how I recreated to show this was now the case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42347) Arrow string and binary vectors only support 1 GiB

2023-02-04 Thread Adam Binford (Jira)
Adam Binford created SPARK-42347:


 Summary: Arrow string and binary vectors only support 1 GiB
 Key: SPARK-42347
 URL: https://issues.apache.org/jira/browse/SPARK-42347
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.0
Reporter: Adam Binford


Since Arrow 10.0.0, BaseVariableWidthVector (the parent for string and binary 
vectors) only supports expanding up to 1 GiB through the safe interfaces, 
which Spark uses, instead of 2 GiB previously. This is due to 
[https://github.com/apache/arrow/pull/13815]. I added a comment in there but 
haven't gotten any responses yet; I will make an issue in Arrow as well.

Basically, whenever you try to add data beyond 1 GiB, the vector will try to 
double itself to the next power of two, which would be {{2147483648}}. That is 
greater than {{Integer.MAX_VALUE}}, which is {{2147483647}}, so an 
{{OversizedAllocationException}} is thrown.

See [https://github.com/apache/spark/pull/39572#issuecomment-1383195213] and 
the comment above for how I reproduced this to show it is now the case.
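
A quick arithmetic check of the limit described above:
{code:java}
// Growing past 1 GiB means doubling to the next power of two, which no longer
// fits in a signed 32-bit int, so the safe setters throw OversizedAllocationException.
val oneGiB: Long = 1024L * 1024 * 1024      // 1073741824
val doubled: Long = oneGiB * 2              // 2147483648
println(doubled > Int.MaxValue.toLong)      // true: 2147483648 > 2147483647
{code}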



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-39979) IndexOutOfBoundsException on groupby + apply pandas grouped map udf function

2023-01-13 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-39979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676610#comment-17676610
 ] 

Adam Binford commented on SPARK-39979:
--

We have encountered this as well and I am working on a fix. This is a 
limitation of using the BaseVariableWidthVector based classes for string and 
binary columns rather than the BaseLargeVariableWidthVector based types. A single 
string or binary vector cannot hold more than 2 GiB of data, because it stores an 
offset to each value and the offsets are stored as 4-byte integers. The large 
types use 8 bytes per offset instead, which removes the limitation.
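
As a rough sanity check of that limit:
{code:java}
// A 4-byte signed offset can address at most Int.MaxValue bytes in one vector,
// i.e. just under 2 GiB; 8-byte offsets push the limit far beyond practical sizes.
val bytesPerGiB = 1024.0 * 1024 * 1024
println(Int.MaxValue / bytesPerGiB)   // ~2.0 GiB addressable with 4-byte offsets
println(Long.MaxValue / bytesPerGiB)  // ~8.6 billion GiB with 8-byte offsets
{code}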

Options are:
 * Add a config you can set to use the large types instead of the normal types
 * Just use the large types for everything, at the expense of an additional 4 
bytes per record in the vector for storing offsets (not sure if there are 
additional overheads)

Let me know if there are any thoughts or opinions on which route to go.

> IndexOutOfBoundsException on groupby + apply pandas grouped map udf function
> 
>
> Key: SPARK-39979
> URL: https://issues.apache.org/jira/browse/SPARK-39979
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.2.1
>Reporter: yaniv oren
>Priority: Major
>
> I'm grouping on relatively small subset of groups with big size groups.
> Working with pyarrow version 2.0.0, machines memory is {color:#44}64 
> GiB.{color}
> I'm getting the following error:
> {code:java}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 387 
> in stage 162.0 failed 4 times, most recent failure: Lost task 387.3 in stage 
> 162.0 (TID 29957) (ip-172-21-129-187.eu-west-1.compute.internal executor 71): 
> java.lang.IndexOutOfBoundsException: index: 2147483628, length: 36 (expected: 
> range(0, 2147483648))
>   at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699)
>   at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890)
>   at 
> org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087)
>   at 
> org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:251)
>   at 
> org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:130)
>   at 
> org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:95)
>   at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:92)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
>   at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103)
>   at 
> org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:435)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2031)
>   at 
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:270)
>  {code}
> Why do I hit this 2 GB limit? according SPARK-34588 this is supported, 
> perhaps related to SPARK-34020.
> Please assist.
> Note:
> Is it related to the usage of BaseVariableWidthVector and not 
> BaseLargeVariableWidthVector?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41339) RocksDB state store WriteBatch doesn't clean up native memory

2022-11-30 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-41339:
-
Summary: RocksDB state store WriteBatch doesn't clean up native memory  
(was: RocksDB State Store WriteBatch doesn't cleanup native memory)

> RocksDB state store WriteBatch doesn't clean up native memory
> -
>
> Key: SPARK-41339
> URL: https://issues.apache.org/jira/browse/SPARK-41339
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 3.3.1
>Reporter: Adam Binford
>Priority: Major
>
> The RocksDB state store uses a WriteBatch to hold updates that get written in 
> a single transaction to commit. Somewhat indirectly abort is called after a 
> successful task which calls writeBatch.clear(), but the data for a writeBatch 
> is stored in a std::string in the native code. Not sure why it's stored as a 
> string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · 
> GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491]
> writeBatch.clear simply calls rep_.clear() and rep._resize() 
> ([rocksdb/write_batch.cc at main · facebook/rocksdb · 
> GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]),
>  neither of which actually releases the memory built up by a std::string 
> instance. The only way to actually release this memory is to delete the 
> WriteBatch object itself.
> Currently, all memory taken by all write batches will remain until the 
> RocksDB state store instance is closed, which never happens during the normal 
> course of operation as all partitions remain loaded on an executor after a 
> task completes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41340) RocksDB State Store WriteBatch doesn't cleanup native memory

2022-11-30 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641442#comment-17641442
 ] 

Adam Binford commented on SPARK-41340:
--

Got added twice

> RocksDB State Store WriteBatch doesn't cleanup native memory
> 
>
> Key: SPARK-41340
> URL: https://issues.apache.org/jira/browse/SPARK-41340
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 3.3.1
>Reporter: Adam Binford
>Priority: Major
>
> The RocksDB state store uses a WriteBatch to hold updates that get written in 
> a single transaction to commit. Somewhat indirectly abort is called after a 
> successful task which calls writeBatch.clear(), but the data for a writeBatch 
> is stored in a std::string in the native code. Not sure why it's stored as a 
> string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · 
> GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491]
> writeBatch.clear simply calls rep_.clear() and rep._resize() 
> ([rocksdb/write_batch.cc at main · facebook/rocksdb · 
> GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]),
>  neither of which actually releases the memory built up by a std::string 
> instance. The only way to actually release this memory is to delete the 
> WriteBatch object itself.
> Currently, all memory taken by all write batches will remain until the 
> RocksDB state store instance is closed, which never happens during the normal 
> course of operation as all partitions remain loaded on an executor after a 
> task completes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41340) RocksDB State Store WriteBatch doesn't cleanup native memory

2022-11-30 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford resolved SPARK-41340.
--
Resolution: Duplicate

> RocksDB State Store WriteBatch doesn't cleanup native memory
> 
>
> Key: SPARK-41340
> URL: https://issues.apache.org/jira/browse/SPARK-41340
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 3.3.1
>Reporter: Adam Binford
>Priority: Major
>
> The RocksDB state store uses a WriteBatch to hold updates that get written in 
> a single transaction to commit. Somewhat indirectly abort is called after a 
> successful task which calls writeBatch.clear(), but the data for a writeBatch 
> is stored in a std::string in the native code. Not sure why it's stored as a 
> string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · 
> GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491]
> writeBatch.clear simply calls rep_.clear() and rep._resize() 
> ([rocksdb/write_batch.cc at main · facebook/rocksdb · 
> GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]),
>  neither of which actually releases the memory built up by a std::string 
> instance. The only way to actually release this memory is to delete the 
> WriteBatch object itself.
> Currently, all memory taken by all write batches will remain until the 
> RocksDB state store instance is closed, which never happens during the normal 
> course of operation as all partitions remain loaded on an executor after a 
> task completes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41340) RocksDB State Store WriteBatch doesn't cleanup native memory

2022-11-30 Thread Adam Binford (Jira)
Adam Binford created SPARK-41340:


 Summary: RocksDB State Store WriteBatch doesn't cleanup native 
memory
 Key: SPARK-41340
 URL: https://issues.apache.org/jira/browse/SPARK-41340
 Project: Spark
  Issue Type: Bug
  Components: SQL, Structured Streaming
Affects Versions: 3.3.1
Reporter: Adam Binford


The RocksDB state store uses a WriteBatch to hold updates that get written in a 
single transaction to commit. Somewhat indirectly abort is called after a 
successful task which calls writeBatch.clear(), but the data for a writeBatch 
is stored in a std::string in the native code. Not sure why it's stored as a 
string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · 
GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491]

writeBatch.clear simply calls rep_.clear() and rep._resize() 
([rocksdb/write_batch.cc at main · facebook/rocksdb · 
GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]),
 neither of which actually releases the memory built up by a std::string 
instance. The only way to actually release this memory is to delete the 
WriteBatch object itself.

Currently, all memory taken by all write batches will remain until the RocksDB 
state store instance is closed, which never happens during the normal course of 
operation as all partitions remain loaded on an executor after a task completes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41339) RocksDB State Store WriteBatch doesn't cleanup native memory

2022-11-30 Thread Adam Binford (Jira)
Adam Binford created SPARK-41339:


 Summary: RocksDB State Store WriteBatch doesn't cleanup native 
memory
 Key: SPARK-41339
 URL: https://issues.apache.org/jira/browse/SPARK-41339
 Project: Spark
  Issue Type: Bug
  Components: SQL, Structured Streaming
Affects Versions: 3.3.1
Reporter: Adam Binford


The RocksDB state store uses a WriteBatch to hold updates that get written in a 
single transaction at commit time. Somewhat indirectly, abort is called after a 
successful task, which calls writeBatch.clear(), but the data for a writeBatch 
is stored in a std::string in the native code. Not sure why it's stored as a 
string, but it is. [rocksdb/write_batch.h at main · facebook/rocksdb · 
GitHub|https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_batch.h#L491]

writeBatch.clear() simply calls rep_.clear() and rep_.resize() 
([rocksdb/write_batch.cc at main · facebook/rocksdb · 
GitHub|https://github.com/facebook/rocksdb/blob/main/db/write_batch.cc#L246-L247]), 
neither of which actually releases the memory built up by a std::string 
instance. The only way to actually release this memory is to delete the 
WriteBatch object itself.

Currently, all memory taken by all write batches will remain until the RocksDB 
state store instance is closed, which never happens during the normal course of 
operation as all partitions remain loaded on an executor after a task completes.
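
For illustration, the shape of a fix could be to close and recreate the batch 
instead of clearing it. This is a rough sketch against the RocksDB Java API, not 
the actual Spark patch:
{code:java}
import org.rocksdb.WriteBatchWithIndex

// Sketch only: release the native buffer by deleting the batch after each commit
// rather than reusing one cleared batch for the lifetime of the state store.
var writeBatch = new WriteBatchWithIndex(true)

def commitAndRelease(): Unit = {
  // ... write writeBatch to the DB and commit here ...
  writeBatch.close()                       // frees the native object and its buffer
  writeBatch = new WriteBatchWithIndex(true)
}
{code}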



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40775) V2 file scans have duplicative descriptions

2022-10-12 Thread Adam Binford (Jira)
Adam Binford created SPARK-40775:


 Summary: V2 file scans have duplicative descriptions
 Key: SPARK-40775
 URL: https://issues.apache.org/jira/browse/SPARK-40775
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Adam Binford


V2 file scans have duplication in their descriptions. This is because FileScan 
uses the metadata to create the description, but each file type overrides both 
the metadata and the description, adding the same metadata twice.

Example from a parquet agg pushdown explain:

{{ *+- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, 
max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] 
ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 
paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), 
MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: 
[], PushedGroupBy: [], ReadSchema: 
struct

[jira] [Created] (SPARK-40565) Non-deterministic filters shouldn't get pushed to V2 file sources

2022-09-26 Thread Adam Binford (Jira)
Adam Binford created SPARK-40565:


 Summary: Non-deterministic filters shouldn't get pushed to V2 file 
sources
 Key: SPARK-40565
 URL: https://issues.apache.org/jira/browse/SPARK-40565
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Adam Binford


Currently non-deterministic filters can be pushed down to V2 file sources, 
which is different from V1, which prevents non-deterministic filters from 
being pushed.

Main consequences:
 * Things like doing a rand filter on a partition column will throw an 
exception (a minimal repro sketch is below):
 ** {{IllegalArgumentException: requirement failed: Nondeterministic expression 
org.apache.spark.sql.catalyst.expressions.Rand should be initialized before 
eval.}}
 * Using a non-deterministic UDF to collect metrics via accumulators gets 
pushed down and gives the wrong metrics
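
A minimal repro sketch of the first consequence; the path and partition column 
name are made up, and the table is assumed to be read through the V2 parquet 
source:
{code:java}
import org.apache.spark.sql.functions._

// Force the V2 file source for parquet (by default parquet is on the V1 list).
spark.conf.set("spark.sql.sources.useV1SourceList", "")

// "/data/events" and its partition column "part" are hypothetical.
val df = spark.read.parquet("/data/events")
df.filter(rand() < 0.5 && col("part") === "a").count()
// With V2, the non-deterministic rand() is pushed into partition pruning and fails:
// IllegalArgumentException: requirement failed: Nondeterministic expression
//   org.apache.spark.sql.catalyst.expressions.Rand should be initialized before eval.
{code}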



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40314) Add inline Scala and Python bindings

2022-09-02 Thread Adam Binford (Jira)
Adam Binford created SPARK-40314:


 Summary: Add inline Scala and Python bindings
 Key: SPARK-40314
 URL: https://issues.apache.org/jira/browse/SPARK-40314
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Adam Binford


Add bindings for inline and inline_outer to Scala and Python function APIs
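
For illustration, assuming a DataFrame df with an array-of-structs column (the 
column name here is hypothetical):
{code:java}
import org.apache.spark.sql.functions._

// Today inline/inline_outer are only reachable via SQL expression strings:
df.selectExpr("inline(arr_of_structs)")

// With the proposed bindings the same call would be available directly from the
// Scala (and Python) function APIs; inline below is the proposed binding:
df.select(inline(col("arr_of_structs")))
{code}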



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40246) Logging isn't configurable via log4j2 with hadoop-provided profile

2022-08-27 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-40246:
-
Component/s: Build
 (was: Spark Core)

> Logging isn't configurable via log4j2 with hadoop-provided profile
> --
>
> Key: SPARK-40246
> URL: https://issues.apache.org/jira/browse/SPARK-40246
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 3.3.0
>Reporter: Adam Binford
>Priority: Major
>
> When building Spark with -Phadoop-provided (or using the 3.3.0 build without 
> Hadoop), there is no slf implementation provided for log4j2, so the default 
> log4j2 properties are ignored and logging isn't configurable via 
> SparkContext.setLogLevel.
> Reproduction on a fresh Ubuntu container:
>  
> {noformat}
> apt-get update
> apt-get install -y wget
> wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
> wget 
> https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-without-hadoop.tgz
> tar -xvf hadoop-3.3.4.tar.gz -C /opt
> tar -xvf spark-3.3.0-bin-without-hadoop.tgz -C /opt
> export HADOOP_HOME=/opt/hadoop-3.3.4/
> export SPARK_HOME=/opt/spark-3.3.0-bin-without-hadoop/
> apt install -y openjdk-11-jre-headless python3
> export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/
> export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
> $SPARK_HOME/bin/pyspark
> {noformat}
> The default log level starts at INFO and you can't change it with 
> sc.setLogLevel



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40246) Logging isn't configurable via log4j2 with hadoop-provided profile

2022-08-27 Thread Adam Binford (Jira)
Adam Binford created SPARK-40246:


 Summary: Logging isn't configurable via log4j2 with 
hadoop-provided profile
 Key: SPARK-40246
 URL: https://issues.apache.org/jira/browse/SPARK-40246
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.3.0
Reporter: Adam Binford


When building Spark with -Phadoop-provided (or using the 3.3.0 build without 
Hadoop), there is no slf4j implementation provided for log4j2, so the default 
log4j2 properties are ignored and logging isn't configurable via 
SparkContext.setLogLevel.

Reproduction on a fresh Ubuntu container:

 
{noformat}
apt-get update
apt-get install -y wget
wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
wget 
https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-without-hadoop.tgz
tar -xvf hadoop-3.3.4.tar.gz -C /opt
tar -xvf spark-3.3.0-bin-without-hadoop.tgz -C /opt
export HADOOP_HOME=/opt/hadoop-3.3.4/
export SPARK_HOME=/opt/spark-3.3.0-bin-without-hadoop/
apt install -y openjdk-11-jre-headless python3
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/
export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
$SPARK_HOME/bin/pyspark
{noformat}
The default log level starts at INFO and you can't change it with sc.setLogLevel



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-39951) Support columnar batches with nested fields in Parquet V2

2022-08-02 Thread Adam Binford (Jira)
Adam Binford created SPARK-39951:


 Summary: Support columnar batches with nested fields in Parquet V2
 Key: SPARK-39951
 URL: https://issues.apache.org/jira/browse/SPARK-39951
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Adam Binford


Follow up to https://issues.apache.org/jira/browse/SPARK-34863 to updated 
`supportsColumnarReads` to account for nested fields



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38640) NPE with unpersisting memory-only RDD with RDD fetching from shuffle service enabled

2022-03-23 Thread Adam Binford (Jira)
Adam Binford created SPARK-38640:


 Summary: NPE with unpersisting memory-only RDD with RDD fetching 
from shuffle service enabled
 Key: SPARK-38640
 URL: https://issues.apache.org/jira/browse/SPARK-38640
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.1
Reporter: Adam Binford


If you have RDD fetching from shuffle service enabled, memory-only cached RDDs 
will fail to unpersist.

 

```

// spark.shuffle.service.fetch.rdd.enabled=true
val df = spark.range(5)
  .persist(StorageLevel.MEMORY_ONLY)

df.count()
df.unpersist(true)
```



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38640) NPE with unpersisting memory-only RDD with RDD fetching from shuffle service enabled

2022-03-23 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-38640:
-
Description: 
If you have RDD fetching from shuffle service enabled, memory-only cached RDDs 
will fail to unpersist.

 

 
{code:java}
// spark.shuffle.service.fetch.rdd.enabled=true
val df = spark.range(5)
  .persist(StorageLevel.MEMORY_ONLY)
df.count()
df.unpersist(true)
{code}
 

  was:
If you have RDD fetching from shuffle service enabled, memory-only cached RDDs 
will fail to unpersist.

 

```

// spark.shuffle.service.fetch.rdd.enabled=true
val df = spark.range(5)
  .persist(StorageLevel.MEMORY_ONLY)

df.count()
df.unpersist(true)
```


> NPE with unpersisting memory-only RDD with RDD fetching from shuffle service 
> enabled
> 
>
> Key: SPARK-38640
> URL: https://issues.apache.org/jira/browse/SPARK-38640
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.1
>Reporter: Adam Binford
>Priority: Major
>
> If you have RDD fetching from shuffle service enabled, memory-only cached 
> RDDs will fail to unpersist.
>  
>  
> {code:java}
> // spark.shuffle.service.fetch.rdd.enabled=true
> val df = spark.range(5)
>   .persist(StorageLevel.MEMORY_ONLY)
> df.count()
> df.unpersist(true)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38194) Make Yarn memory overhead factor configurable

2022-02-13 Thread Adam Binford (Jira)
Adam Binford created SPARK-38194:


 Summary: Make Yarn memory overhead factor configurable
 Key: SPARK-38194
 URL: https://issues.apache.org/jira/browse/SPARK-38194
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 3.2.1
Reporter: Adam Binford


Currently if the memory overhead is not provided for a Yarn job, it defaults to 
10% of the respective driver/executor memory. This 10% is hard-coded and the 
only way to increase memory overhead is to set the exact memory overhead. We 
have seen more than 10% memory being used, and it would be helpful to be able 
to set the default overhead factor so that the overhead doesn't need to be 
pre-calculated for any driver/executor memory size. 
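
For illustration, the difference in how a job would be configured; the factor 
config name below is the proposal in this issue, not a setting that exists yet:
{code:java}
import org.apache.spark.SparkConf

// Today: the overhead must be pre-computed for each memory size (e.g. ~20% of 20g).
val current = new SparkConf()
  .set("spark.executor.memory", "20g")
  .set("spark.executor.memoryOverhead", "4g")

// Proposed: express the overhead once as a factor, independent of the memory size.
val proposed = new SparkConf()
  .set("spark.executor.memory", "20g")
  .set("spark.executor.memoryOverheadFactor", "0.2")   // proposed config name
{code}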



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35570) Shuffle file leak with external shuffle service enable

2021-12-21 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463386#comment-17463386
 ] 

Adam Binford commented on SPARK-35570:
--

Just found this after making https://issues.apache.org/jira/browse/SPARK-37618. 
I'm working on an approach that makes use of the existing remove blocks call 
for RDDs served from the shuffle server. Can try to get some feedback once I 
have that working.

> Shuffle file leak with external shuffle service enable
> --
>
> Key: SPARK-35570
> URL: https://issues.apache.org/jira/browse/SPARK-35570
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 3.1.2
>Reporter: ZiyueGuan
>Priority: Major
>
> Unlike rdd block, external shuffle service doesn't offer a cleaning up of 
> shuffle file. The cleaning up of shuffle file mainly rely on alive executors 
> to response the request from context cleaner. As long as the executor exit, 
> the shuffle file left will not be cleaned until application exits. For 
> streaming application or long running application, disk may run out. 
> I'm confused that shuffle file was left like above while the lifecycle of rdd 
> block was properly handled. Is there any difference between them? 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37618) Support cleaning up shuffle blocks from external shuffle service

2021-12-12 Thread Adam Binford (Jira)
Adam Binford created SPARK-37618:


 Summary: Support cleaning up shuffle blocks from external shuffle 
service
 Key: SPARK-37618
 URL: https://issues.apache.org/jira/browse/SPARK-37618
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Adam Binford


Currently shuffle data is not cleaned up when an external shuffle service is 
used and the associated executor has been deallocated before the shuffle is 
cleaned up. Shuffle data is only cleaned up once the application ends.

There have been various issues filed for this:

https://issues.apache.org/jira/browse/SPARK-26020

https://issues.apache.org/jira/browse/SPARK-17233

https://issues.apache.org/jira/browse/SPARK-4236

But shuffle files will still stick around until an application completes. 
Dynamic allocation is commonly used for long running jobs (such as structured 
streaming), so any long running jobs with a large shuffle involved will 
eventually fill up local disk space. The shuffle service already supports 
cleaning up shuffle service persisted RDDs, so it should be able to support 
cleaning up shuffle blocks as well once the shuffle is removed by the 
ContextCleaner. 

The current alternative is to use shuffle tracking instead of an external 
shuffle service, but this is less optimal from a resource perspective as all 
executors must be kept alive until the shuffle has been fully consumed and 
cleaned up (and with the default GC interval being 30 minutes this can waste a 
lot of time with executors held onto but not doing anything).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37467) Consolidate whole stage and non-whole stage subexpression elimination

2021-11-25 Thread Adam Binford (Jira)
Adam Binford created SPARK-37467:


 Summary: Consolidate whole stage and non-whole stage subexpression 
elimination
 Key: SPARK-37467
 URL: https://issues.apache.org/jira/browse/SPARK-37467
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Adam Binford


Currently there are separate subexpression elimination paths for whole stage 
and non-whole stage codegen. Consolidating these into a single code path would 
make it simpler to add further enhancements, such as supporting lambda 
functions  (https://issues.apache.org/jira/browse/SPARK-37466).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37466) Support subexpression elimination in lambda functions

2021-11-25 Thread Adam Binford (Jira)
Adam Binford created SPARK-37466:


 Summary: Support subexpression elimination in lambda functions
 Key: SPARK-37466
 URL: https://issues.apache.org/jira/browse/SPARK-37466
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Adam Binford


https://issues.apache.org/jira/browse/SPARK-37019 will add codegen support for 
higher order functions. However we can't support subexpression elimination 
inside of lambda functions because subexpressions are evaluated once per row at 
the beginning of the codegen. Common expressions inside lambda functions can 
easily result in performance degradation due to multiple evaluations of the 
same expression. Subexpression elimination inside of lambda functions needs to 
be handled specially to be evaluated once per function invocation.
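
As a small illustration of the problem (df and its array&lt;string&gt; column "arr" 
are hypothetical):
{code:java}
import org.apache.spark.sql.functions._

// The same expensive expression appears twice inside the lambda; without
// lambda-aware subexpression elimination it is evaluated twice per array element.
df.select(transform(col("arr"),
  x => when(length(regexp_replace(x, "\\d", "")) > 0, regexp_replace(x, "\\d", ""))))
{code}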



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37019) Add Codegen support to array higher-order-functions

2021-11-11 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-37019:
-
Description: 
Currently all of the higher order functions use CodegenFallback. We can improve 
the performance of these by adding proper codegen support, so the function as 
well as all children can be codegen'd, and it can participate in 
WholeStageCodegen.

This ticket is for adding support to array-based higher-order functions.

  was:
Currently all of the higher order functions use CodegenFallback. We can improve 
the performance of these by adding proper codegen support, so the function as 
well as all children can be codegen'd, and it can participate in 
WholeStageCodegen.

This ticket is for adding support to ArrayTransform as the first step.

Summary: Add Codegen support to array higher-order-functions  (was: Add 
Codegen support to ArrayTransform)

> Add Codegen support to array higher-order-functions
> ---
>
> Key: SPARK-37019
> URL: https://issues.apache.org/jira/browse/SPARK-37019
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Adam Binford
>Priority: Major
>
> Currently all of the higher order functions use CodegenFallback. We can 
> improve the performance of these by adding proper codegen support, so the 
> function as well as all children can be codegen'd, and it can participate in 
> WholeStageCodegen.
> This ticket is for adding support to array-based higher-order functions.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37019) Add Codegen support to ArrayTransform

2021-10-15 Thread Adam Binford (Jira)
Adam Binford created SPARK-37019:


 Summary: Add Codegen support to ArrayTransform
 Key: SPARK-37019
 URL: https://issues.apache.org/jira/browse/SPARK-37019
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Adam Binford


Currently all of the higher order functions use CodegenFallback. We can improve 
the performance of these by adding proper codegen support, so the function as 
well as all children can be codegen'd, and it can participate in 
WholeStageCodegen.

This ticket is for adding support to ArrayTransform as the first step.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36918) unionByName shouldn't consider types when comparing structs

2021-10-03 Thread Adam Binford (Jira)
Adam Binford created SPARK-36918:


 Summary: unionByName shouldn't consider types when comparing 
structs
 Key: SPARK-36918
 URL: https://issues.apache.org/jira/browse/SPARK-36918
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Adam Binford


Improvement/follow-on of https://issues.apache.org/jira/browse/SPARK-35290.

We use StructType.sameType to see if we need to recreate the struct, but this 
can lead to false positives when the structure is the same and only the types 
differ, which results in simply creating a new struct that's exactly the 
same as the original. This can cause significant overhead when unioning 
multiple deeply nested nullable structs, as each time the struct is recreated it 
gets wrapped in an If(IsNull()). Only comparing the field names can lead to more 
efficient plans.
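
The kind of workload where this overhead shows up, roughly (frames is assumed to 
be a collection of DataFrames sharing a deeply nested nullable struct schema):
{code:java}
// Unioning many frames by name; each union that decides to "recreate" the nested
// struct wraps it in another If(IsNull(...)), even when the result is identical.
val merged = frames.reduce((left, right) =>
  left.unionByName(right, allowMissingColumns = true))
{code}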



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35863) Upgrade Ivy to 2.5.0

2021-06-23 Thread Adam Binford (Jira)
Adam Binford created SPARK-35863:


 Summary: Upgrade Ivy to 2.5.0
 Key: SPARK-35863
 URL: https://issues.apache.org/jira/browse/SPARK-35863
 Project: Spark
  Issue Type: Improvement
  Components: Spark Submit
Affects Versions: 3.1.2
Reporter: Adam Binford


Apache Ivy 2.5.0 was released nearly two years ago. The new bug fixes and 
features can be found here: 
[https://ant.apache.org/ivy/history/latest-milestone/release-notes.html]

Most notably, the addition of the ivy.maven.lookup.sources and 
ivy.maven.lookup.javadoc configs can significantly speed up module resolution 
time if these are turned off, especially behind a proxy. These could arguably 
be turned off by default, because when submitting jobs you probably don't care 
about the sources or javadoc jars.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-06-06 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358286#comment-17358286
 ] 

Adam Binford commented on SPARK-35564:
--

Is that documented somewhere? I know Boolean expressions aren't guaranteed to 
short circuit, but I think most Spark users would assume multiple when clauses 
would short circuit.

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-06-06 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358275#comment-17358275
 ] 

Adam Binford edited comment on SPARK-35564 at 6/6/21, 10:50 PM:


No the values are fine, it's the tail conditions that cause the issue.
{code:java}
spark.range(2).select(when($"id" >= 0, lit(1)).when(myUdf($"id") > 0, lit(2)), 
when($"id" > -1, lit(1)).when(myUdf($"id") > 0, lit(2))).show(){code}
Here myUdf($"id") gets pulled out as a subexpression even though it never 
should be evaluated.


was (Author: kimahriman):
No the values are fine, it's the condition that cause the issue.
{code:java}
spark.range(2).select(when($"id" >= 0, lit(1)).when(myUdf($"id") > 0, lit(2)), 
when($"id" > -1, lit(1)).when(myUdf($"id") > 0, lit(2))).show(){code}
Here myUdf($"id") gets pulled out as a subexpression even though it never 
should be evaluated.

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-06-06 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358275#comment-17358275
 ] 

Adam Binford commented on SPARK-35564:
--

No, the values are fine; it's the conditions that cause the issue.
{code:java}
spark.range(2).select(when($"id" >= 0, lit(1)).when(myUdf($"id") > 0, lit(2)), 
when($"id" > -1, lit(1)).when(myUdf($"id") > 0, lit(2))).show(){code}
Here myUdf($"id") gets pulled out as a subexpression even though it should never 
be evaluated.

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.






[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-06-06 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358210#comment-17358210
 ] 

Adam Binford commented on SPARK-35564:
--

You can construct a CaseWhen expression that leads to the same problem; the 
coalesce was just simpler to demonstrate.

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.






[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-06-06 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358206#comment-17358206
 ] 

Adam Binford commented on SPARK-35564:
--

Yes that was an example of "will run at least once and maybe more than once" 
that I'm proposing to add more support for in this issue.

An example of current behavior that would be considered a bug is:
{code:java}
spark.range(2).select(coalesce($"id", myUdf($"id")), coalesce($"id" + 1, 
myUdf($"id"))).show()
{code}
myUdf will be pulled out into a subexpression even though it is never executed.

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.






[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-06-06 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358117#comment-17358117
 ] 

Adam Binford commented on SPARK-35564:
--

Turns out this is already happening for certain when and coalesce expressions. 
For example:
{code:java}
spark.range(2).select(myUdf($"id"), coalesce($"id", myUdf($"id")))
{code}
myUdf will be pulled out into a subexpression even though it might only be 
executed once per row. This can be a correctness issue for very specific edge 
cases, similar to https://issues.apache.org/jira/browse/SPARK-35449, where 
myUdf could get executed for a row even though that row doesn't pass certain 
conditional checks.

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.






[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-05-31 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354637#comment-17354637
 ] 

Adam Binford commented on SPARK-35564:
--

A 2x gain would be pretty significant to us; I don't know about others. I'm 
planning to implement this in our fork, and if I get good results I'll put up a 
PR for further discussion. We could optionally add a config for this if it's 
workload dependent. Also, the only thing it is likely to do to the generated 
code is reduce the overall size, albeit with more function calls in the worst 
cases. Whether smaller code size adds any value on its own, I don't know enough 
about Java internals to say.

>Oh, this is another issue. I noticed it last time when I worked on another PR 
>recently, but don't have time to look at it yet.

I created https://issues.apache.org/jira/browse/SPARK-35580 to track what I've 
figured out so far. Not sure what the right fix is.

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.






[jira] [Created] (SPARK-35580) Support subexpression elimination for higher order functions

2021-05-31 Thread Adam Binford (Jira)
Adam Binford created SPARK-35580:


 Summary: Support subexpression elimination for higher order 
functions
 Key: SPARK-35580
 URL: https://issues.apache.org/jira/browse/SPARK-35580
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.1
Reporter: Adam Binford


Currently, higher order functions are not candidates for subexpression 
elimination. This is because all higher order functions have different semantic 
hashes, due to "exprId" and "value" in "NamedLambdaVariable". These are always 
unique, so the semanticHash of a NamedLambdaVariable is always unique. Also, 
[https://github.com/apache/spark/pull/32424] might throw a wrench in things 
too; depending on how you define your expressions, the lambda variable name 
could be different.
{code:java}
scala> var d = transform($"a", x => x + 1)
d: org.apache.spark.sql.Column = transform(a, lambdafunction((x_2 + 1), x_2))

scala> var e = transform($"a", x => x + 1)
e: org.apache.spark.sql.Column = transform(a, lambdafunction((x_3 + 1), x_3))

scala> struct(d.alias("1"), d.alias("2")).expr
res9: org.apache.spark.sql.catalyst.expressions.Expression = 
struct(NamePlaceholder, transform('a, lambdafunction((lambda 'x_2 + 1), lambda 
'x_2, false)) AS 1#4, NamePlaceholder, transform('a, lambdafunction((lambda 
'x_2 + 1), lambda 'x_2, false)) AS 2#5)

scala> struct(d.alias("1"), e.alias("2")).expr
res10: org.apache.spark.sql.catalyst.expressions.Expression = 
struct(NamePlaceholder, transform('a, lambdafunction((lambda 'x_2 + 1), lambda 
'x_2, false)) AS 1#6, NamePlaceholder, transform('a, lambdafunction((lambda 
'x_3 + 1), lambda 'x_3, false)) AS 2#7)
{code}






[jira] [Commented] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-05-31 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354447#comment-17354447
 ] 

Adam Binford commented on SPARK-35564:
--

>Do you mean "Create a subexpression if an expression will always be evaluated 
>at least once AND will be evaluated at least once in conditional expression"?

Yeah you can think of it that way in terms of adding to existing functionality. 
I was trying to word it in a way that encompassed existing functionality as 
well.

>And this looks like a corner case, so I'm not sure if it is worth to do this.

I don't really think this is much of a corner case, but a common case of using 
a when expression for data validation. Most of our ETL process comes down to 
normalizing, cleaning, and validating strings, which at the end of the day 
usually looks like:
{code:java}
column = normalize_value(col('my_raw_value'))
result = when(column != '', column){code}
where "normalize_value" usually involves some combination of regexp_repace's, 
lower/upper, and trim.
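
For concreteness, here is a rough sketch of what a "normalize_value" helper 
like that could look like (my illustration of the pattern, not the actual 
helper referenced above):
{code:java}
from pyspark.sql.functions import col, lower, regexp_replace, trim

def normalize_value(c):
    # example normalization: strip digits, collapse whitespace, lowercase, trim
    c = regexp_replace(c, r'\d', '')
    c = regexp_replace(c, r'\s+', ' ')
    return trim(lower(c))

column = normalize_value(col('my_raw_value'))
{code}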

And things get worse when you are dealing with arrays of strings and want to 
minimize your data:
{code:java}
column = filter(transform(col('my_raw_array_value'), lambda x: 
normalize_value(x)), lambda x: x != '')
result = when(size(column) > 0, column){code}
though currently higher order functions are always semantically different, so I 
think they don't get subexpressions regardless. That's something I plan to look 
into as a follow-up.

It's natural for users to think that these expressions only get evaluated once, 
not that they are doubling their runtime trying to clean their data. To me, the 
edge case is a subexpression created here actually decreasing throughput: it 
would require a very large percentage of the rows to fail the conditional 
check, since the additional calculation is much more expensive than the 
additional function call. I'm playing around with an implementation, so we'll 
see how far I can get with it.

 

> Support subexpression elimination for non-common branches of conditional 
> expressions
> 
>
> Key: SPARK-35564
> URL: https://issues.apache.org/jira/browse/SPARK-35564
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
> subexpressions out of branches of conditional expressions for expressions 
> present in all branches. We should be able to take this a step further and 
> pull out subexpressions for any branch, as long as that expression will 
> definitely be evaluated at least once.
> Consider a common data validation example:
> {code:java}
> from pyspark.sql.functions import *
> df = spark.createDataFrame([['word'], ['1234']])
> col = regexp_replace('_1', r'\d', '')
> df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
> We only want to keep the value if it's non-empty with numbers removed, 
> otherwise we want it to be null. 
> Because we have no otherwise value, `col` is not a candidate for 
> subexpression elimination (you can see two regular expression replacements in 
> the codegen). But whenever the length is greater than 0, we will have to 
> execute the regular expression replacement twice. Since we know we will 
> always calculate `col` at least once, it makes sense to consider that as a 
> subexpression since we might need it again in the branch value. So we can 
> update the logic from:
> Create a subexpression if an expression will always be evaluated at least 
> twice
> To:
> Create a subexpression if an expression will always be evaluated at least 
> once AND will either always or conditionally be evaluated at least twice.
> The trade off is potentially another subexpression function call (for split 
> subexpressions) if the second evaluation doesn't happen, but this seems like 
> it would be worth it for when it is evaluated the second time.






[jira] [Created] (SPARK-35564) Support subexpression elimination for non-common branches of conditional expressions

2021-05-29 Thread Adam Binford (Jira)
Adam Binford created SPARK-35564:


 Summary: Support subexpression elimination for non-common branches 
of conditional expressions
 Key: SPARK-35564
 URL: https://issues.apache.org/jira/browse/SPARK-35564
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.1
Reporter: Adam Binford


https://issues.apache.org/jira/browse/SPARK-7 added support for pulling 
subexpressions out of branches of conditional expressions for expressions 
present in all branches. We should be able to take this a step further and pull 
out subexpressions for any branch, as long as that expression will definitely 
be evaluated at least once.

Consider a common data validation example:
{code:java}
from pyspark.sql.functions import *
df = spark.createDataFrame([['word'], ['1234']])
col = regexp_replace('_1', r'\d', '')
df = df.withColumn('numbers_removed', when(length(col) > 0, col)){code}
We only want to keep the value if it's non-empty with numbers removed, 
otherwise we want it to be null. 

Because we have no otherwise value, `col` is not a candidate for subexpression 
elimination (you can see two regular expression replacements in the codegen). 
But whenever the length is greater than 0, we will have to execute the regular 
expression replacement twice. Since we know we will always calculate `col` at 
least once, it makes sense to consider that as a subexpression since we might 
need it again in the branch value. So we can update the logic from:

Create a subexpression if an expression will always be evaluated at least twice

To:

Create a subexpression if an expression will always be evaluated at least once 
AND will either always or conditionally be evaluated at least twice.

The trade off is potentially another subexpression function call (for split 
subexpressions) if the second evaluation doesn't happen, but this seems like it 
would be worth it for when it is evaluated the second time.
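
As a side note (my addition, not part of the original report), the duplicated 
work described above can be seen by dumping the whole-stage generated code from 
PySpark and looking for the regex replacement, which currently appears twice:
{code:java}
from pyspark.sql.functions import length, regexp_replace, when

# assumes an active SparkSession named `spark`
df = spark.createDataFrame([['word'], ['1234']])
col = regexp_replace('_1', r'\d', '')
df = df.withColumn('numbers_removed', when(length(col) > 0, col))

# prints the generated Java for the projection
df.explain(mode="codegen")
{code}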






[jira] [Updated] (SPARK-35449) Should not extract common expressions from value expressions when elseValue is empty in CaseWhen

2021-05-23 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-35449:
-
Description: 
[https://github.com/apache/spark/pull/30245] added support for creating 
subexpressions that are present in all branches of conditional statements. 
However, for a statement to be in "all branches" of a CaseWhen statement, it 
must also be in the elseValue. This can lead to a subexpression being created 
and run for branches of a conditional that don't pass. This can cause issues, 
especially with a UDF in a branch that assumes it only executes when the 
condition is true. For example:
{code:java}
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
{code}
myUdf($"id") gets extracted as a subexpression and executed even though both 
conditions don't pass and it should never be executed.

> Should not extract common expressions from value expressions when elseValue 
> is empty in CaseWhen
> 
>
> Key: SPARK-35449
> URL: https://issues.apache.org/jira/browse/SPARK-35449
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: L. C. Hsieh
>Priority: Major
>
> [https://github.com/apache/spark/pull/30245]  added support for creating 
> subexpressions that are present in all branches of conditional statements. 
> However, for a statement to be in "all branches" of a CaseWhen statement, it 
> must also be in the elseValue. This can lead to a subexpression to be created 
> and run for branches of a conditional that don't pass. This can cause issues 
> especially with a UDF in a branch that gets executed assuming the condition 
> is true. For example:
> {code:java}
> val col = when($"id" < 0, myUdf($"id"))
> spark.range(1).select(when(col > 0, col)).show()
> {code}
> myUdf($"id") gets extracted as a subexpression and executed even though both 
> conditions don't pass and it should never be executed.






[jira] [Comment Edited] (SPARK-35448) Subexpression elimination enhancements

2021-05-19 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347976#comment-17347976
 ] 

Adam Binford edited comment on SPARK-35448 at 5/19/21, 11:21 PM:
-

Just spitballing after looking through some of the subexpression elimination 
code. Does it make any sense to treat the output of EquivalentExpressions as 
"candidates" for subexpressions, and then leave it to the codegen to figure out 
which subexpressions are actually needed and used, and to include the code only 
for those?


was (Author: kimahriman):
Just spitballing after looking through some of the subexpression elimination 
code. Does it make more sense to treat the output of EquivalentExpressions as 
"candidates" for subexpressions, and then it's the codegen's job to figure out 
which subexpressions are actually needed and used and include the code for them?

> Subexpression elimination enhancements
> --
>
> Key: SPARK-35448
> URL: https://issues.apache.org/jira/browse/SPARK-35448
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: L. C. Hsieh
>Priority: Major
>







[jira] [Commented] (SPARK-35448) Subexpression elimination enhancements

2021-05-19 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347976#comment-17347976
 ] 

Adam Binford commented on SPARK-35448:
--

Just spitballing after looking through some of the subexpression elimination 
code. Does it make more sense to treat the output of EquivalentExpressions as 
"candidates" for subexpressions, and then leave it to the codegen to figure out 
which subexpressions are actually needed and used, and to include the code only 
for those?

> Subexpression elimination enhancements
> --
>
> Key: SPARK-35448
> URL: https://issues.apache.org/jira/browse/SPARK-35448
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: L. C. Hsieh
>Priority: Major
>







[jira] [Commented] (SPARK-35256) Subexpression elimination leading to a performance regression

2021-05-19 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347974#comment-17347974
 ] 

Adam Binford commented on SPARK-35256:
--

See https://issues.apache.org/jira/browse/SPARK-35448 for a few related 
subexpression elimination issues being worked on. Specifically, 
https://issues.apache.org/jira/browse/SPARK-35410 is probably the main issue 
you're facing, where `when` expressions can cause a lot of extra, unused 
subexpressions to be included in the codegen.

> Subexpression elimination leading to a performance regression
> -
>
> Key: SPARK-35256
> URL: https://issues.apache.org/jira/browse/SPARK-35256
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Ondrej Kokes
>Priority: Minor
> Attachments: bisect_log.txt, bisect_timing.csv
>
>
> I'm seeing almost double the runtime between 3.0.1 and 3.1.1 in my pipeline 
> that does mostly str_to_map, split and a few other operations - all 
> projections, no joins or aggregations (it's here only to trigger the 
> pipeline). I cut it down to the simplest reproducible example I could - 
> anything I remove from this changes the runtime difference quite 
> dramatically. (even moving those two expressions from f.when to standalone 
> columns makes the difference disappear)
> {code:java}
> import time
> import os
> import pyspark  
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as f
> if __name__ == '__main__':
> print(pyspark.__version__)
> spark = SparkSession.builder.getOrCreate()
> filename = 'regression.csv'
> if not os.path.isfile(filename):
> with open(filename, 'wt') as fw:
> fw.write('foo\n')
> for _ in range(10_000_000):
> fw.write('foo=bar=bak=f,o,1:2:3\n')
> df = spark.read.option('header', True).csv(filename)
> t = time.time()
> dd = (df
> .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
> .withColumn('extracted',
> # without this top level split it is only 50% 
> slower, with it
> # the runtime almost doubles
> f.split(f.split(f.col("my_map")["bar"], ",")[2], 
> ":")[0]
>)
> .select(
> f.when(
> f.col("extracted").startswith("foo"), f.col("extracted")
> ).otherwise(
> f.concat(f.lit("foo"), f.col("extracted"))
> ).alias("foo")
> )
> )
> # dd.explain(True)
> _ = dd.groupby("foo").count().count()
> print("elapsed", time.time() - t)
> {code}
> Running this in 3.0.1 and 3.1.1 respectively (both installed from PyPI, on my 
> local macOS)
> {code:java}
> 3.0.1
> elapsed 21.262351036071777
> 3.1.1
> elapsed 40.26582884788513
> {code}
> (Meaning the transformation took 21 seconds in 3.0.1 and 40 seconds in 3.1.1)
> Feel free to make the CSV smaller to get a quicker feedback loop - it scales 
> linearly (I developed this with 2M rows).
> It might be related to my previous issue - SPARK-32989 - there are similar 
> operations, nesting etc. (splitting on the original column, not on a map, 
> makes the difference disappear)
> I tried dissecting the queries in SparkUI and via explain, but both 3.0.1 and 
> 3.1.1 produced identical plans.






[jira] [Commented] (SPARK-35410) Unused subexpressions leftover in WholeStageCodegen subexpression elimination

2021-05-15 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345044#comment-17345044
 ] 

Adam Binford commented on SPARK-35410:
--

[~viirya], thanks for adding all these great new features! Sorry I keep finding 
weird corner cases with them, but it's because we're heavily making use of them.

> Unused subexpressions leftover in WholeStageCodegen subexpression elimination
> -
>
> Key: SPARK-35410
> URL: https://issues.apache.org/jira/browse/SPARK-35410
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
> Attachments: codegen.txt
>
>
> Trying to understand and debug the performance of some of our jobs, I started 
> digging into what the whole stage codegen code was doing. We use a lot of 
> case when statements, and I found that there were a lot of unused sub 
> expressions that were left over after the subexpression elimination, and it 
> gets worse the more expressions you have chained. The simple example:
> {code:java}
> import org.apache.spark.sql.functions._
> import spark.implicits._
> val myUdf = udf((s: String) => {
>   println("In UDF")
>   s.toUpperCase
> })
> spark.range(5).select(when(length(myUdf($"id")) > 0, 
> length(myUdf($"id")))).show()
> {code}
> Running the code, you'll see "In UDF" printed out 10 times. And if you change 
> both to log(length(myUdf($"id"))), "In UDF" will print out 20 times (one more 
> for a cast from int to double and one more for the actual log calculation I 
> think).
> In the codegen for this (without the log), there are these initial 
> subexpressions:
> {code:java}
> /* 076 */ UTF8String project_subExprValue_0 = 
> project_subExpr_0(project_expr_0_0);
> /* 077 */ int project_subExprValue_1 = 
> project_subExpr_1(project_expr_0_0);
> /* 078 */ UTF8String project_subExprValue_2 = 
> project_subExpr_2(project_expr_0_0);
> {code}
> project_subExprValue_0 and project_subExprValue_2 are never actually used, so 
> it's properly resolving the two expressions and sharing the result of 
> project_subExprValue_1, but it's not removing the other sub expression calls 
> it seems like.






[jira] [Updated] (SPARK-35410) Unused subexpressions leftover in WholeStageCodegen subexpression elimination

2021-05-15 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-35410:
-
Attachment: codegen.txt

> Unused subexpressions leftover in WholeStageCodegen subexpression elimination
> -
>
> Key: SPARK-35410
> URL: https://issues.apache.org/jira/browse/SPARK-35410
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
> Attachments: codegen.txt
>
>
> Trying to understand and debug the performance of some of our jobs, I started 
> digging into what the whole stage codegen code was doing. We use a lot of 
> case when statements, and I found that there were a lot of unused sub 
> expressions that were left over after the subexpression elimination, and it 
> gets worse the more expressions you have chained. The simple example:
> {code:java}
> import org.apache.spark.sql.functions._
> import spark.implicits._
> val myUdf = udf((s: String) => {
>   println("In UDF")
>   s.toUpperCase
> })
> spark.range(5).select(when(length(myUdf($"id")) > 0, 
> length(myUdf($"id")))).show()
> {code}
> Running the code, you'll see "In UDF" printed out 10 times. And if you change 
> both to log(length(myUdf($"id"))), "In UDF" will print out 20 times (one more 
> for a cast from int to double and one more for the actual log calculation I 
> think).
> In the codegen for this (without the log), there are these initial 
> subexpressions:
> {code:java}
> /* 076 */ UTF8String project_subExprValue_0 = 
> project_subExpr_0(project_expr_0_0);
> /* 077 */ int project_subExprValue_1 = 
> project_subExpr_1(project_expr_0_0);
> /* 078 */ UTF8String project_subExprValue_2 = 
> project_subExpr_2(project_expr_0_0);
> {code}
> project_subExprValue_0 and project_subExprValue_2 are never actually used, so 
> it's properly resolving the two expressions and sharing the result of 
> project_subExprValue_1, but it's not removing the other sub expression calls 
> it seems like.






[jira] [Created] (SPARK-35410) Unused subexpressions leftover in WholeStageCodegen subexpression elimination

2021-05-15 Thread Adam Binford (Jira)
Adam Binford created SPARK-35410:


 Summary: Unused subexpressions leftover in WholeStageCodegen 
subexpression elimination
 Key: SPARK-35410
 URL: https://issues.apache.org/jira/browse/SPARK-35410
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.1
Reporter: Adam Binford


Trying to understand and debug the performance of some of our jobs, I started 
digging into what the whole stage codegen code was doing. We use a lot of case 
when statements, and I found that there were a lot of unused subexpressions 
left over after the subexpression elimination, and it gets worse the more 
expressions you have chained. A simple example:
{code:java}
import org.apache.spark.sql.functions._
import spark.implicits._
val myUdf = udf((s: String) => {
  println("In UDF")
  s.toUpperCase
})
spark.range(5).select(when(length(myUdf($"id")) > 0, 
length(myUdf($"id")))).show()
{code}
Running the code, you'll see "In UDF" printed out 10 times. And if you change 
both to log(length(myUdf($"id"))), "In UDF" will print out 20 times (one more 
for a cast from int to double and one more for the actual log calculation I 
think).

In the codegen for this (without the log), there are these initial 
subexpressions:
{code:java}
/* 076 */ UTF8String project_subExprValue_0 = 
project_subExpr_0(project_expr_0_0);
/* 077 */ int project_subExprValue_1 = project_subExpr_1(project_expr_0_0);
/* 078 */ UTF8String project_subExprValue_2 = 
project_subExpr_2(project_expr_0_0);
{code}
project_subExprValue_0 and project_subExprValue_2 are never actually used, so 
it's properly resolving the two expressions and sharing the result of 
project_subExprValue_1, but it doesn't seem to be removing the other 
subexpression calls.
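
A hedged side note (my addition, not from the report): from PySpark the 
generated code can be dumped with explain, though a Python UDF is planned 
through BatchEvalPython rather than inlined into codegen, so the Scala UDF 
above is what actually exhibits the leftover project_subExpr_* calls.
{code:java}
# Assumes an active SparkSession named `spark`; uses a plain expression instead
# of a UDF just to show how to inspect the whole-stage generated Java code.
from pyspark.sql import functions as F

df = spark.range(5).select(
    F.when(F.length(F.upper(F.col("id").cast("string"))) > 0,
           F.length(F.upper(F.col("id").cast("string")))))
df.explain(mode="codegen")
{code}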






[jira] [Commented] (SPARK-35309) array transform should respect alias in expression

2021-05-04 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338966#comment-17338966
 ] 

Adam Binford commented on SPARK-35309:
--

You want "x" to be "struct_alias"? That should be
{code:java}
struct(col.alias("struct_alias")){code}
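
Applied to the original snippet, that suggestion would look roughly like the 
following (my PySpark translation of the Scala example, as an illustration 
rather than a verified fix):
{code:java}
from pyspark.sql import functions as F

df = spark.createDataFrame([([1, 2, 3],)], ['a'])
df_res = df.select(
    # alias the column inside struct() so the field gets the intended name
    F.transform('a', lambda c: F.struct(c.alias('struct_alias'))).alias('a'))
df_res.printSchema()
# expectation: the element struct's field is named 'struct_alias' instead of 'x'
{code}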

> array transform should respect alias in expression
> --
>
> Key: SPARK-35309
> URL: https://issues.apache.org/jira/browse/SPARK-35309
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Derk Crezee
>Priority: Major
>
> When an array is transformed using the transform function in combination with 
> an alias, I get an unexpected column name.
> {code:scala}
> // code placeholder
> val data = Seq((Array(1, 2, 3)))
> val df = sc.parallelize(data).toDF("a")
> val dfRes = df.select(
>   transform(
> $"a",
> (col: Column) => struct(col).alias("struct_alias")
>   ).alias("a")
> )
> dfRes.printSchema
> // root
> //  |-- a: array (nullable = true)
> //  ||-- element: struct (containsNull = false)
> //  |||-- x: integer (nullable = false)
> {code}
> I expected the inner element to have the name 'struct_alias'.






[jira] [Commented] (SPARK-35290) unionByName with null filling fails for some nested structs

2021-05-04 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338921#comment-17338921
 ] 

Adam Binford commented on SPARK-35290:
--

Running into some issues trying to get case insensitivity working correctly. I 
was hoping to be able to leave everything on both sides of the union in its 
existing casing, and just get things in the right order, but I'm running into 
this issue:
{code:java}
>>> from pyspark.sql.functions import *
>>> df1 = spark.range(1).withColumn('top', struct(lit('A').alias('A')))
>>> df2 = spark.range(1).withColumn('top', struct(lit('a').alias('a')))
>>> spark.conf.set('spark.sql.caseSensitive', 'true')
...
pyspark.sql.utils.AnalysisException: Union can only be performed on tables with 
the compatible column types. struct<A:string> <> struct<a:string> at the second 
column of the second table;

>>> spark.conf.set('spark.sql.caseSensitive', 'false')
>>> df1.union(df2)
DataFrame[id: bigint, top: struct<A:string>]
>>> df1.unionByName(df2)
DataFrame[id: bigint, top: struct<A:string>]
>>> df1.unionByName(df2, True)
DataFrame[id: bigint, top: struct<A:string,a:string>]
{code}
With case sensitivity enabled, it errors out as expected because the two 
structs are different types. However, when case sensitivity is disabled, the 
union is happy because it sees them as the same type, but when the schemas are 
merged, it treats them as two separate fields. I assume it's related to the 
StructType.merge method, but I don't exactly know where that gets called in the 
context of a Union. I don't see anything in that merge function that handles 
case insensitivity. Is that a bug in itself or a feature?

> unionByName with null filling fails for some nested structs
> ---
>
> Key: SPARK-35290
> URL: https://issues.apache.org/jira/browse/SPARK-35290
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> We've encountered a few weird edge cases that seem to fail the new null 
> filling unionByName (which has been a great addition!). It seems to stem from 
> the fields being sorted by name and corrupted along the way. The simple 
> reproduction is:
> {code:java}
> df = spark.createDataFrame([[]])
> df1 = (df
> .withColumn('top', F.struct(
> F.struct(
> F.lit('ba').alias('ba')
> ).alias('b')
> ))
> )
> df2 = (df
> .withColumn('top', F.struct(
> F.struct(
> F.lit('aa').alias('aa')
> ).alias('a'),
> F.struct(
> F.lit('bb').alias('bb')
> ).alias('b'),
> ))
> )
> df1.unionByName(df2, True).printSchema()
> {code}
> This results in the exception:
> {code:java}
> pyspark.sql.utils.AnalysisException: Union can only be performed on tables 
> with the compatible column types. 
> struct,b:struct> <> 
> struct,b:struct> at the first column 
> of the second table;
> {code}
> You can see in the second schema that it has 
> {code:java}
> b:struct
> {code}
> when it should be
> {code:java}
> b:struct
> {code}
> It seems to happen somewhere during 
> [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala#L73,]
>  as everything seems correct up to that point from my testing. It's either 
> modifying one expression during the transformUp then corrupts other 
> expressions that are then modified, or the ExtractValue before the 
> addFieldsInto is remembering the ordinal position in the struct that is then 
> changing and causing issues.
>  
> I found that simply using sortStructFields instead of 
> sortStructFieldsInWithFields gets things working correctly, but definitely 
> has a performance impact. The deep expr unionByName test takes ~1-2 seconds 
> normally but ~12-15 seconds with this change. I assume because the original 
> method tried to rewrite existing expressions vs the sortStructFields just 
> adds expressions on top of existing ones to project the new order.
> I'm not sure if it makes sense to take the slower but works in the edge cases 
> method (assuming it doesn't break other cases, all existing tests pass), or 
> if there's a way to fix the existing method for cases like this.
>  






[jira] [Commented] (SPARK-35290) unionByName with null filling fails for some nested structs

2021-05-03 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338526#comment-17338526
 ] 

Adam Binford commented on SPARK-35290:
--

I've also been playing around with rewriting some of the logic to directly and 
recursively create a named struct, taking out the need for the 
UpdateField/WithField logic. I've gotten all existing tests to pass (including 
a new one for this case) without the 12-15 second overhead mentioned in the 
description for the deeply nested case, but I think there's still some case 
insensitivity I might need to take care of. I can put up a PR soon with what 
that looks like.

> unionByName with null filling fails for some nested structs
> ---
>
> Key: SPARK-35290
> URL: https://issues.apache.org/jira/browse/SPARK-35290
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>Priority: Major
>
> We've encountered a few weird edge cases that seem to fail the new null 
> filling unionByName (which has been a great addition!). It seems to stem from 
> the fields being sorted by name and corrupted along the way. The simple 
> reproduction is:
> {code:java}
> df = spark.createDataFrame([[]])
> df1 = (df
> .withColumn('top', F.struct(
> F.struct(
> F.lit('ba').alias('ba')
> ).alias('b')
> ))
> )
> df2 = (df
> .withColumn('top', F.struct(
> F.struct(
> F.lit('aa').alias('aa')
> ).alias('a'),
> F.struct(
> F.lit('bb').alias('bb')
> ).alias('b'),
> ))
> )
> df1.unionByName(df2, True).printSchema()
> {code}
> This results in the exception:
> {code:java}
> pyspark.sql.utils.AnalysisException: Union can only be performed on tables 
> with the compatible column types. 
> struct,b:struct> <> 
> struct,b:struct> at the first column 
> of the second table;
> {code}
> You can see in the second schema that it has 
> {code:java}
> b:struct
> {code}
> when it should be
> {code:java}
> b:struct
> {code}
> It seems to happen somewhere during 
> [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala#L73,]
>  as everything seems correct up to that point from my testing. It's either 
> modifying one expression during the transformUp then corrupts other 
> expressions that are then modified, or the ExtractValue before the 
> addFieldsInto is remembering the ordinal position in the struct that is then 
> changing and causing issues.
>  
> I found that simply using sortStructFields instead of 
> sortStructFieldsInWithFields gets things working correctly, but definitely 
> has a performance impact. The deep expr unionByName test takes ~1-2 seconds 
> normally but ~12-15 seconds with this change. I assume because the original 
> method tried to rewrite existing expressions vs the sortStructFields just 
> adds expressions on top of existing ones to project the new order.
> I'm not sure if it makes sense to take the slower but works in the edge cases 
> method (assuming it doesn't break other cases, all existing tests pass), or 
> if there's a way to fix the existing method for cases like this.
>  






[jira] [Updated] (SPARK-35290) unionByName with null filling fails for some nested structs

2021-05-02 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-35290:
-
Description: 
We've encountered a few weird edge cases that seem to fail the new null filling 
unionByName (which has been a great addition!). It seems to stem from the 
fields being sorted by name and corrupted along the way. The simple 
reproduction is:
{code:java}
df = spark.createDataFrame([[]])
df1 = (df
.withColumn('top', F.struct(
F.struct(
F.lit('ba').alias('ba')
).alias('b')
))
)
df2 = (df
.withColumn('top', F.struct(
F.struct(
F.lit('aa').alias('aa')
).alias('a'),
F.struct(
F.lit('bb').alias('bb')
).alias('b'),
))
)
df1.unionByName(df2, True).printSchema()
{code}
This results in the exception:
{code:java}
pyspark.sql.utils.AnalysisException: Union can only be performed on tables with 
the compatible column types. 
struct,b:struct> <> 
struct,b:struct> at the first column 
of the second table;
{code}
You can see in the second schema that it has 
{code:java}
b:struct
{code}
when it should be
{code:java}
b:struct
{code}
It seems to happen somewhere during 
[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala#L73,]
 as everything seems correct up to that point from my testing. It's either 
modifying one expression during the transformUp then corrupts other expressions 
that are then modified, or the ExtractValue before the addFieldsInto is 
remembering the ordinal position in the struct that is then changing and 
causing issues.

 

I found that simply using sortStructFields instead of 
sortStructFieldsInWithFields gets things working correctly, but definitely has 
a performance impact. The deep expr unionByName test takes ~1-2 seconds 
normally but ~12-15 seconds with this change. I assume because the original 
method tried to rewrite existing expressions vs the sortStructFields just adds 
expressions on top of existing ones to project the new order.

I'm not sure if it makes sense to take the slower but works in the edge cases 
method (assuming it doesn't break other cases, all existing tests pass), or if 
there's a way to fix the existing method for cases like this.

 

  was:
We've encountered a few weird edge cases that seem to fail the new null filling 
unionByName (which has been a great addition!). It seems to stem from the 
fields being sorted by name and corrupted along the way. The simple 
reproduction is:

 
{code:java}
df = spark.createDataFrame([[]])
df1 = (df
.withColumn('top', F.struct(
F.struct(
F.lit('ba').alias('ba')
).alias('b')
))
)
df2 = (df
.withColumn('top', F.struct(
F.struct(
F.lit('aa').alias('aa')
).alias('a'),
F.struct(
F.lit('bb').alias('bb')
).alias('b'),
))
)
df1.unionByName(df2, True).printSchema()
{code}
This results in the exception:

 

 
{code:java}
pyspark.sql.utils.AnalysisException: Union can only be performed on tables with 
the compatible column types. 
struct,b:struct> <> 
struct,b:struct> at the first column 
of the second table;
{code}
You can see in the second schema that it has 

 

 
{code:java}
b:struct
{code}
when it should be

 

 
{code:java}
b:struct
{code}
It seems to happen somewhere during 
[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala#L73,]
 as everything seems correct up to that point from my testing. It's either 
modifying one expression during the transformUp then corrupts other expressions 
that are then modified, or the ExtractValue before the addFieldsInto is 
remembering the ordinal position in the struct that is then changing and 
causing issues.

 

I found that simply using sortStructFields instead of 
sortStructFieldsInWithFields gets things working correctly, but definitely has 
a performance impact. The deep expr unionByName test takes ~1-2 seconds 
normally but ~12-15 seconds with this change. I assume because the original 
method tried to rewrite existing expressions vs the sortStructFields just adds 
expressions on top of existing ones to project the new order.

I'm not sure if it makes sense to take the slower but works in the edge cases 
method (assuming it doesn't break other cases, all existing tests pass), or if 
there's a way to fix the existing method for cases like this.

 


> unionByName with null filling fails for some nested structs
> ---
>
> Key: SPARK-35290
> URL: https://issues.apache.org/jira/browse/SPARK-35290
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Adam Binford
>

[jira] [Created] (SPARK-35290) unionByName with null filling fails for some nested structs

2021-05-02 Thread Adam Binford (Jira)
Adam Binford created SPARK-35290:


 Summary: unionByName with null filling fails for some nested 
structs
 Key: SPARK-35290
 URL: https://issues.apache.org/jira/browse/SPARK-35290
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.1
Reporter: Adam Binford


We've encountered a few weird edge cases that seem to fail the new null filling 
unionByName (which has been a great addition!). It seems to stem from the 
fields being sorted by name and corrupted along the way. The simple 
reproduction is:

 
{code:java}
df = spark.createDataFrame([[]])
df1 = (df
.withColumn('top', F.struct(
F.struct(
F.lit('ba').alias('ba')
).alias('b')
))
)
df2 = (df
.withColumn('top', F.struct(
F.struct(
F.lit('aa').alias('aa')
).alias('a'),
F.struct(
F.lit('bb').alias('bb')
).alias('b'),
))
)
df1.unionByName(df2, True).printSchema()
{code}
This results in the exception:

 

 
{code:java}
pyspark.sql.utils.AnalysisException: Union can only be performed on tables with 
the compatible column types. 
struct,b:struct> <> 
struct,b:struct> at the first column 
of the second table;
{code}
You can see in the second schema that it has 

 

 
{code:java}
b:struct
{code}
when it should be

 

 
{code:java}
b:struct
{code}
It seems to happen somewhere during 
[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala#L73,]
 as everything seems correct up to that point from my testing. It's either 
modifying one expression during the transformUp then corrupts other expressions 
that are then modified, or the ExtractValue before the addFieldsInto is 
remembering the ordinal position in the struct that is then changing and 
causing issues.

 

I found that simply using sortStructFields instead of 
sortStructFieldsInWithFields gets things working correctly, but definitely has 
a performance impact. The deep expr unionByName test takes ~1-2 seconds 
normally but ~12-15 seconds with this change. I assume because the original 
method tried to rewrite existing expressions vs the sortStructFields just adds 
expressions on top of existing ones to project the new order.

I'm not sure if it makes sense to take the slower but works in the edge cases 
method (assuming it doesn't break other cases, all existing tests pass), or if 
there's a way to fix the existing method for cases like this.

 






[jira] [Created] (SPARK-35213) Corrupt DataFrame for certain withField patterns

2021-04-24 Thread Adam Binford (Jira)
Adam Binford created SPARK-35213:


 Summary: Corrupt DataFrame for certain withField patterns
 Key: SPARK-35213
 URL: https://issues.apache.org/jira/browse/SPARK-35213
 Project: Spark
  Issue Type: Bug
  Components: Optimizer, SQL
Affects Versions: 3.1.1
Reporter: Adam Binford


We encountered a very weird bug heavily using withField in production with 
Spark 3.1.1. Jobs were dying with a lot of very weird JVM crashing errors (like 
jshort_disjoint_arraycopy during a copyMemory call) and occasional 
NegativeArraySize exceptions. We finally found a workaround by ordering our 
withField calls in a certain way, and I was finally able to create some minimal 
examples that reproduce similar weird/broken behavior.

It seems to stem from the optimizations added in 
[https://github.com/apache/spark/pull/29812]. Because the same new optimization 
was also added as an analyzer rule, there seem to be two different ways this 
issue can crop up: once at analysis time and once at runtime.

While these examples might seem odd, they represent how we've created a helper 
function that can create columns in arbitrary nested fields even if the 
intermediate fields don't exist yet.
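
Roughly, such a helper could look like the sketch below (my approximation of 
the idea, not the reporter's actual code; it consults the current schema so a 
missing intermediate field can be started from an empty struct):
{code:java}
import pyspark.sql.functions as F
from pyspark.sql.types import StructType

def with_nested_field(col, schema, path, value):
    # Set `value` at the dotted `path` inside struct column `col`, creating
    # intermediate struct fields that are missing from `schema`.
    head, _, rest = path.partition('.')
    if not rest:
        return col.withField(head, value)
    existing = {f.name: f.dataType for f in schema.fields} \
        if isinstance(schema, StructType) else {}
    if head in existing and isinstance(existing[head], StructType):
        child, child_schema = col.getField(head), existing[head]
    else:
        child, child_schema = F.struct(), StructType()
    return col.withField(head, with_nested_field(child, child_schema, rest, value))

# usage sketch:
# df = df.withColumn('data', with_nested_field(
#     F.col('data'), df.schema['data'].dataType, 'a.aa', F.lit('aa')))
{code}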

Example of what I assume is an issue during analysis:
{code:java}
import pyspark.sql.functions as F 
df = spark.range(1).withColumn('data', F.struct()
.withField('a', F.struct())
.withField('b', F.struct())
.withField('a.aa', F.lit('aa'))
.withField('b.ba', F.lit('ba'))
.withField('a.ab', F.lit('ab'))
.withField('b.bb', F.lit('bb'))
.withField('a.ac', F.lit('ac'))
)
df.printSchema(){code}
Output schema:
{code:java}
root
 |-- id: long (nullable = false)
 |-- data: struct (nullable = false)
 | |-- b: struct (nullable = false)
 | | |-- aa: string (nullable = false)
 | | |-- ab: string (nullable = false)
 | | |-- bb: string (nullable = false)
 | |-- a: struct (nullable = false)
 | | |-- aa: string (nullable = false)
 | | |-- ab: string (nullable = false)
 | | |-- ac: string (nullable = false){code}
And an example of a runtime data issue:
{code:java}
df = (spark.range(1)
 .withColumn('data', F.struct()
   .withField('a', F.struct().withField('aa', F.lit('aa')))
   .withField('b', F.struct().withField('ba', F.lit('ba')))
 )
 .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
 .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
)
df.printSchema()
df.groupBy('data.a.aa', 'data.a.ab', 'data.b.ba', 'data.b.bb').count().show()
{code}
 Output:
{code:java}
root
 |-- id: long (nullable = false)
 |-- data: struct (nullable = false)
 | |-- a: struct (nullable = false)
 | | |-- aa: string (nullable = false)
 | | |-- ab: string (nullable = false)
 | |-- b: struct (nullable = false)
 | | |-- ba: string (nullable = false)
 | | |-- bb: string (nullable = false)
+---+---+---+---+-+
| aa| ab| ba| bb|count|
+---+---+---+---+-+
| ba| bb| aa| ab|1|
+---+---+---+---+-+
{code}
The columns have the wrong data in them, even though the schema is correct. 
Additionally, if you add another column you get an exception:
{code:java}
df = (spark.range(1)
 .withColumn('data', F.struct()
   .withField('a', F.struct().withField('aa', F.lit('aa')))
   .withField('b', F.struct().withField('ba', F.lit('ba')))
 )
 .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
 .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
 .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
)
df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 
'data.b.bb').count().show()
java.lang.ArrayIndexOutOfBoundsException: 2 at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
 at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
 at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
 at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
 at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
{code}
 But if you reorder the withField expressions, you get correct behavior:
{code:java}
df = (spark.range(1)
 .withColumn('data', F.struct()
   .withField('a', F.struct().withField('aa', F.lit('aa')))
   .withField('b', F.struct().withField('ba', F.lit('ba')))
 )
 .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
 .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
 

[jira] [Created] (SPARK-35117) UI progress bar no longer highlights in progress tasks

2021-04-17 Thread Adam Binford (Jira)
Adam Binford created SPARK-35117:


 Summary: UI progress bar no longer highlights in progress tasks
 Key: SPARK-35117
 URL: https://issues.apache.org/jira/browse/SPARK-35117
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 3.1.1
Reporter: Adam Binford


The Spark UI was updated to Bootstrap 4, and during the update the progress bar 
was changed to highlight the whole bar once any tasks were in progress, versus 
highlighting just the portion corresponding to the tasks that were in progress. 
That was a great visual cue for seeing what percentage of the stage/job was 
currently being worked on, and it'd be great to get that functionality back.

The change can be found here: 
https://github.com/apache/spark/pull/27370/files#diff-809c93c57cc59e5fe3c3eb54a24aa96a38147d02323f3e690ae6b5309a3284d2L448



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33133) History server fails when loading invalid rolling event logs

2020-10-20 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217656#comment-17217656
 ] 

Adam Binford commented on SPARK-33133:
--

Now that the error on load is fixed, the next question is why invalid rolling 
log directories get created in the first place. I've seen two main types of 
occurrences of this:
 * The event log folder is just completely empty. It seems to happen when an 
application fails during startup: it creates the directory but never makes it to 
the point of creating the actual file. I'm not sure of the exact circumstances 
to recreate this.
 * The event log folder for a long running active job gets cleaned up, and the 
next time the event log is rolled, it recreates the folder automatically 
(because HDFS creates intermediate directories if they don't exist), with the 
wrong directory permissions, and without an appstatus file.

I'm going to focus on the second case here. I'm not sure whether, given the way 
the cleanup logic is written, something about the rolling files causes it to 
think there are no updates. Part of the check involves "fileSizeForLastIndex", 
so maybe a newly rolled log file causes issues if the log size doesn't strictly 
increase? I can't fully describe the logic in "shouldReloadLog".

Regardless of how it happens, one potential fix is, instead of only creating the 
directory with the correct permissions and the appstatus file in the "start" 
method, to make sure both exist every time "rollEventLogFile" is called.

> History server fails when loading invalid rolling event logs
> 
>
> Key: SPARK-33133
> URL: https://issues.apache.org/jira/browse/SPARK-33133
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Adam Binford
>Priority: Major
>
> We have run into an issue where our history server fails to load new 
> applications, and when restarted, fails to load any applications at all. This 
> happens when it encounters invalid rolling event log files. We encounter this 
> with long running streaming applications. There seems to be two issues here 
> that lead to problems:
>  * It looks like our long running streaming applications event log directory 
> is being cleaned up. The next time the application logs event data, it 
> recreates the event log directory but without recreating the "appstatus" 
> file. I don't know the full extent of this behavior or if something "wrong" 
> is happening here.
>  * The history server then reads this new folder, and throws an exception 
> because the "appstatus" file doesn't exist in the rolling event log folder. 
> This exception breaks the entire listing process, so no new applications will 
> be read, and if restarted no applications at all will be read.
> There seems like a couple ways to go about fixing this, and I'm curious 
> anyone's thoughts who knows more about how the history server works, 
> specifically with rolling event logs:
>  * Don't completely fail checking for new applications if one bad rolling 
> event log folder is encountered. This seems like the simplest fix and makes 
> sense to me, it already checks for a few other errors and ignores them. It 
> doesn't necessarily fix the underlying issue that leads to this happening 
> though.
>  * Figure out why the in progress event log folder is being deleted and make 
> sure that doesn't happen. Maybe this is supposed to happen? Or maybe we don't 
> want to delete the top level folder and only delete event log files within 
> the folders? Again I don't know the exact current behavior here with this.
>  * When writing new event log data, make sure the folder and appstatus file 
> exist every time, creating them again if not.
> Here's the stack trace we encounter when this happens, from 3.0.1 with a 
> couple extra MRs backported that I hoped would fix the issue:
> {{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in 
> checking for event log updates2020-10-13 12:10:31,751 ERROR 
> history.FsHistoryProvider: Exception in checking for event log 
> updatesjava.lang.IllegalArgumentException: requirement failed: Log directory 
> must contain an appstatus file! at scala.Predef$.require(Predef.scala:281) at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
>  at 
> 

[jira] [Commented] (SPARK-33133) History server fails when loading invalid rolling event logs

2020-10-14 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213832#comment-17213832
 ] 

Adam Binford commented on SPARK-33133:
--

I created [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-33146] to 
track the failing to load issue. Not sure if we want to create a separate issue 
for them becoming invalid in the first place or just make this issue pertain to 
that.

> History server fails when loading invalid rolling event logs
> 
>
> Key: SPARK-33133
> URL: https://issues.apache.org/jira/browse/SPARK-33133
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Adam Binford
>Priority: Major
>
> We have run into an issue where our history server fails to load new 
> applications, and when restarted, fails to load any applications at all. This 
> happens when it encounters invalid rolling event log files. We encounter this 
> with long running streaming applications. There seems to be two issues here 
> that lead to problems:
>  * It looks like our long running streaming applications event log directory 
> is being cleaned up. The next time the application logs event data, it 
> recreates the event log directory but without recreating the "appstatus" 
> file. I don't know the full extent of this behavior or if something "wrong" 
> is happening here.
>  * The history server then reads this new folder, and throws an exception 
> because the "appstatus" file doesn't exist in the rolling event log folder. 
> This exception breaks the entire listing process, so no new applications will 
> be read, and if restarted no applications at all will be read.
> There seems like a couple ways to go about fixing this, and I'm curious 
> anyone's thoughts who knows more about how the history server works, 
> specifically with rolling event logs:
>  * Don't completely fail checking for new applications if one bad rolling 
> event log folder is encountered. This seems like the simplest fix and makes 
> sense to me, it already checks for a few other errors and ignores them. It 
> doesn't necessarily fix the underlying issue that leads to this happening 
> though.
>  * Figure out why the in progress event log folder is being deleted and make 
> sure that doesn't happen. Maybe this is supposed to happen? Or maybe we don't 
> want to delete the top level folder and only delete event log files within 
> the folders? Again I don't know the exact current behavior here with this.
>  * When writing new event log data, make sure the folder and appstatus file 
> exist every time, creating them again if not.
> Here's the stack trace we encounter when this happens, from 3.0.1 with a 
> couple extra MRs backported that I hoped would fix the issue:
> {{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in 
> checking for event log updates2020-10-13 12:10:31,751 ERROR 
> history.FsHistoryProvider: Exception in checking for event log 
> updatesjava.lang.IllegalArgumentException: requirement failed: Log directory 
> must contain an appstatus file! at scala.Predef$.require(Predef.scala:281) at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272)
>  at 
> org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466)
>  at 
> scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255) at 
> scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249) at 
> scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) at 
> scala.collection.TraversableLike.filter(TraversableLike.scala:347) at 
> scala.collection.TraversableLike.filter$(TraversableLike.scala:347) at 
> 

[jira] [Created] (SPARK-33146) Encountering an invalid rolling event log folder prevents loading other applications in SHS

2020-10-14 Thread Adam Binford (Jira)
Adam Binford created SPARK-33146:


 Summary: Encountering an invalid rolling event log folder prevents 
loading other applications in SHS
 Key: SPARK-33146
 URL: https://issues.apache.org/jira/browse/SPARK-33146
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.1
Reporter: Adam Binford


A follow-on issue from https://issues.apache.org/jira/browse/SPARK-33133

If an invalid rolling event log folder is encountered by the Spark History 
Server upon startup, it crashes the whole loading process and prevents any 
valid applications from loading. We should simply catch the error, log it, and 
continue loading other applications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33133) History server fails when loading invalid rolling event logs

2020-10-13 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford updated SPARK-33133:
-
Description: 
We have run into an issue where our history server fails to load new 
applications, and when restarted, fails to load any applications at all. This 
happens when it encounters invalid rolling event log files. We encounter this 
with long-running streaming applications. There seem to be two issues here that 
lead to problems:
 * It looks like our long-running streaming applications' event log directories 
are being cleaned up. The next time the application logs event data, it recreates 
the event log directory but without recreating the "appstatus" file. I don't 
know the full extent of this behavior or whether something "wrong" is happening here.
 * The history server then reads this new folder, and throws an exception 
because the "appstatus" file doesn't exist in the rolling event log folder. 
This exception breaks the entire listing process, so no new applications will 
be read, and if restarted no applications at all will be read.

There seem to be a couple of ways to go about fixing this, and I'm curious to 
hear thoughts from anyone who knows more about how the history server works, 
specifically with rolling event logs:
 * Don't completely fail checking for new applications if one bad rolling event 
log folder is encountered. This seems like the simplest fix and makes sense to 
me, it already checks for a few other errors and ignores them. It doesn't 
necessarily fix the underlying issue that leads to this happening though.
 * Figure out why the in progress event log folder is being deleted and make 
sure that doesn't happen. Maybe this is supposed to happen? Or maybe we don't 
want to delete the top level folder and only delete event log files within the 
folders? Again I don't know the exact current behavior here with this.
 * When writing new event log data, make sure the folder and appstatus file 
exist every time, creating them again if not.

Here's the stack trace we encounter when this happens, from 3.0.1 with a couple 
extra MRs backported that I hoped would fix the issue:

{{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in 
checking for event log updates2020-10-13 12:10:31,751 ERROR 
history.FsHistoryProvider: Exception in checking for event log 
updatesjava.lang.IllegalArgumentException: requirement failed: Log directory 
must contain an appstatus file! at scala.Predef$.require(Predef.scala:281) at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240)
 at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524)
 at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466)
 at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255) at 
scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249) at 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) at 
scala.collection.TraversableLike.filter(TraversableLike.scala:347) at 
scala.collection.TraversableLike.filter$(TraversableLike.scala:347) at 
scala.collection.AbstractTraversable.filter(Traversable.scala:108) at 
org.apache.spark.deploy.history.FsHistoryProvider.checkForLogs(FsHistoryProvider.scala:466)
 at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$startPolling$3(FsHistoryProvider.scala:287)
 at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1302) at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$getRunner$1(FsHistoryProvider.scala:210)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 

[jira] [Created] (SPARK-33133) History server fails when loading invalid rolling event logs

2020-10-13 Thread Adam Binford (Jira)
Adam Binford created SPARK-33133:


 Summary: History server fails when loading invalid rolling event 
logs
 Key: SPARK-33133
 URL: https://issues.apache.org/jira/browse/SPARK-33133
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.1
Reporter: Adam Binford


We have run into an issue where our history server fails to load new 
applications, and when restarted, fails to load any applications at all. This 
happens when it encounters invalid rolling event log files. We encounter this 
with long-running streaming applications. There seem to be two issues here that 
lead to problems:
 * It looks like our long-running streaming applications' event log directories 
are being cleaned up. The next time the application logs event data, it recreates 
the event log directory but without recreating the "appstatus" file. I don't 
know the full extent of this behavior or whether something "wrong" is happening here.
 * The history server then reads this new folder, and throws an exception 
because the "appstatus" file doesn't exist in the rolling event log folder. 
This exception breaks the entire listing process, so no new applications will 
be read, and if restarted no applications at all will be read.

There seem to be a couple of ways to go about fixing this, and I'm curious to 
hear thoughts from anyone who knows more about how the history server works, 
specifically with rolling event logs:
 * Don't completely fail checking for new applications if one bad rolling event 
log folder is encountered. This seems like the simplest fix and makes sense to 
me, it already checks for a few other errors and ignores them.
 * Figure out why the in progress event log folder is being deleted and make 
sure that doesn't happen. Maybe this is supposed to happen? Or maybe we don't 
want to delete the top level folder and only delete event log files within the 
folders? Again I don't know the exact current behavior here with this.
 * When writing new event log data, make sure the folder and appstatus file 
exist every time, creating them again if not.

Here's the stack trace we encounter when this happens, from 3.0.1 with a couple 
extra MRs backported that I hoped would fix the issue:

{{2020-10-13 12:10:31,751 ERROR history.FsHistoryProvider: Exception in 
checking for event log updates2020-10-13 12:10:31,751 ERROR 
history.FsHistoryProvider: Exception in checking for event log 
updatesjava.lang.IllegalArgumentException: requirement failed: Log directory 
must contain an appstatus file! at scala.Predef$.require(Predef.scala:281) at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:214)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files(EventLogFileReaders.scala:211)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles$lzycompute(EventLogFileReaders.scala:221)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.eventLogFiles(EventLogFileReaders.scala:220)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.lastEventLogFile(EventLogFileReaders.scala:272)
 at 
org.apache.spark.deploy.history.RollingEventLogFilesFileReader.fileSizeForLastIndex(EventLogFileReaders.scala:240)
 at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7(FsHistoryProvider.scala:524)
 at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$7$adapted(FsHistoryProvider.scala:466)
 at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255) at 
scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249) at 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) at 
scala.collection.TraversableLike.filter(TraversableLike.scala:347) at 
scala.collection.TraversableLike.filter$(TraversableLike.scala:347) at 
scala.collection.AbstractTraversable.filter(Traversable.scala:108) at 
org.apache.spark.deploy.history.FsHistoryProvider.checkForLogs(FsHistoryProvider.scala:466)
 at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$startPolling$3(FsHistoryProvider.scala:287)
 at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1302) at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$getRunner$1(FsHistoryProvider.scala:210)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 

[jira] [Commented] (SPARK-32835) Add withField to PySpark Column class

2020-09-09 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193216#comment-17193216
 ] 

Adam Binford commented on SPARK-32835:
--

I think I already have a working version of this, I'll try to put together a PR 
soon.

> Add withField to PySpark Column class
> -
>
> Key: SPARK-32835
> URL: https://issues.apache.org/jira/browse/SPARK-32835
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 3.1.0
>Reporter: Adam Binford
>Priority: Major
>
> The withField functionality was added to the Scala API, we simply need a 
> Python wrapper to call it.
> Will need to do the same for dropField once it gets added back.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32835) Add withField to PySpark Column class

2020-09-09 Thread Adam Binford (Jira)
Adam Binford created SPARK-32835:


 Summary: Add withField to PySpark Column class
 Key: SPARK-32835
 URL: https://issues.apache.org/jira/browse/SPARK-32835
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 3.1.0
Reporter: Adam Binford


The withField functionality was added to the Scala API; we simply need a Python 
wrapper to call it.

Will need to do the same for dropField once it gets added back.
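
For reference, a rough sketch of what such a wrapper could look like (this is an 
assumption about the shape of the implementation, not the actual Spark code):
{code:java}
from pyspark.sql.column import Column, _to_java_column

def withField(self, fieldName, col):
    # Sketch: delegate to the JVM Column.withField; the real implementation
    # in Spark may differ in validation and documentation.
    if not isinstance(fieldName, str):
        raise TypeError("fieldName should be a string")
    if not isinstance(col, Column):
        raise TypeError("col should be a Column")
    return Column(self._jc.withField(fieldName, _to_java_column(col)))

# Attached to the Column class, roughly:
# Column.withField = withField
{code}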



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32589) NoSuchElementException: None.get for needsUnsafeRowConversion

2020-08-11 Thread Adam Binford (Jira)
Adam Binford created SPARK-32589:


 Summary: NoSuchElementException: None.get for 
needsUnsafeRowConversion
 Key: SPARK-32589
 URL: https://issues.apache.org/jira/browse/SPARK-32589
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Adam Binford


I have run into an error somewhat non-deterministically where a query fails 
with 

{{NoSuchElementException: None.get}}

which happens at 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L182]

getActiveSession apparently is returning None. I only use PySpark, and I think 
this is a threading issue, since the active session comes from an 
InheritableThreadLocal. I encounter this both when I manually use threading to 
run multiple jobs at the same time and occasionally when I have multiple streams 
active at the same time. I tried using the PYSPARK_PIN_THREAD flag but it didn't 
seem to help. For the former case I hacked around it in my manual threading code 
by doing

{{spark._jvm.SparkSession.setActiveSession(spark._jvm.SparkSession.builder().getOrCreate())}}

at the start of each new thread, and this sometimes doesn't work reliably 
either.
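
To make that workaround concrete, this is roughly what it looks like in the 
manual threading case (a simplified sketch; the query run inside the thread is 
just an example):
{code:java}
import threading

def run_in_new_thread(spark, fn):
    # Re-register the active session on the JVM side at the start of each
    # new Python thread, since getActiveSession can otherwise return None.
    spark._jvm.SparkSession.setActiveSession(
        spark._jvm.SparkSession.builder().getOrCreate())
    fn()

t = threading.Thread(target=run_in_new_thread,
                     args=(spark, lambda: spark.range(10).count()))
t.start()
t.join()
{code}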

I see this was mentioned in this 
[issue|https://issues.apache.org/jira/browse/SPARK-21418?focusedCommentId=16174642=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16174642]

I'm not sure whether the problem/solution has to do with Python threads, or with 
adding a default value or some other way of updating this function. One other 
note: I started encountering this when using Delta Lake OSS, which reads Parquet 
files as part of its transaction log, and that is where this error always 
happens. It doesn't seem like that library is doing anything specific or 
incorrect that would cause this issue, though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31836) input_file_name() gives wrong value following Python UDF usage

2020-05-30 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17120375#comment-17120375
 ] 

Adam Binford commented on SPARK-31836:
--

Confirmed this is also an issue on 2.4.5. I could also recreate it with just two 
files, without streaming, by using
{code:java}
spark.sql.files.openCostInBytes 0{code}
to make sure both files ended up in a single partition. The behavior seems to be 
that after a Python UDF, all rows in a partition have the input_file_name of the 
last row in the partition, but that's an assumption based on a tiny test. Doing
{code:java}
df = (df
.withColumn('before', input_file_name())
.withColumn('during', udf(lambda x: x)(input_file_name()))
.withColumn('after', input_file_name())
)
{code}
shows that 'before' and 'during' are correct, while 'after' is incorrect (all 
rows show the last file in the partition).
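
Putting it together, the reproduction looks roughly like this (file paths follow 
the original report's /tmp/test-file-* convention; session setup is assumed):
{code:java}
import pyspark.sql.functions as F
from pyspark.sql.functions import input_file_name, udf

# Make the open cost zero so both tiny files land in a single partition
spark.conf.set('spark.sql.files.openCostInBytes', 0)

df = (spark.read.text(['/tmp/test-file-1', '/tmp/test-file-2'])
    .withColumn('before', input_file_name())
    .withColumn('during', udf(lambda x: x)(input_file_name()))
    .withColumn('after', input_file_name())
)
# 'before' and 'during' show the correct file per row; 'after' shows the
# last file in the partition for every row.
df.select('before', 'during', 'after').show(truncate=False)
{code}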

> input_file_name() gives wrong value following Python UDF usage
> --
>
> Key: SPARK-31836
> URL: https://issues.apache.org/jira/browse/SPARK-31836
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Wesley Hildebrandt
>Priority: Major
>
> I'm using PySpark for Spark 3.0.0 RC1 with Python 3.6.8.
> The following commands demonstrate that the input_file_name() function 
> sometimes returns the wrong filename following usage of a Python UDF:
> $ for i in `seq 5`; do echo $i > /tmp/test-file-$i; done
> $ pyspark
> >>> import pyspark.sql.functions as F
> >>> spark.readStream.text('file:///tmp/test-file-*', 
> >>> wholetext=True).withColumn('file1', 
> >>> F.input_file_name()).withColumn('udf', F.udf(lambda 
> >>> x:x)('value')).withColumn('file2', 
> >>> F.input_file_name()).writeStream.trigger(once=True).foreachBatch(lambda 
> >>> df,_: df.select('file1','file2').show(truncate=False, 
> >>> vertical=True)).start().awaitTermination()
> A few notes about this bug:
>  * It happens with many different files, so it's not related to the file 
> contents
>  * It also happens loading files from HDFS, so storage location is not a 
> factor
>  * It also happens using .csv() to read the files instead of .text(), so 
> input format is not a factor
>  * I have not been able to cause the error without using readStream, so it 
> seems to be related to streaming
>  * The bug also happens using spark-submit to send a job to my cluster
>  * I haven't tested an older version, but it's possible that Spark pulls 
> 24958 and 25321([https://github.com/apache/spark/pull/24958], 
> [https://github.com/apache/spark/pull/25321]) to fix issue 28153 
> (https://issues.apache.org/jira/browse/SPARK-28153) introduced this bug?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31376) Non-global sort support for structured streaming

2020-04-08 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078219#comment-17078219
 ] 

Adam Binford edited comment on SPARK-31376 at 4/8/20, 12:40 PM:


I tried multiple times to add myself to the dev@ mailing list but was 
unsuccessful, which is why I ended up just posting a Jira ticket. It looks like 
it finally worked using the subscribe link on the spark community page 
(subscribing from the mailing list page doesn't seem to work).

Taking the discussion there now that it finally actually worked.


was (Author: kimahriman):
I tried multiple times to add myself to the dev@ mailing list but was 
unsuccessful, which is why I ended up just posting a Jira ticket. It looks like 
it finally worked using the subscribe link on the spark community page 
(subscribing from the mailing list page doesn't seem to work).

> Non-global sort support for structured streaming
> 
>
> Key: SPARK-31376
> URL: https://issues.apache.org/jira/browse/SPARK-31376
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: Adam Binford
>Priority: Minor
>
> Currently, all sorting is disallowed with structured streaming queries. Not 
> allowing global sorting makes sense, but could non-global sorting (i.e. 
> sortWithinPartitions) be allowed? I'm running into this with an external 
> source I'm using, but not sure if this would be useful to file sources as 
> well. I have to foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
>  * Does a local sort cause issues with any exactly-once guarantees streaming 
> queries provides? I can't say I know or understand how these semantics work. 
> Or are there other issues I can't think of this would cause?
>  * Is the change as simple as changing the unsupported operations check to 
> only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only 
> disallow global sorts and it seems to be working. Anything I'm missing or is 
> it this simple?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31376) Non-global sort support for structured streaming

2020-04-08 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078219#comment-17078219
 ] 

Adam Binford commented on SPARK-31376:
--

I tried multiple times to add myself to the dev@ mailing list but was 
unsuccessful, which is why I ended up just posting a Jira ticket. It looks like 
it finally worked using the subscribe link on the spark community page 
(subscribing from the mailing list page doesn't seem to work).

> Non-global sort support for structured streaming
> 
>
> Key: SPARK-31376
> URL: https://issues.apache.org/jira/browse/SPARK-31376
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: Adam Binford
>Priority: Minor
>
> Currently, all sorting is disallowed with structured streaming queries. Not 
> allowing global sorting makes sense, but could non-global sorting (i.e. 
> sortWithinPartitions) be allowed? I'm running into this with an external 
> source I'm using, but not sure if this would be useful to file sources as 
> well. I have to foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
>  * Does a local sort cause issues with any exactly-once guarantees streaming 
> queries provides? I can't say I know or understand how these semantics work. 
> Or are there other issues I can't think of this would cause?
>  * Is the change as simple as changing the unsupported operations check to 
> only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only 
> disallow global sorts and it seems to be working. Anything I'm missing or is 
> it this simple?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31376) Non-global sort support for structured streaming

2020-04-07 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077803#comment-17077803
 ] 

Adam Binford commented on SPARK-31376:
--

You can't globally sort because you can't sort an infinite list. However, 
regardless of batch vs. streaming, a partition is a fairly well-defined unit of 
work, I would think. I'm only speaking about the straightforward file-based 
cases; I don't know whether there are other reasons this wouldn't make sense, 
and I'm also ignoring continuous mode for now.

Say you had 5 input files you process as a batch. Your input size results in 8 
partitions, resulting in 8 output files.

Then let's say you processed those in a streaming fashion, and the first 
micro-batch is the first 3 files, which ends up being 5 partitions, and your 
second micro-batch is the last 2 files, resulting in 4 partitions. I don't feel 
like working out what math would make that the case, but I'm sure it's a 
possibility. So you would end up with 9 total partitions/output files in that 
case. You already have a "different result" than you would have achieved via 
batch processing, and that doesn't even include any kind of sorting within 
partitions. Add in sorting, and you get the same different number of partitions, 
just potentially better optimized for your use case.

So it's not that one "makes sense"; it's that a global sort is not well defined 
while partition-based sorting is. In theory you could make "global sort" mean 
sort within a micro-batch, which would be well defined, but I'm not sure what 
benefit that would provide.

One question I have (this isn't the use case I care about, but it seems useful): 
can you order by a column within a Parquet file without using foreachBatch? That 
seems like a very useful, well-defined optimization for streaming output.

Also, I don't quite understand how repartitioning is valid on a streaming query; 
I was surprised when that worked. That seems more questionable than local 
sorting.

Again, I don't know whether there's a non-file-based reason this would cause 
issues, but the file-based approach seems straightforward and well-defined.

> Non-global sort support for structured streaming
> 
>
> Key: SPARK-31376
> URL: https://issues.apache.org/jira/browse/SPARK-31376
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: Adam Binford
>Priority: Minor
>
> Currently, all sorting is disallowed with structured streaming queries. Not 
> allowing global sorting makes sense, but could non-global sorting (i.e. 
> sortWithinPartitions) be allowed? I'm running into this with an external 
> source I'm using, but not sure if this would be useful to file sources as 
> well. I have to foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
>  * Does a local sort cause issues with any exactly-once guarantees streaming 
> queries provides? I can't say I know or understand how these semantics work. 
> Or are there other issues I can't think of this would cause?
>  * Is the change as simple as changing the unsupported operations check to 
> only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only 
> disallow global sorts and it seems to be working. Anything I'm missing or is 
> it this simple?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31376) Non-global sort support for structured streaming

2020-04-07 Thread Adam Binford (Jira)
Adam Binford created SPARK-31376:


 Summary: Non-global sort support for structured streaming
 Key: SPARK-31376
 URL: https://issues.apache.org/jira/browse/SPARK-31376
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Adam Binford


Currently, all sorting is disallowed with structured streaming queries. Not 
allowing global sorting makes sense, but could non-global sorting (i.e. 
sortWithinPartitions) be allowed? I'm running into this with an external source 
I'm using, but I'm not sure whether this would be useful for file sources as 
well. Right now I have to use foreachBatch so that I can do a 
sortWithinPartitions.
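
For reference, the current workaround looks roughly like this (the column name, 
output path, and checkpoint location are illustrative, and streaming_df stands 
in for the streaming DataFrame):
{code:java}
def sort_and_write(batch_df, batch_id):
    # Inside foreachBatch we get a regular batch DataFrame, so
    # sortWithinPartitions is allowed even though the query is streaming.
    (batch_df
        .sortWithinPartitions('sort_col')
        .write.mode('append')
        .parquet('/output/path'))

query = (streaming_df.writeStream
    .foreachBatch(sort_and_write)
    .option('checkpointLocation', '/tmp/checkpoints/sort-example')
    .start())
{code}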

Two main questions:
 * Does a local sort cause issues with any exactly-once guarantees streaming 
queries provide? I can't say I know or understand how these semantics work. Or 
are there other issues I can't think of that this would cause?
 * Is the change as simple as changing the unsupported operations check to only 
look for global sorts instead of all sorts?

I have built a version that simply changes the unsupported check to only 
disallow global sorts and it seems to be working. Anything I'm missing or is it 
this simple?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30860) Different behavior between rolling and non-rolling event log

2020-02-17 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038761#comment-17038761
 ] 

Adam Binford commented on SPARK-30860:
--

Yeah, I'll see if I can manage to update it. I'm not the best Java/Scala person, 
but I'm sure I can figure it out.

> Different behavior between rolling and non-rolling event log
> 
>
> Key: SPARK-30860
> URL: https://issues.apache.org/jira/browse/SPARK-30860
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy
>Affects Versions: 3.0.0
>Reporter: Adam Binford
>Priority: Minor
>
> When creating a rolling event log, the application directory is created with 
> a call to FileSystem.mkdirs, with the file permission 770. The default 
> behavior of HDFS is to set the permission of a file created with 
> FileSystem.create or FileSystem.mkdirs to (P & ^umask), where P is the 
> permission in the API call and umask is a system value set by 
> fs.permissions.umask-mode and defaults to 0022. This means, with default 
> settings, any mkdirs call can have at most 755 permissions, which causes 
> rolling event log directories to be created with 750 permissions. This causes 
> the history server to be unable to prune old applications if they are not run 
> by the same user running the history server.
> This is not a problem for non-rolling logs, because it uses 
> SparkHadoopUtils.createFile for Hadoop 2 backward compatibility, and then 
> calls FileSystem.setPermission with 770 after the file has been created. 
> setPermission doesn't have the umask applied to it, so this works fine.
> Obviously this could be fixed by changing fs.permissions.umask-mode, but I'm 
> not sure the reason that's set in the first place or if this would hurt 
> anything else. The main issue is there is different behavior between rolling 
> and non-rolling event logs that might want to be updated in this repo to be 
> consistent across each.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30860) Different behavior between rolling and non-rolling event log

2020-02-17 Thread Adam Binford (Jira)
Adam Binford created SPARK-30860:


 Summary: Different behavior between rolling and non-rolling event 
log
 Key: SPARK-30860
 URL: https://issues.apache.org/jira/browse/SPARK-30860
 Project: Spark
  Issue Type: Bug
  Components: Deploy
Affects Versions: 3.0.0
Reporter: Adam Binford


When creating a rolling event log, the application directory is created with a 
call to FileSystem.mkdirs, with the file permission 770. The default behavior 
of HDFS is to set the permission of a file created with FileSystem.create or 
FileSystem.mkdirs to (P & ^umask), where P is the permission in the API call 
and umask is a system value set by fs.permissions.umask-mode and defaults to 
0022. This means, with default settings, any mkdirs call can have at most 755 
permissions, which causes rolling event log directories to be created with 750 
permissions. This causes the history server to be unable to prune old 
applications if they are not run by the same user running the history server.
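
A quick way to see the effective permission bits (plain Python, just the 
arithmetic described above):
{code:java}
# mkdirs permissions are masked by the umask: effective = requested & ~umask
requested = 0o770
umask = 0o022           # default fs.permissions.umask-mode
effective = requested & ~umask
print(oct(effective))   # 0o750: group loses write, so a history server running
                        # as a different user can't prune these directories
{code}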

This is not a problem for non-rolling logs, because it uses 
SparkHadoopUtils.createFile for Hadoop 2 backward compatibility, and then calls 
FileSystem.setPermission with 770 after the file has been created. 
setPermission doesn't have the umask applied to it, so this works fine.

Obviously this could be fixed by changing fs.permissions.umask-mode, but I'm not 
sure why that's set in the first place or whether changing it would hurt anything 
else. The main issue is that rolling and non-rolling event logs behave 
differently, and it might make sense to update this repo so they are consistent 
with each other.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org