FLINK-33759

2024-05-10 Thread Frank Yin
Hi,

I opened a PR on GitHub to address FLINK-33759, which causes issues when
writing Parquet files with a complex data schema.
https://github.com/apache/flink/pull/24029

Can anyone help review this PR?

Thanks,
Frank
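
For context, here is a minimal, hypothetical sketch (the schema, path, and the
`contacts` stream are placeholders, and this is not the PR itself) of how a
nested, complex record schema typically reaches Flink's Parquet writer through
the bulk FileSink:

```scala
// Hypothetical example: writing Parquet with a nested (complex) schema via the
// FileSink bulk format and the Avro-based Parquet writer factory.
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.AvroParquetWriters
import org.apache.flink.streaming.api.datastream.DataStream

object NestedParquetSink {
  // A record containing a struct field and an array field.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Contact","fields":[
      |  {"name":"id","type":"long"},
      |  {"name":"name","type":{"type":"record","name":"FullName","fields":[
      |    {"name":"first","type":"string"},{"name":"last","type":"string"}]}},
      |  {"name":"phones","type":{"type":"array","items":"string"}}
      |]}""".stripMargin)

  // `contacts` is assumed to be an existing DataStream[GenericRecord].
  def attachSink(contacts: DataStream[GenericRecord]): Unit = {
    val sink = FileSink
      .forBulkFormat(new Path("/tmp/contacts-parquet"),
        AvroParquetWriters.forGenericRecord(schema))
      .build()
    contacts.sinkTo(sink)
  }
}
```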


[jira] [Created] (SPARK-45612) Allow cached RDDs to be migrated to fallback storage during decommission

2023-10-19 Thread Frank Yin (Jira)
Frank Yin created SPARK-45612:
-

 Summary: Allow cached RDDs to be migrated to fallback storage 
during decommission
 Key: SPARK-45612
 URL: https://issues.apache.org/jira/browse/SPARK-45612
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.5.0
Reporter: Frank Yin
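
For context, a hypothetical sketch (the fallback path and the job are
placeholders) of the decommission setup this improvement targets: shuffle
blocks can already be migrated to a fallback storage path, and the ask here is
for cached RDD blocks to get the same escape hatch when no live peer is
available.

```scala
// Hypothetical decommission configuration; whether cached RDD blocks may land
// on the fallback path is exactly what SPARK-45612 proposes.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .set("spark.storage.decommission.rddBlocks.enabled", "true")
  // Shuffle files can already fall back here when no peer executor is left.
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")

val spark = SparkSession.builder().config(conf).getOrCreate()

// A cached RDD whose blocks need a new home once their executor is decommissioned.
val cached = spark.sparkContext.parallelize(1 to 1000000)
  .persist(StorageLevel.MEMORY_AND_DISK)
cached.count()
```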









[jira] [Updated] (SPARK-45579) Executor hangs indefinitely due to decommissioner errors

2023-10-17 Thread Frank Yin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Yin updated SPARK-45579:
--
Description: 
During Spark executor decommission, uploads to the fallback storage can fail due 
to race conditions, even though we check that the file exists first: 

{{java.io.FileNotFoundException: No file: 
/var/data/spark-ab14b716-630d-435e-a92a-1403f6206dd8/blockmgr-7f9ab4d7-1340-4b39-9558-fde994a82090/0b/shuffle_175_66754_0.index
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.checkSource(CopyFromLocalOperation.java:314)
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:167)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$26(S3AFileSystem.java:3854)
at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2480)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2499)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3847)
at 
org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2558)
at 
org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2520)
at 
org.apache.spark.storage.FallbackStorage.copy(FallbackStorage.scala:67)
at 
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$12(BlockManagerDecommissioner.scala:146)
at 
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$12$adapted(BlockManagerDecommissioner.scala:146)
at scala.Option.foreach(Option.scala:407)
at 
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:146)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)}}

This will block the executor from exiting properly because the decommissioner 
doesn't think shuffle migration is complete. 
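
For illustration, a hypothetical sketch of the check-then-copy race described
above (the helper and its names are invented for this note, not taken from
FallbackStorage): the existence check and the upload are separate steps, so
block cleanup can delete the shuffle file in between.

```scala
// Hypothetical helper showing the race window and one defensive way to handle it.
import java.io.{File, FileNotFoundException}
import org.apache.hadoop.fs.{FileSystem, Path}

def uploadIfPresent(local: File, remote: Path, fs: FileSystem): Boolean = {
  if (!local.exists()) {
    true // nothing left to migrate
  } else {
    try {
      // The file can be deleted here, after the check but before the copy,
      // which is the race that produces the FileNotFoundException above.
      fs.copyFromLocalFile(new Path(local.getAbsolutePath), remote)
      true
    } catch {
      case _: FileNotFoundException =>
        // One option: treat a vanished file as already cleaned up so the
        // decommissioner can still mark shuffle migration as complete.
        true
    }
  }
}
```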

[jira] [Created] (SPARK-45579) Executor hangs indefinitely due to decommissioner errors

2023-10-17 Thread Frank Yin (Jira)
Frank Yin created SPARK-45579:
-

 Summary: Executor hangs indefinitely due to decommissioner errors
 Key: SPARK-45579
 URL: https://issues.apache.org/jira/browse/SPARK-45579
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.5.0
Reporter: Frank Yin


During Spark executor decommission, uploads to the fallback storage can fail due 
to race conditions.







[jira] [Updated] (SPARK-44547) BlockManagerDecommissioner throws exceptions when migrating RDD cached blocks to fallback storage

2023-07-25 Thread Frank Yin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Yin updated SPARK-44547:
--
Description: 
Looks like the RDD cache doesn't support fallback storage and we should stop 
the migration if the only viable peer is the fallback storage. 

[^spark-error.log]

23/07/25 05:12:58 WARN BlockManager: Failed to replicate 
rdd_18_25 to BlockManagerId(fallback, remote, 7337, None), failure #0
java.io.IOException: Failed to connect to remote:7337
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
at 
org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock(NettyBlockTransferService.scala:168)
at 
org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:121)
at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$replicate(BlockManager.scala:1784)
at 
org.apache.spark.storage.BlockManager.$anonfun$replicateBlock$2(BlockManager.scala:1721)
at 
org.apache.spark.storage.BlockManager.$anonfun$replicateBlock$2$adapted(BlockManager.scala:1707)
at scala.Option.forall(Option.scala:390)
at 
org.apache.spark.storage.BlockManager.replicateBlock(BlockManager.scala:1707)
at 
org.apache.spark.storage.BlockManagerDecommissioner.migrateBlock(BlockManagerDecommissioner.scala:356)
at 
org.apache.spark.storage.BlockManagerDecommissioner.$anonfun$decommissionRddCacheBlocks$3(BlockManagerDecommissioner.scala:340)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.storage.BlockManagerDecommissioner.decommissionRddCacheBlocks(BlockManagerDecommissioner.scala:339)
at 
org.apache.spark.storage.BlockManagerDecommissioner$$anon$1.run(BlockManagerDecommissioner.scala:214)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.net.UnknownHostException: remote
at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)
at java.base/java.net.InetAddress.getAllByName0(Unknown Source)
at java.base/java.net.InetAddress.getAllByName(Unknown Source)
at java.base/java.net.InetAddress.getAllByName(Unknown Source)
at java.base/java.net.InetAddress.getByName(Unknown Source)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at 
io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
at 
io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
at 
io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at 
io.netty.util.concurrent.DefaultPromise.setValue0
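
For illustration, a hypothetical sketch of the guard suggested above (the types
and names are invented for this note, not the actual BlockManagerDecommissioner
code): drop the fallback pseudo block manager from the RDD replication
candidates and stop the RDD migration when nothing else is left, instead of
retrying against the fallback peer and logging connection failures forever.

```scala
// Hypothetical peer selection for RDD cache block migration during decommission.
case class PeerId(executorId: String, host: String, port: Int)

// The fallback storage registers as a pseudo peer; it only accepts shuffle
// files, so it is not a valid target for cached RDD blocks.
def rddMigrationPeers(peers: Seq[PeerId]): Seq[PeerId] =
  peers.filterNot(_.executorId == "fallback")

def shouldMigrateRddBlocks(peers: Seq[PeerId]): Boolean =
  rddMigrationPeers(peers).nonEmpty // if empty, stop instead of retrying forever
```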

[jira] [Updated] (SPARK-44547) BlockManagerDecommissioner throws exceptions when migrating RDD cached blocks to fallback storage

2023-07-25 Thread Frank Yin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Yin updated SPARK-44547:
--
Attachment: spark-error.log

[jira] [Created] (SPARK-44547) BlockManagerDecommissioner throws exceptions when migrating RDD cached blocks to fallback storage

2023-07-25 Thread Frank Yin (Jira)
Frank Yin created SPARK-44547:
-

 Summary: BlockManagerDecommissioner throws exceptions when 
migrating RDD cached blocks to fallback storage
 Key: SPARK-44547
 URL: https://issues.apache.org/jira/browse/SPARK-44547
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.4.1
Reporter: Frank Yin


Looks like the RDD cache doesn't support fallback storage and we should stop 
the migration if the only viable peer is the fallback storage. 

```
23/07/25 05:12:58 WARN BlockManager: Failed to replicate rdd_18_25 to 
BlockManagerId(fallback, remote, 7337, None), failure #0
java.io.IOException: Failed to connect to remote:7337
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
at 
org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock(NettyBlockTransferService.scala:168)
at 
org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:121)
at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$replicate(BlockManager.scala:1784)
at 
org.apache.spark.storage.BlockManager.$anonfun$replicateBlock$2(BlockManager.scala:1721)
at 
org.apache.spark.storage.BlockManager.$anonfun$replicateBlock$2$adapted(BlockManager.scala:1707)
at scala.Option.forall(Option.scala:390)
at org.apache.spark.storage.BlockManager.replicateBlock(BlockManager.scala:1707)
at 
org.apache.spark.storage.BlockManagerDecommissioner.migrateBlock(BlockManagerDecommissioner.scala:356)
at 
org.apache.spark.storage.BlockManagerDecommissioner.$anonfun$decommissionRddCacheBlocks$3(BlockManagerDecommissioner.scala:340)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.storage.BlockManagerDecommissioner.decommissionRddCacheBlocks(BlockManagerDecommissioner.scala:339)
at 
org.apache.spark.storage.BlockManagerDecommissioner$$anon$1.run(BlockManagerDecommissioner.scala:214)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.net.UnknownHostException: remote
at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)
at java.base/java.net.InetAddress.getAllByName0(Unknown Source)
at java.base/java.net.InetAddress.getAllByName(Unknown Source)
at java.base/java.net.InetAddress.getAllByName(Unknown Source)
at java.base/java.net.InetAddress.getByName(Unknown Source)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
at 
io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104

[jira] [Commented] (SPARK-39200) Stream is corrupted Exception while fetching the blocks from fallback storage system

2022-09-21 Thread Frank Yin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-39200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17608008#comment-17608008
 ] 

Frank Yin commented on SPARK-39200:
---

We've seen this exception as well. Is there a patch coming? 

> Stream is corrupted Exception while fetching the blocks from fallback storage 
> system
> 
>
> Key: SPARK-39200
> URL: https://issues.apache.org/jira/browse/SPARK-39200
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Rajendra Gujja
>Priority: Major
>
> When executor decommissioning and fallback storage is enabled - the shuffle 
> reads are failing with `FetchFailedException: Stream is corrupted` 
> ref: https://issues.apache.org/jira/browse/SPARK-18105 (search for 
> decommission)
>  
> This is happening when the shuffle block is bigger than `inputstream.read` 
> can read in one attempt. The code path is not reading the block fully 
> (`readFully`) and the partial read is causing the exception.
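
For illustration, a hypothetical sketch of the partial-read problem described
in the issue (the helper names are invented; this is not the Spark code path):
a single InputStream.read may return fewer bytes than requested for a large
block, while the readFully pattern loops until the buffer is filled.

```scala
// Hypothetical helpers contrasting a single read with readFully.
import java.io.{DataInputStream, EOFException, InputStream}

def readBlockFully(in: InputStream, size: Int): Array[Byte] = {
  val buf = new Array[Byte](size)
  // readFully keeps reading until `size` bytes arrive (or throws EOFException).
  new DataInputStream(in).readFully(buf)
  buf
}

def readBlockOnce(in: InputStream, size: Int): Array[Byte] = {
  val buf = new Array[Byte](size)
  val n = in.read(buf) // may legally return fewer than `size` bytes
  if (n < size) {
    // A truncated buffer is what later surfaces as "Stream is corrupted".
    throw new EOFException(s"only $n of $size bytes read")
  }
  buf
}
```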






Re: restarting jenkins build system tomorrow (7/8) ~930am PDT

2020-07-10 Thread Frank Yin
Yeah, that's what I figured -- those workers are under load. Thanks.

On Fri, Jul 10, 2020 at 12:43 PM shane knapp ☠  wrote:

> only 125561, 125562 and 125564 were impacted by -9.
>
> 125565 exited w/a code of 15 (143 - 128), which means the process was
> terminated for unknown reasons.
>
> 125563 looks like mima failed due to a bunch of errors.
>
> i just spot checked a bunch of recent failed PRB builds from today and
> they all seemed to be legit.
>
> another thing that might be happening is an overload of PRB builds on the
> workers due to the backlog...  the workers are under a LOT of load right
> now, and i can put some rate limiting in to see if that helps out.
>
> shane
>
> On Fri, Jul 10, 2020 at 11:31 AM Frank Yin  wrote:
>
>> Like from build number 125565 to 125561, all impacted by kill -9.
>>
>> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125565/console
>>
>> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125564/console
>>
>> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125563/console
>>
>> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125562/console
>>
>> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125561/console
>>
>> On Fri, Jul 10, 2020 at 9:35 AM shane knapp ☠ 
>> wrote:
>>
>>> define "a lot" and provide some links to those builds, please.  there
>>> are roughly 2000 builds per day, and i can't do more than keep a cursory
>>> eye on things.
>>>
>>> the infrastructure that the tests run on hasn't changed one bit on any
>>> of the workers, and 'kill -9' could be a timeout, flakiness caused by old
>>> build processes remaining on the workers after the master went down, or me
>>> trying to clean things up w/o a reboot.  or, perhaps, something wrong w/the
>>> infra.  :)
>>>
>>> On Fri, Jul 10, 2020 at 9:28 AM Frank Yin  wrote:
>>>
>>>> Agree, but I’ve seen a lot of kills by signal 9; I assume that’s
>>>> infrastructure?
>>>>
>>>> On Fri, Jul 10, 2020 at 8:19 AM shane knapp ☠ 
>>>> wrote:
>>>>
>>>>> yeah, i can't do much for flaky tests...  just flaky infrastructure.
>>>>>
>>>>>
>>>>> On Fri, Jul 10, 2020 at 12:41 AM Hyukjin Kwon 
>>>>> wrote:
>>>>>
>>>>>> Couple of flaky tests can happen. It's usual. Seems it got better now
>>>>>> at least. I will keep monitoring the builds.
>>>>>>
>>>>>> On Fri, Jul 10, 2020 at 4:33 PM, ukby1234 wrote:
>>>>>>
>>>>>>> Looks like Jenkins isn't stable still. My PR fails two times in a
>>>>>>> row:
>>>>>>>
>>>>>>> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125565/console
>>>>>>>
>>>>>>> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125536/testReport
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Sent from:
>>>>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Shane Knapp
>>>>> Computer Guy / Voice of Reason
>>>>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>>>>> https://rise.cs.berkeley.edu
>>>>>
>>>>
>>>
>>> --
>>> Shane Knapp
>>> Computer Guy / Voice of Reason
>>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>>> https://rise.cs.berkeley.edu
>>>
>>
>
> --
> Shane Knapp
> Computer Guy / Voice of Reason
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>


[jira] [Updated] (SPARK-32059) Nested Schema Pruning not Working in Window Functions

2020-06-22 Thread Frank Yin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Yin updated SPARK-32059:
--
Description: 
Using tables and data structures in `SchemaPruningSuite.scala`

 
{code:java}
// code placeholder
case class FullName(first: String, middle: String, last: String)
case class Company(name: String, address: String)
case class Employer(id: Int, company: Company)
case class Contact(
  id: Int,
  name: FullName,
  address: String,
  pets: Int,
  friends: Array[FullName] = Array.empty,
  relatives: Map[String, FullName] = Map.empty,
  employer: Employer = null,
  relations: Map[FullName, String] = Map.empty)
case class Department(
  depId: Int,
  depName: String,
  contactId: Int,
  employer: Employer)
{code}
 

The query to run:
{code:java}
// code placeholder
select a.name.first from (select row_number() over (partition by address order 
by id desc) as __rank, contacts.* from contacts) a where a.name.first = 'A' AND 
a.__rank = 1
{code}
 

The current physical plan:
{code:java}
// code placeholder
== Physical Plan ==
*(3) Project [name#46.first AS first#74]
+- *(3) Filter (((isnotnull(name#46) AND isnotnull(__rank#71)) AND 
(name#46.first = A)) AND (__rank#71 = 1))
   +- Window [row_number() windowspecdefinition(address#47, id#45 DESC NULLS 
LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
__rank#71], [address#47], [id#45 DESC NULLS LAST]
  +- *(2) Sort [address#47 ASC NULLS FIRST, id#45 DESC NULLS LAST], false, 0
 +- Exchange hashpartitioning(address#47, 5), true, [id=#52]
+- *(1) Project [id#45, name#46, address#47]
   +- FileScan parquet [id#45,name#46,address#47,p#53] Batched: 
false, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/private/var/folders/_c/4r2j33dd14n9ldfc2xqyzs40gn/T/spark-85d173af-42...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<id:int,name:struct<first:string,middle:string,last:string>,address:string>
{code}
 

The desired physical plan:

 
{code:java}
// code placeholder
== Physical Plan ==
*(3) Project [_gen_alias_77#77 AS first#74]
+- *(3) Filter (((isnotnull(_gen_alias_77#77) AND isnotnull(__rank#71)) AND 
(_gen_alias_77#77 = A)) AND (__rank#71 = 1))
   +- Window [row_number() windowspecdefinition(address#47, id#45 DESC NULLS 
LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
__rank#71], [address#47], [id#45 DESC NULLS LAST]
  +- *(2) Sort [address#47 ASC NULLS FIRST, id#45 DESC NULLS LAST], false, 0
 +- Exchange hashpartitioning(address#47, 5), true, [id=#52]
+- *(1) Project [id#45, name#46.first AS _gen_alias_77#77, 
address#47]
   +- FileScan parquet [id#45,name#46,address#47,p#53] Batched: 
false, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/private/var/folders/_c/4r2j33dd14n9ldfc2xqyzs40gn/T/spark-c64e0b29-d9...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<id:int,name:struct<first:string>,address:string>
{code}
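
A minimal, hypothetical reproduction sketch (not from the ticket): it assumes a local SparkSession, trimmed-down versions of the `SchemaPruningSuite` case classes, an illustrative /tmp path, and nested schema pruning left enabled (its Spark 3.0 default via spark.sql.optimizer.nestedSchemaPruning.enabled). Running explain() on the query should show the un-pruned ReadSchema from the "current physical plan" above.

{code:java}
// Hypothetical repro sketch, not part of the ticket. Case classes are trimmed
// and the path/data values are made up for illustration.
import org.apache.spark.sql.SparkSession

case class FullName(first: String, middle: String, last: String)
case class Contact(id: Int, name: FullName, address: String, pets: Int)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq(
  Contact(0, FullName("A", "B", "C"), "123 Main St", 1),
  Contact(1, FullName("D", "E", "F"), "456 Side St", 2)
).toDF().write.mode("overwrite").parquet("/tmp/contacts")

spark.read.parquet("/tmp/contacts").createOrReplaceTempView("contacts")

// The window query from the description; explain() shows whether the
// FileScan reads the whole name struct or only name.first.
spark.sql(
  """select a.name.first
    |from (select row_number() over (partition by address order by id desc) as __rank,
    |             contacts.*
    |      from contacts) a
    |where a.name.first = 'A' and a.__rank = 1""".stripMargin
).explain()
{code}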

  was:
Using tables and data structures in `SchemaPruningSuite.scala`

```

case class FullName(first: String, middle: String, last: String)
case class Company(name: String, address: String)
case class Employer(id: Int, company: Company)
case class Contact(
 id: Int,
 name: FullName,
 address: String,
 pets: Int,
 friends: Array[FullName] = Array.empty,
 relatives: Map[String, FullName] = Map.empty,
 employer: Employer = null,
 relations: Map[FullName, String] = Map.empty)
case class Department(
 depId: Int,
 depName: String,
 contactId: Int,
 employer: Employer)

```

 

The query to run: `

select a.name.first from (select row_number() over (partition by address order 
by id desc) as __rank, contacts.* from contacts) a where a.name.first = 'A' AND 
a.__rank = 1`

 

The current physical plan:

```

== Physical Plan ==
*(3) Project [name#46.first AS first#74]
+- *(3) Filter (((isnotnull(name#46) AND isnotnull(__rank#71)) AND 
(name#46.first = A)) AND (__rank#71 = 1))
 +- Window [row_number() windowspecdefinition(address#47, id#45 DESC NULLS 
LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
__rank#71], [address#47], [id#45 DESC NULLS LAST]
 +- *(2) Sort [address#47 ASC NULLS FIRST, id#45 DESC NULLS LAST], false, 0
 +- Exchange hashpartitioning(address#47, 5), true, [id=#52]
 +- *(1) Project [id#45, name#46, address#47]
 +- FileScan parquet [id#45,name#46,address#47,p#53] Batched: false, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/private/var/folders/_c/4r2j33dd14n9ldfc2xqyzs40gn/T/spark-85d173af-42...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct,address:string>

```

 

The desired physical plan:

```

== Physical Plan ==
*(3) Project [_gen_alias_77#77 AS first#74]
+- *(3) Filter (((isnotnull(_gen_alias_77#77) AND isnotnull(__rank#71)) AND 
(_gen_alias_77#77 = A)) AND (__rank#71 = 1))
 +- Window [row_number() windowspecdefinition(address#47, id#45 D

[jira] [Created] (SPARK-32059) Nested Schema Pruning not Working in Window Functions

2020-06-22 Thread Frank Yin (Jira)
Frank Yin created SPARK-32059:
-

 Summary: Nested Schema Pruning not Working in Window Functions
 Key: SPARK-32059
 URL: https://issues.apache.org/jira/browse/SPARK-32059
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Frank Yin


Using tables and data structures in `SchemaPruningSuite.scala`

```

case class FullName(first: String, middle: String, last: String)
case class Company(name: String, address: String)
case class Employer(id: Int, company: Company)
case class Contact(
 id: Int,
 name: FullName,
 address: String,
 pets: Int,
 friends: Array[FullName] = Array.empty,
 relatives: Map[String, FullName] = Map.empty,
 employer: Employer = null,
 relations: Map[FullName, String] = Map.empty)
case class Department(
 depId: Int,
 depName: String,
 contactId: Int,
 employer: Employer)

```

 

The query to run: `

select a.name.first from (select row_number() over (partition by address order 
by id desc) as __rank, contacts.* from contacts) a where a.name.first = 'A' AND 
a.__rank = 1`

 

The current physical plan:

```

== Physical Plan ==
*(3) Project [name#46.first AS first#74]
+- *(3) Filter (((isnotnull(name#46) AND isnotnull(__rank#71)) AND 
(name#46.first = A)) AND (__rank#71 = 1))
 +- Window [row_number() windowspecdefinition(address#47, id#45 DESC NULLS 
LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
__rank#71], [address#47], [id#45 DESC NULLS LAST]
 +- *(2) Sort [address#47 ASC NULLS FIRST, id#45 DESC NULLS LAST], false, 0
 +- Exchange hashpartitioning(address#47, 5), true, [id=#52]
 +- *(1) Project [id#45, name#46, address#47]
 +- FileScan parquet [id#45,name#46,address#47,p#53] Batched: false, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/private/var/folders/_c/4r2j33dd14n9ldfc2xqyzs40gn/T/spark-85d173af-42...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct,address:string>

```

 

The desired physical plan:

```

== Physical Plan ==
*(3) Project [_gen_alias_77#77 AS first#74]
+- *(3) Filter (((isnotnull(_gen_alias_77#77) AND isnotnull(__rank#71)) AND 
(_gen_alias_77#77 = A)) AND (__rank#71 = 1))
 +- Window [row_number() windowspecdefinition(address#47, id#45 DESC NULLS 
LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
__rank#71], [address#47], [id#45 DESC NULLS LAST]
 +- *(2) Sort [address#47 ASC NULLS FIRST, id#45 DESC NULLS LAST], false, 0
 +- Exchange hashpartitioning(address#47, 5), true, [id=#52]
 +- *(1) Project [id#45, name#46.first AS _gen_alias_77#77, address#47]
 +- FileScan parquet [id#45,name#46,address#47,p#53] Batched: false, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/private/var/folders/_c/4r2j33dd14n9ldfc2xqyzs40gn/T/spark-c64e0b29-d9...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct,address:string>

```
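
One way to check programmatically whether pruning took effect, rather than reading the explain output by hand, is to pull the Parquet scan node out of the executed plan and print the schema it will actually read. This is a rough sketch against Spark internals (FileSourceScanExec and its requiredSchema field are not stable public API), assuming a SparkSession `spark`, a registered `contacts` view as described above, and adaptive execution left at its 3.0 default (off):

```

import org.apache.spark.sql.execution.FileSourceScanExec

val df = spark.sql(
  "select a.name.first from (select row_number() over " +
  "(partition by address order by id desc) as __rank, contacts.* from contacts) a " +
  "where a.name.first = 'A' and a.__rank = 1")

// With pruning working, the `name` field here should only contain `first`;
// with the bug, the full first/middle/last struct shows up.
df.queryExecution.executedPlan.collect {
  case scan: FileSourceScanExec => scan.requiredSchema
}.foreach(schema => println(schema.catalogString))

```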



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-25165) Cannot parse Hive Struct

2018-08-20 Thread Frank Yin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586901#comment-16586901
 ] 

Frank Yin edited comment on SPARK-25165 at 8/21/18 3:48 AM:


 

 

{{#!/usr/bin/env python}}
 {{# -*- coding: UTF-8 -*-}}
 {{# encoding=utf8}}
 {{import sys}}
 {{import os}}
 {{import json}}
 {{import argparse}}
 {{import time}}
 {{from datetime import datetime, timedelta}}
 {{from calendar import timegm}}
 {{from pyspark.sql import SparkSession}}
 {{from pyspark.conf import SparkConf}}
 {{from pyspark.sql.functions import *}}
 {{from pyspark.sql.types import *}}

{{spark_conf = SparkConf().setAppName("Test Hive")\}}
{{ .set("spark.executor.memory", "4g")\}}
{{ .set("spark.sql.catalogImplementation","hive")\}}
{{ .set("spark.speculation", "true")\}}
{{ .set("spark.dynamicAllocation.maxExecutors", "2000")\}}
{{ .set("spark.sql.shuffle.partitions", "400")}}

{{spark = SparkSession\}}
{{ .builder\}}
{{ .config(conf=spark_conf)\}}
{{ .getOrCreate()}}

 

{{spark.sql("SELECT * FROM default.a").collect() }}

where default.a is a table in hive. 

schema: 

columnA:struct,view.b:array>

 


was (Author: frankyin-factual):
 

 

{{#!/usr/bin/env python}}
{{# -*- coding: UTF-8 -*-}}
{{# encoding=utf8}}
{{import sys}}
{{import os}}
{{import json}}
{{import argparse}}
{{import time}}
{{from datetime import datetime, timedelta}}
{{from calendar import timegm}}
{{from pyspark.sql import SparkSession}}
{{from pyspark.conf import SparkConf}}
{{from pyspark.sql.functions import *}}
{{from pyspark.sql.types import *}}{{spark_conf = SparkConf().setAppName("Test 
Hive")\}}
{{ .set("spark.executor.memory", "4g")\}}
{{ .set("spark.sql.catalogImplementation","hive")\}}
{{ .set("spark.speculation", "true")\}}
{{ .set("spark.dynamicAllocation.maxExecutors", "2000")\}}
{{ .set("spark.sql.shuffle.partitions", "400")}}{{spark = SparkSession\}}
{{ .builder\}}
{{ .config(conf=spark_conf)\}}
{{ .getOrCreate()}}

{{spark.sql("SELECT * FROM default.a").collect() }}

where default.a is a table in hive. 

schema: 

columnA:struct,view.b:array>

 

> Cannot parse Hive Struct
> 
>
> Key: SPARK-25165
> URL: https://issues.apache.org/jira/browse/SPARK-25165
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Frank Yin
>Priority: Major
>
> org.apache.spark.SparkException: Cannot recognize hive type string: 
> struct,view.b:array>
>  
> My guess is dot(.) is causing issues for parsing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-25165) Cannot parse Hive Struct

2018-08-20 Thread Frank Yin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586901#comment-16586901
 ] 

Frank Yin edited comment on SPARK-25165 at 8/21/18 3:48 AM:


 

 
{quote}{{#!/usr/bin/env python}}
 {{# -*- coding: UTF-8 -*-}}
 {{# encoding=utf8}}
 {{import sys}}
 {{import os}}
 {{import json}}
 {{import argparse}}
 {{import time}}
 {{from datetime import datetime, timedelta}}
 {{from calendar import timegm}}
 {{from pyspark.sql import SparkSession}}
 {{from pyspark.conf import SparkConf}}
 {{from pyspark.sql.functions import *}}
 {{from pyspark.sql.types import *}}

{{spark_conf = SparkConf().setAppName("Test 
Hive")}}.set("spark.sql.catalogImplementation","hive") 

{{spark.sql("SELECT * FROM default.a").collect() }}

where default.a is a table in hive. 

schema: 

columnA:struct,view.b:array>
{quote}
 


was (Author: frankyin-factual):
 

 

{{#!/usr/bin/env python}}
 {{# -*- coding: UTF-8 -*-}}
 {{# encoding=utf8}}
 {{import sys}}
 {{import os}}
 {{import json}}
 {{import argparse}}
 {{import time}}
 {{from datetime import datetime, timedelta}}
 {{from calendar import timegm}}
 {{from pyspark.sql import SparkSession}}
 {{from pyspark.conf import SparkConf}}
 {{from pyspark.sql.functions import *}}
 {{from pyspark.sql.types import *}}

{{spark_conf = SparkConf().setAppName("Test Hive")}}
{{ .set("spark.executor.memory", "4g")\}}
{{ .set("spark.sql.catalogImplementation","hive")\}}
{{ .set("spark.speculation", "true")\}}
{{ .set("spark.dynamicAllocation.maxExecutors", "2000")\}}
{{ .set("spark.sql.shuffle.partitions", "400")}}

 

{{spark.sql("SELECT * FROM default.a").collect() }}

where default.a is a table in hive. 

schema: 

columnA:struct,view.b:array>

 

> Cannot parse Hive Struct
> 
>
> Key: SPARK-25165
> URL: https://issues.apache.org/jira/browse/SPARK-25165
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Frank Yin
>Priority: Major
>
> org.apache.spark.SparkException: Cannot recognize hive type string: 
> struct,view.b:array>
>  
> My guess is dot(.) is causing issues for parsing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-25165) Cannot parse Hive Struct

2018-08-20 Thread Frank Yin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586901#comment-16586901
 ] 

Frank Yin edited comment on SPARK-25165 at 8/21/18 3:47 AM:


 

 

{{#!/usr/bin/env python}}
{{# -*- coding: UTF-8 -*-}}
{{# encoding=utf8}}
{{import sys}}
{{import os}}
{{import json}}
{{import argparse}}
{{import time}}
{{from datetime import datetime, timedelta}}
{{from calendar import timegm}}
{{from pyspark.sql import SparkSession}}
{{from pyspark.conf import SparkConf}}
{{from pyspark.sql.functions import *}}
{{from pyspark.sql.types import *}}{{spark_conf = SparkConf().setAppName("Test 
Hive")\}}
{{ .set("spark.executor.memory", "4g")\}}
{{ .set("spark.sql.catalogImplementation","hive")\}}
{{ .set("spark.speculation", "true")\}}
{{ .set("spark.dynamicAllocation.maxExecutors", "2000")\}}
{{ .set("spark.sql.shuffle.partitions", "400")}}{{spark = SparkSession\}}
{{ .builder\}}
{{ .config(conf=spark_conf)\}}
{{ .getOrCreate()}}

{{spark.sql("SELECT * FROM default.a").collect() }}

where default.a is a table in hive. 

schema: 

columnA:struct,view.b:array>

 


was (Author: frankyin-factual):
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# encoding=utf8
import sys
import os
import json
import argparse
import time
from datetime import datetime, timedelta
from calendar import timegm
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark_conf = SparkConf().setAppName("Test Hive")\
 .set("spark.executor.memory", "4g")\
 .set("spark.sql.catalogImplementation","hive")\
 .set("spark.speculation", "true")\
 .set("spark.dynamicAllocation.maxExecutors", "2000")\
 .set("spark.sql.shuffle.partitions", "400")

spark = SparkSession\
 .builder\
 .config(conf=spark_conf)\
 .getOrCreate()

 

places_and_devices = spark.sql("SELECT * FROM default.a").collect()

 

where default.a is a table in hive. 

schema: 

columnA:struct,view.b:array>

 

> Cannot parse Hive Struct
> 
>
> Key: SPARK-25165
> URL: https://issues.apache.org/jira/browse/SPARK-25165
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Frank Yin
>Priority: Major
>
> org.apache.spark.SparkException: Cannot recognize hive type string: 
> struct,view.b:array>
>  
> My guess is dot(.) is causing issues for parsing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25165) Cannot parse Hive Struct

2018-08-20 Thread Frank Yin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586901#comment-16586901
 ] 

Frank Yin commented on SPARK-25165:
---

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# encoding=utf8
import sys
import os
import json
import argparse
import time
from datetime import datetime, timedelta
from calendar import timegm
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark_conf = SparkConf().setAppName("Test Hive")\
 .set("spark.executor.memory", "4g")\
 .set("spark.sql.catalogImplementation","hive")\
 .set("spark.speculation", "true")\
 .set("spark.dynamicAllocation.maxExecutors", "2000")\
 .set("spark.sql.shuffle.partitions", "400")

spark = SparkSession\
 .builder\
 .config(conf=spark_conf)\
 .getOrCreate()

 

places_and_devices = spark.sql("SELECT * FROM default.a").collect()

 

where default.a is a table in hive. 

schema: 

columnA:struct,view.b:array>

 

> Cannot parse Hive Struct
> 
>
> Key: SPARK-25165
> URL: https://issues.apache.org/jira/browse/SPARK-25165
>     Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Frank Yin
>Priority: Major
>
> org.apache.spark.SparkException: Cannot recognize hive type string: 
> struct,view.b:array>
>  
> My guess is dot(.) is causing issues for parsing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25165) Cannot parse Hive Struct

2018-08-20 Thread Frank Yin (JIRA)
Frank Yin created SPARK-25165:
-

 Summary: Cannot parse Hive Struct
 Key: SPARK-25165
 URL: https://issues.apache.org/jira/browse/SPARK-25165
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1, 2.2.1
Reporter: Frank Yin


org.apache.spark.SparkException: Cannot recognize hive type string: 
struct,view.b:array>

 

My guess is dot(.) is causing issues for parsing. 
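
Not from the ticket, just a way to narrow it down: the type string Hive returns has unquoted field names, and as far as I can tell the Catalyst DDL parser only accepts a dot inside a field name when it is backtick-quoted. A rough sketch against the internal parser (CatalystSqlParser is not public API, the element type of the array is assumed, and the exact exception may differ by version):

{code:java}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Unquoted dotted field name, as the Hive metastore reports it: fails with a
// ParseException, which surfaces as "Cannot recognize hive type string".
CatalystSqlParser.parseDataType("struct<view.b:array<string>>")

// Backtick-quoting the dotted name parses fine, which points at the dot.
CatalystSqlParser.parseDataType("struct<`view.b`:array<string>>")
{code}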



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[zeromq-dev] Windows r2 high cpu usage

2014-05-26 Thread Frank Yin
Hi,

We are building a project with the help of zmq.

Basically, we have several Windows boxes sending information to one Linux box
using the PUSH/PULL pattern.

The problem is that our Windows boxes have very unstable CPU usage. At around
10k messages per second, the CPU sits at 100% most of the time and swings up
and down every couple of seconds. The Linux side is fairly stable, though.

Without the zmq part, the Windows boxes are stable, and 100% CPU usage only
happens when we have double the traffic compared to the situation above.

We are using Windows R2.

Any suggestions on tuning?

Thanks
Frank
___
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev