[jira] [Commented] (SPARK-20624) SPIP: Add better handling for node shutdown

2024-02-02 Thread Yu-Jhe Li (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-20624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813592#comment-17813592
 ] 

Yu-Jhe Li commented on SPARK-20624:
---

Hi, we found that the migrated shuffle files from a decommissioned node are never 
deleted, even long after the job has completed. 

I have created https://issues.apache.org/jira/browse/SPARK-46957 to track this. 
Can anyone help?

> SPIP: Add better handling for node shutdown
> ---
>
> Key: SPARK-20624
> URL: https://issues.apache.org/jira/browse/SPARK-20624
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Holden Karau
>Priority: Major
>
> While we've done some good work with better handling when Spark is choosing 
> to decommission nodes (SPARK-7955), it might make sense in environments where 
> we get preempted without our own choice (e.g. YARN over-commit, EC2 spot 
> instances, GCE Preemptible instances, etc.) to do something for the data on 
> the node (or at least not schedule any new tasks).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed

2024-02-02 Thread Yu-Jhe Li (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-46957:
--
Description: 
Hi, we have a long-lived Spark application running on a standalone cluster on GCP, 
and we are using spot instances. To reduce the impact of preempted instances, we 
have enabled node decommissioning so that a preempted node migrates its shuffle 
data to other instances before it is deleted by GCP.

However, we found that the migrated shuffle data from the decommissioned node is 
never removed (same behavior on Spark 3.5).

*Steps to reproduce:*

1. Start spark-shell with 3 executors and enable decommissioning on both the 
driver and the workers
{code:java}
start-worker.sh[3331]: Spark Command: 
/usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp 
/opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 
-Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://master-01.com:7077 {code}
{code:java}
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true{code}
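For completeness, the same decommission settings can also be applied when building 
the session programmatically; the sketch below is not from the original report (the 
appName is made up, the master URL and config keys are the ones shown above):
{code:scala}
// Sketch only: programmatic equivalent of the spark-shell flags above.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://master-01.spark.com:7077")   // master URL from this report
  .appName("decommission-repro")                // hypothetical name
  .config("spark.decommission.enabled", "true")
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .config("spark.storage.decommission.rddBlocks.enabled", "true")
  .getOrCreate()
{code}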
 

2. Manually stop 1 worker during execution
{code:java}
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus 
vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non 
laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat 
volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. 
Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim 
tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec 
sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 1 * i).map(j => (j, s"${j}_${longString}")).toDF("id", 
"mystr")

  df.repartition(6).count()
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

start iter 1 ...
finished iter 1, wait 15s for next round
... {code}
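(The System.gc() calls above are there to nudge the driver-side ContextCleaner, 
which normally removes a shuffle's files once that shuffle's lineage is no longer 
reachable. A minimal sketch of that expectation follows, assuming a spark-shell 
session with spark in scope; the point of this report is that the copies migrated 
off the decommissioned node are not cleaned up this way.)
{code:scala}
// Sketch only. When runOnce returns, the DataFrame and the shuffle behind
// repartition(6) become unreachable; the next GC lets the ContextCleaner ask
// the block managers to drop that shuffle's files -- which should also cover
// any copies that were migrated from a decommissioned node.
def runOnce(spark: org.apache.spark.sql.SparkSession): Unit = {
  import spark.implicits._
  val df = (1 to 1000).toDF("id")
  df.repartition(6).count()
}

runOnce(spark)
System.gc()   // cleanup is asynchronous and best-effort
{code}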
 

3. Check the migrated shuffle data files on the remaining workers

{*}decommissioned node{*}: the shuffle files were migrated successfully
{code:java}
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) {code}
{*}remaining shuffle data files on the other workers{*}: the migrated shuffle 
files are never removed
{code:java}
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
 {code}
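(For reference, the listing above was collected with an ad-hoc shell command run 
across the workers; an equivalent check can be scripted, e.g. the hypothetical 
sketch below, which walks a worker's local dir, here /mnt/spark as in the listing, 
and prints any shuffle files still on disk.)
{code:scala}
// Hypothetical helper, not part of the original report: list shuffle data/index
// files left under a worker's local dir (/mnt/spark, per the listing above).
import java.nio.file.{Files, Path, Paths}

val stream = Files.walk(Paths.get("/mnt/spark"))
try {
  val it = stream.iterator()
  while (it.hasNext) {
    val p: Path = it.next()
    val name = if (p.getFileName == null) "" else p.getFileName.toString
    if (name.matches("shuffle_\\d+_\\d+_\\d+\\.(data|index)"))
      println(s"$p  ${Files.size(p)} bytes")
  }
} finally {
  stream.close()
}
{code}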
 

*Expected behavior:*

The migrated shuffle data files should be removed after the job completes.

  was:
Hi, we have a long-lived Spark application run on a 

[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed

2024-02-02 Thread Yu-Jhe Li (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-46957:
--
Description: 
Hi, we have a long-lived Spark application running on a standalone cluster on GCP, 
and we are using spot instances. To reduce the impact of preempted instances, we 
have enabled node decommissioning so that a preempted node migrates its shuffle 
data to other instances before it is deleted by GCP.

However, we found that the migrated shuffle data from the decommissioned node is 
never removed (same behavior on Spark 3.5).

*Steps to reproduce:*

1. Start spark-shell with 3 executors and enable decommissioning on both the 
driver and the workers
{code:java}
start-worker.sh[3331]: Spark Command: 
/usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp 
/opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 
-Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://master-01.com:7077 {code}
{code:java}
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true{code}
 

2. Manually stop 1 worker during execution
{code:java}
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus 
vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non 
laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat 
volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. 
Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim 
tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec 
sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 1 * i).map(j => (j, s"${j}_${longString}")).toDF("id", 
"mystr")

  df.repartition(6).count()
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

start iter 1 ...
finished iter 1, wait 15s for next round
... {code}
 

3. Check the migrated shuffle data files on the remaining workers

{*}decommissioned node{*}: the shuffle files were migrated successfully
{code:java}
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) {code}
{*}remaining shuffle data files on the other workers{*}: the migrated shuffle 
files are never removed
{code:java}
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
 {code}
 

 

  was:
Hi, we have a long-lived Spark application run on a standalone cluster on GCP 
and we are using spot instances. To reduce the impact of preempted 

[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed

2024-02-02 Thread Yu-Jhe Li (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-46957:
--
Description: 
Hi, we have a long-lived Spark application running on a standalone cluster on GCP, 
and we are using spot instances. To reduce the impact of preempted instances, we 
have enabled node decommissioning so that a preempted node migrates its shuffle 
data to other instances before it is deleted by GCP.

However, we found that the migrated shuffle data from the decommissioned node is 
never removed (same behavior on Spark 3.5).

*Steps to reproduce:*

1. Start spark-shell with 3 executors and enable decommissioning on both the 
driver and the workers
{noformat}
start-worker.sh[3331]: Spark Command: 
/usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp 
/opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 
-Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://master-01.com:7077 {noformat}
{code:java}
start-worker.sh[3331]: Spark Command: 
/usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp 
/opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 
-Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://master-01.com:7077 {code}
 

 
{code:java}
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true{code}
 

2. Manually stop 1 worker during execution

 
{code:java}
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus 
vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non 
laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat 
volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. 
Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim 
tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec 
sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 1 * i).map(j => (j, s"${j}_${longString}")).toDF("id", 
"mystr")

  df.repartition(6).count()
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

start iter 1 ...
finished iter 1, wait 15s for next round
... {code}
 

3. Check the migrated shuffle data files on the remaining workers

{*}decommissioned node{*}: the shuffle files were migrated successfully
{code:java}
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) {code}
{*}remaining shuffle data files on the other workers{*}: the migrated shuffle 
files are never removed

 
{code:java}
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 

[jira] [Created] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed

2024-02-02 Thread Yu-Jhe Li (Jira)
Yu-Jhe Li created SPARK-46957:
-

 Summary: Migrated shuffle data files from the decommissioned node 
should be removed when job completed
 Key: SPARK-46957
 URL: https://issues.apache.org/jira/browse/SPARK-46957
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Yu-Jhe Li


Hi, we have a long-lived Spark application running on a standalone cluster on GCP, 
and we are using spot instances. To reduce the impact of preempted instances, we 
have enabled node decommissioning so that a preempted node migrates its shuffle 
data to other instances before it is deleted by GCP.

However, we found that the migrated shuffle data from the decommissioned node is 
never removed (same behavior on Spark 3.5).

*Steps to reproduce:*

1. Start spark-shell with 3 executors and enable decommissioning on both the 
driver and the workers

 
{code:java}
start-worker.sh[3331]: Spark Command: 
/usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp 
/opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 
-Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://master-01.com:7077 {code}
 

 
{code:java}
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true{code}
 

2. Manually stop 1 worker during execution

 
{code:java}
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus 
vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non 
laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat 
volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. 
Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim 
tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec 
sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 1 * i).map(j => (j, s"${j}_${longString}")).toDF("id", 
"mystr")

  df.repartition(6).count()
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

start iter 1 ...
finished iter 1, wait 15s for next round
... {code}
 

3. Check the migrated shuffle data files on the remaining workers

{*}decommissioned node{*}: the shuffle files were migrated successfully
{code:java}
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated 
migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) {code}
{*}remaining shuffle data files on the other workers{*}: the migrated shuffle 
files are never removed

 
{code:java}
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 
/mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 
/mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 

[jira] [Resolved] (SPARK-30181) Throws runtime exception when filter metastore partition key that's not string type or integral types

2019-12-17 Thread Yu-Jhe Li (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li resolved SPARK-30181.
---
Resolution: Duplicate

> Throws runtime exception when filter metastore partition key that's not 
> string type or integral types
> -
>
> Key: SPARK-30181
> URL: https://issues.apache.org/jira/browse/SPARK-30181
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4
>Reporter: Yu-Jhe Li
>Priority: Major
>
> The SQL below throws a runtime exception since spark-2.4.0. I think it's a 
> bug introduced by SPARK-22384.
> {code:scala}
> val df = Seq(
> (1, java.sql.Timestamp.valueOf("2019-12-01 00:00:00"), 1), 
> (2, java.sql.Timestamp.valueOf("2019-12-01 01:00:00"), 1)
>   ).toDF("id", "dt", "value")
> df.write.partitionBy("dt").mode("overwrite").saveAsTable("timestamp_part")
> spark.sql("select * from timestamp_part where dt >= '2019-12-01 
> 00:00:00'").explain(true)
> {code}
> {noformat}
> Caught Hive MetaException attempting to get partition metadata by filter from 
> Hive. You can set the Spark configuration setting 
> spark.sql.hive.manageFilesourcePartitions to false to work around this 
> problem, however this will result in degraded performance. Please report a 
> bug: https://issues.apache.org/jira/browse/SPARK
> java.lang.RuntimeException: Caught Hive MetaException attempting to get 
> partition metadata by filter from Hive. You can set the Spark configuration 
> setting spark.sql.hive.manageFilesourcePartitions to false to work around 
> this problem, however this will result in degraded performance. Please report 
> a bug: https://issues.apache.org/jira/browse/SPARK
>   at 
> org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:774)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:679)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:677)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:275)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:213)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:212)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:258)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:677)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite.testMetastorePartitionFiltering(HiveClientSuite.scala:310)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite.org$apache$spark$sql$hive$client$HiveClientSuite$$testMetastorePartitionFiltering(HiveClientSuite.scala:282)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply$mcV$sp(HiveClientSuite.scala:105)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
>   at 
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
>   at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
>   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
>   at 
> org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
>   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
>   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
>   at 

[jira] [Commented] (SPARK-30181) Throws runtime exception when filter metastore partition key that's not string type or integral types

2019-12-17 Thread Yu-Jhe Li (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998053#comment-16998053
 ] 

Yu-Jhe Li commented on SPARK-30181:
---

Yes, the bug is fixed in the master branch and latest branch-2.4. We can close 
this issue.
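
For anyone still on an affected 2.4.x build, the exception message itself names a 
workaround; a minimal sketch (with the degraded-performance trade-off the message 
warns about) would be:
{code:scala}
// Sketch of the workaround quoted in the exception message: turn off Hive-managed
// file source partitions for the session, then rerun the failing query.
// (The setting should be in place before a session exists, e.g. via --conf on
// spark-submit; getOrCreate() will not reconfigure an already-running session.)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.manageFilesourcePartitions", "false")
  .getOrCreate()

spark.sql("select * from timestamp_part where dt >= '2019-12-01 00:00:00'").explain(true)
{code}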

> Throws runtime exception when filter metastore partition key that's not 
> string type or integral types
> -
>
> Key: SPARK-30181
> URL: https://issues.apache.org/jira/browse/SPARK-30181
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4
>Reporter: Yu-Jhe Li
>Priority: Major
>
> The SQL below throws a runtime exception since spark-2.4.0. I think it's a 
> bug introduced by SPARK-22384.
> {code:scala}
> val df = Seq(
> (1, java.sql.Timestamp.valueOf("2019-12-01 00:00:00"), 1), 
> (2, java.sql.Timestamp.valueOf("2019-12-01 01:00:00"), 1)
>   ).toDF("id", "dt", "value")
> df.write.partitionBy("dt").mode("overwrite").saveAsTable("timestamp_part")
> spark.sql("select * from timestamp_part where dt >= '2019-12-01 
> 00:00:00'").explain(true)
> {code}
> {noformat}
> Caught Hive MetaException attempting to get partition metadata by filter from 
> Hive. You can set the Spark configuration setting 
> spark.sql.hive.manageFilesourcePartitions to false to work around this 
> problem, however this will result in degraded performance. Please report a 
> bug: https://issues.apache.org/jira/browse/SPARK
> java.lang.RuntimeException: Caught Hive MetaException attempting to get 
> partition metadata by filter from Hive. You can set the Spark configuration 
> setting spark.sql.hive.manageFilesourcePartitions to false to work around 
> this problem, however this will result in degraded performance. Please report 
> a bug: https://issues.apache.org/jira/browse/SPARK
>   at 
> org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:774)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:679)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:677)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:275)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:213)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:212)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:258)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:677)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite.testMetastorePartitionFiltering(HiveClientSuite.scala:310)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite.org$apache$spark$sql$hive$client$HiveClientSuite$$testMetastorePartitionFiltering(HiveClientSuite.scala:282)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply$mcV$sp(HiveClientSuite.scala:105)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
>   at 
> org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
>   at 
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
>   at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
>   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
>   at 
> org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
>   at 

[jira] [Updated] (SPARK-30181) Throws runtime exception when filter metastore partition key that's not string type or integral types

2019-12-16 Thread Yu-Jhe Li (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-30181:
--
Description: 
The SQL below throws a runtime exception since spark-2.4.0. I think it's a bug 
introduced by SPARK-22384.
{code:scala}
val df = Seq(
(1, java.sql.Timestamp.valueOf("2019-12-01 00:00:00"), 1), 
(2, java.sql.Timestamp.valueOf("2019-12-01 01:00:00"), 1)
  ).toDF("id", "dt", "value")
df.write.partitionBy("dt").mode("overwrite").saveAsTable("timestamp_part")

spark.sql("select * from timestamp_part where dt >= '2019-12-01 
00:00:00'").explain(true)
{code}
{noformat}
Caught Hive MetaException attempting to get partition metadata by filter from 
Hive. You can set the Spark configuration setting 
spark.sql.hive.manageFilesourcePartitions to false to work around this problem, 
however this will result in degraded performance. Please report a bug: 
https://issues.apache.org/jira/browse/SPARK
java.lang.RuntimeException: Caught Hive MetaException attempting to get 
partition metadata by filter from Hive. You can set the Spark configuration 
setting spark.sql.hive.manageFilesourcePartitions to false to work around this 
problem, however this will result in degraded performance. Please report a bug: 
https://issues.apache.org/jira/browse/SPARK
  at 
org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:774)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:679)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:677)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:275)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:213)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:212)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:258)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:677)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite.testMetastorePartitionFiltering(HiveClientSuite.scala:310)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite.org$apache$spark$sql$hive$client$HiveClientSuite$$testMetastorePartitionFiltering(HiveClientSuite.scala:282)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply$mcV$sp(HiveClientSuite.scala:105)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
  at 
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
  at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
  at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
  at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
  at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
  at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
  at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
  at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
  at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
  at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
  at org.scalatest.Suite$class.run(Suite.scala:1147)
  at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
  at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
  at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
  at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at 

[jira] [Created] (SPARK-30181) Throws runtime exception when filter metastore partition key that's not string type or integral types

2019-12-09 Thread Yu-Jhe Li (Jira)
Yu-Jhe Li created SPARK-30181:
-

 Summary: Throws runtime exception when filter metastore partition 
key that's not string type or integral types
 Key: SPARK-30181
 URL: https://issues.apache.org/jira/browse/SPARK-30181
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.4, 2.4.3, 2.4.2, 2.4.1, 2.4.0
Reporter: Yu-Jhe Li


The SQL below throws a runtime exception since spark-2.4.0. I think it's a bug 
introduced by SPARK-22384.
{code:scala}
spark.sql("CREATE TABLE timestamp_part (value INT) PARTITIONED BY (dt 
TIMESTAMP)")
val df = Seq(
(1, java.sql.Timestamp.valueOf("2019-12-01 00:00:00"), 1), 
(2, java.sql.Timestamp.valueOf("2019-12-01 01:00:00"), 1)
  ).toDF("id", "dt", "value")
df.write.partitionBy("dt").mode("overwrite").saveAsTable("timestamp_part")

spark.sql("select * from timestamp_part where dt >= '2019-12-01 
00:00:00'").explain(true)
{code}
{noformat}
Caught Hive MetaException attempting to get partition metadata by filter from 
Hive. You can set the Spark configuration setting 
spark.sql.hive.manageFilesourcePartitions to false to work around this problem, 
however this will result in degraded performance. Please report a bug: 
https://issues.apache.org/jira/browse/SPARK
java.lang.RuntimeException: Caught Hive MetaException attempting to get 
partition metadata by filter from Hive. You can set the Spark configuration 
setting spark.sql.hive.manageFilesourcePartitions to false to work around this 
problem, however this will result in degraded performance. Please report a bug: 
https://issues.apache.org/jira/browse/SPARK
  at 
org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:774)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:679)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:677)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:275)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:213)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:212)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:258)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:677)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite.testMetastorePartitionFiltering(HiveClientSuite.scala:310)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite.org$apache$spark$sql$hive$client$HiveClientSuite$$testMetastorePartitionFiltering(HiveClientSuite.scala:282)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply$mcV$sp(HiveClientSuite.scala:105)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
  at 
org.apache.spark.sql.hive.client.HiveClientSuite$$anonfun$1.apply(HiveClientSuite.scala:105)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
  at 
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
  at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
  at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
  at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
  at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
  at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
  at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
  at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
  at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
  at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
  at org.scalatest.Suite$class.run(Suite.scala:1147)
  at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
  at 

[jira] [Commented] (SPARK-27652) Caught Hive MetaException when query by partition (partition col start with underscore)

2019-11-04 Thread Yu-Jhe Li (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967213#comment-16967213
 ] 

Yu-Jhe Li commented on SPARK-27652:
---

Hi [~tongqqiu], I have hit this problem too. How did you solve it?

> Caught Hive MetaException when query by partition (partition col start with 
> underscore)
> ---
>
> Key: SPARK-27652
> URL: https://issues.apache.org/jira/browse/SPARK-27652
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.1, 2.4.2, 2.4.3
>Reporter: Tongqing Qiu
>Priority: Minor
>
> Create a table stored as Parquet with an explicit location, then run MSCK 
> REPAIR TABLE to pick up the data. But when querying by the partition column, 
> we get errors (adding backticks does not help).
> {code:java}
> select count from some_table where `_partition_date` = '2015-01-01'{code}
> Error
> {code:java}
> com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: 
> java.lang.RuntimeException: Caught Hive MetaException attempting to get 
> partition metadata by filter from Hive. You can set the Spark configuration 
> setting spark.sql.hive.manageFilesourcePartitions to false to work around 
> this problem, however this will result in degraded performance. Please report 
> a bug: https://issues.apache.org/jira/browse/SPARK at 
> org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:783)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:774)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:772)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:322)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$retryLocked$1.apply(HiveClientImpl.scala:230)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$retryLocked$1.apply(HiveClientImpl.scala:222)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.synchronizeOnObject(HiveClientImpl.scala:266)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:222)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:305)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:772)
>  at 
> org.apache.spark.sql.hive.client.PoolingHiveClient$$anonfun$getPartitionsByFilter$1.apply(PoolingHiveClient.scala:373)
>  at 
> org.apache.spark.sql.hive.client.PoolingHiveClient$$anonfun$getPartitionsByFilter$1.apply(PoolingHiveClient.scala:372)
>  at 
> org.apache.spark.sql.hive.client.PoolingHiveClient.withHiveClient(PoolingHiveClient.scala:101)
>  at 
> org.apache.spark.sql.hive.client.PoolingHiveClient.getPartitionsByFilter(PoolingHiveClient.scala:372)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply(HiveExternalCatalog.scala:1295)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply(HiveExternalCatalog.scala:1288)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$withClient$1$$anonfun$apply$1.apply(HiveExternalCatalog.scala:141)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog.org$apache$spark$sql$hive$HiveExternalCatalog$$maybeSynchronized(HiveExternalCatalog.scala:104)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$withClient$1.apply(HiveExternalCatalog.scala:139)
>  at 
> com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:345)
>  at 
> com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:331)
>  at 
> com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:23)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:137)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilter(HiveExternalCatalog.scala:1288)
>  at 
> org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionsByFilter(ExternalCatalogWithListener.scala:261)
>  at 
> org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(SessionCatalog.scala:1045)
>  at 
> org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:73)
>  at 
> org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:62)
>  at 
> org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:27)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:277)
>  at 
> 

[jira] [Resolved] (SPARK-24492) Endless attempted task when TaskCommitDenied exception writing to S3A

2019-04-18 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li resolved SPARK-24492.
---
Resolution: Won't Do

This hasn't happened again since we upgraded to 2.3.2, so I'm going to close this issue.

> Endless attempted task when TaskCommitDenied exception writing to S3A
> -
>
> Key: SPARK-24492
> URL: https://issues.apache.org/jira/browse/SPARK-24492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu-Jhe Li
>Priority: Critical
> Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
> output files to S3, some tasks retry endlessly and all of them fail with a 
> TaskCommitDenied exception. This happens when we run the Spark application on 
> instances with network issues. (It runs well on healthy spot instances.)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows (in the attachments) that one task of stage 112 failed due to 
> FetchFailedException (a network issue) and a new attempt of stage 112 (retry 1) 
> was scheduled. But in stage 112 (retry 1), all tasks failed due to the 
> TaskCommitDenied exception and kept retrying (they never succeeded, which caused 
> lots of S3 requests).
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (a network 
> issue caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed when writing rows
>  # the failed tasks keep retrying but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> 

[jira] [Commented] (SPARK-24492) Endless attempted task when TaskCommitDenied exception writing to S3A

2018-06-11 Thread Yu-Jhe Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508065#comment-16508065
 ] 

Yu-Jhe Li commented on SPARK-24492:
---

[~ste...@apache.org] thanks for your reply.
Do you have any idea why the Spark committer is not notified of the commit 
failure? If that is fixed, can we avoid the endless retries?

> Endless attempted task when TaskCommitDenied exception writing to S3A
> -
>
> Key: SPARK-24492
> URL: https://issues.apache.org/jira/browse/SPARK-24492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu-Jhe Li
>Priority: Critical
> Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
> output files to S3, some tasks retry endlessly and all of them fail with a 
> TaskCommitDenied exception. This happens when we run the Spark application on 
> instances with network issues. (It runs well on healthy spot instances.)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows (in the attachments) that one task of stage 112 failed due to 
> FetchFailedException (a network issue) and a new attempt of stage 112 (retry 1) 
> was scheduled. But in stage 112 (retry 1), all tasks failed due to the 
> TaskCommitDenied exception and kept retrying (they never succeeded, which caused 
> lots of S3 requests).
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (a network 
> issue caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed when writing rows
>  # the failed tasks keep retrying but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at 

[jira] [Commented] (SPARK-24492) Endless attempted task when TaskCommitDenied exception writing to S3A

2018-06-08 Thread Yu-Jhe Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506792#comment-16506792
 ] 

Yu-Jhe Li commented on SPARK-24492:
---

[~ste...@apache.org]: I think the TaskCommitDenied exception is not caused by 
S3 eventual consistency. As [~jiangxb1987] mentioned, another task may have 
acquired the commit permission lock.

Btw, in this case, we did not enable speculation.

> Endless attempted task when TaskCommitDenied exception writing to S3A
> -
>
> Key: SPARK-24492
> URL: https://issues.apache.org/jira/browse/SPARK-24492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu-Jhe Li
>Priority: Critical
> Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances 
> and output files to S3, some tasks retry endlessly and all of them fail with 
> a TaskCommitDenied exception. This happened when we ran the Spark application 
> on instances with network issues. (it runs well on healthy spot instances)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows (in attachments) that one task of stage 112 failed due to 
> FetchFailedException (a network issue) and a retry of stage 112 (retry 1) was 
> attempted. But in stage 112 (retry 1), all tasks failed due to the 
> TaskCommitDenied exception and keep retrying (they never succeed and cause 
> lots of S3 requests).
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
> issue caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed when writing rows
>  # the failed tasks keep retrying, but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't find an easy way to reproduce this issue; here's all I can 
provide.

The Spark UI shows (in attachments) that one task of stage 112 failed due to 
FetchFailedException (a network issue) and a retry of stage 112 (retry 1) was 
attempted. But in stage 112 (retry 1), all tasks failed due to the 
TaskCommitDenied exception and keep retrying (they never succeed and cause lots 
of S3 requests).

On the other side, the driver logs show:
 # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 # a warning message from OutputCommitCoordinator
 # task 92.0 in stage 112.1 failed when writing rows
 # the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Environment: (was: Spark version: spark-2.2.0)

> Endless attempted task when TaskCommitDenied exception
> --
>
> Key: SPARK-24492
> URL: https://issues.apache.org/jira/browse/SPARK-24492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu-Jhe Li
>Priority: Critical
> Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances 
> and output files to S3, some tasks retry endlessly and all of them fail with 
> a TaskCommitDenied exception. This happened when we ran the Spark application 
> on instances with network issues. (it runs well on healthy spot instances)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows that one task of stage 112 failed due to 
> FetchFailedException (a network issue) and a retry of stage 112 (retry 1) was 
> attempted. But in stage 112 (retry 1), all tasks failed due to the 
> TaskCommitDenied exception and keep retrying (they never succeed and cause 
> lots of S3 requests).
>  !retry_stage.png! 
>  !螢幕快照 2018-05-16 上午11.10.57.png! 
>  !螢幕快照 2018-05-16 上午11.10.46.png! 
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
> issue caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed when writing rows
>  # the failed tasks keep retrying, but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Shepherd:   (was: Jiang Xingbo)

> Endless attempted task when TaskCommitDenied exception
> --
>
> Key: SPARK-24492
> URL: https://issues.apache.org/jira/browse/SPARK-24492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu-Jhe Li
>Priority: Critical
> Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances 
> and output files to S3, some tasks retry endlessly and all of them fail with 
> a TaskCommitDenied exception. This happened when we ran the Spark application 
> on instances with network issues. (it runs well on healthy spot instances)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows that one task of stage 112 failed due to 
> FetchFailedException (a network issue) and a retry of stage 112 (retry 1) was 
> attempted. But in stage 112 (retry 1), all tasks failed due to the 
> TaskCommitDenied exception and keep retrying (they never succeed and cause 
> lots of S3 requests).
>  !retry_stage.png! 
>  !螢幕快照 2018-05-16 上午11.10.57.png! 
>  !螢幕快照 2018-05-16 上午11.10.46.png! 
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
> issue caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed when writing rows
>  # the failed tasks keep retrying, but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't find an easy way to reproduce this issue; here's all I can 
provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).
 !retry_stage.png! 
 !螢幕快照 2018-05-16 上午11.10.57.png! 
 !螢幕快照 2018-05-16 上午11.10.46.png! 


On the other side, the driver logs show:
 # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 # a warning message from OutputCommitCoordinator
 # task 92.0 in stage 112.1 failed when writing rows
 # the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't find an easy way to reproduce this issue; here's all I can 
provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).
 !retry_stage.png! 
 !螢幕快照 2018-05-16 上午11.10.57.png! 
 !螢幕快照 2018-05-16 上午11.10.46.png! 


On the other side, the driver logs show:
 # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 # a warning message from OutputCommitCoordinator
 # task 92.0 in stage 112.1 failed when writing rows
 # the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't find an easy way to reproduce this issue; here's all I can 
provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).
 !retry_stage.png! 
 !螢幕快照 2018-05-16 上午11.10.57.png! 
 !螢幕快照 2018-05-16 上午11.10.46.png! 

On the other side, the driver logs show:
 # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 # a warning message from OutputCommitCoordinator
 # task 92.0 in stage 112.1 failed when writing rows
 # the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't find an easy way to reproduce this issue; here's all I can 
provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).
 !retry_stage.png! 


On the other side, the driver logs show:
 # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 # a warning message from OutputCommitCoordinator
 # task 92.0 in stage 112.1 failed when writing rows
 # the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Attachment: retry_stage.png
螢幕快照 2018-05-16 上午11.10.57.png
螢幕快照 2018-05-16 上午11.10.46.png

> Endless attempted task when TaskCommitDenied exception
> --
>
> Key: SPARK-24492
> URL: https://issues.apache.org/jira/browse/SPARK-24492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Spark version: spark-2.2.0
>Reporter: Yu-Jhe Li
>Priority: Critical
> Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances 
> and output files to S3, some tasks retry endlessly and all of them fail with 
> a TaskCommitDenied exception. This happened when we ran the Spark application 
> on instances with network issues. (it runs well on healthy spot instances)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows that one task of stage 112 failed due to 
> FetchFailedException (a network issue) and a retry of stage 112 (retry 1) was 
> attempted. But in stage 112 (retry 1), all tasks failed due to the 
> TaskCommitDenied exception and keep retrying (they never succeed and cause 
> lots of S3 requests).
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
> issue caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed when writing rows
>  # the failed tasks keep retrying, but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't find an easy way to reproduce this issue; here's all I can 
provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).

On the other side, the driver logs show:
 # task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 # a warning message from OutputCommitCoordinator
 # task 92.0 in stage 112.1 failed when writing rows
 # the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't reproduce this issue easily; here's all I can provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).

On the other side, the driver logs show:
 1. task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 2. a warning message from OutputCommitCoordinator
 3. task 92.0 in stage 112.1 failed when writing rows
 4. the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 

[jira] [Updated] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
--
Description: 
Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception. This happened when we ran the Spark application on 
instances with network issues. (it runs well on healthy spot instances)

Sorry, I can't find an easy way to reproduce this issue; here's all I can 
provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).

On the other side, the driver logs show:
 1. task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 2. a warning message from OutputCommitCoordinator
 3. task 92.0 in stage 112.1 failed when writing rows
 4. the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
(TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 

[jira] [Created] (SPARK-24492) Endless attempted task when TaskCommitDenied exception

2018-06-07 Thread Yu-Jhe Li (JIRA)
Yu-Jhe Li created SPARK-24492:
-

 Summary: Endless attempted task when TaskCommitDenied exception
 Key: SPARK-24492
 URL: https://issues.apache.org/jira/browse/SPARK-24492
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
 Environment: Spark version: spark-2.2.0
Reporter: Yu-Jhe Li


Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
output files to S3, some tasks retry endlessly and all of them fail with a 
TaskCommitDenied exception.
 This happened when we ran the Spark application on instances with network 
issues. (it runs well on healthy spot instances)

Sorry, I can't reproduce this issue easily; here's all I can provide.

The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
(a network issue) and a retry of stage 112 (retry 1) was attempted. But in stage 
112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
retrying (they never succeed and cause lots of S3 requests).
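
(Aside: below is a hedged sketch of the retry/speculation settings that are 
relevant to the endless-retry behaviour described above. The property names are 
standard Spark configuration keys; the values shown are the stock defaults and 
are given only for context, not as a fix.)

{code:scala}
// Illustration only: the knobs that bound normal task retries and control
// speculative duplicate attempts. Defaults shown; not a workaround.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.task.maxFailures", "4")  // task failures tolerated before the job is aborted
  .set("spark.speculation", "false")   // speculative execution was not enabled in this report
{code}

Even within these defaults, the commit-denied attempts in the log below kept 
retrying, which is the behaviour this ticket reports.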

On the other side, the driver logs show:
 1. task 123.0 in stage 112.0 failed due to FetchFailedException (the network 
issue caused a corrupted file)
 2. a warning message from OutputCommitCoordinator
 3. task 92.0 in stage 112.1 failed when writing rows
 4. the failed tasks keep retrying, but never succeed

{noformat}
2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 

[jira] [Commented] (SPARK-23614) Union produces incorrect results when caching is used

2018-05-18 Thread Yu-Jhe Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480438#comment-16480438
 ] 

Yu-Jhe Li commented on SPARK-23614:
---

Does this bug happen only when 1) the DataFrame is cached and 2) an aggregation is 
used?
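
One way to probe this, reusing the reproduction quoted below (only a sketch, and it 
assumes a spark-shell where `session` is the active SparkSession): keep the cache but 
drop the aggregation, and see whether the union output is still wrong.
{code:java}
// Sketch only: a variation on the reproduction quoted below, assuming a
// spark-shell where `session` is the active SparkSession.
import org.apache.spark.sql.functions.col
import session.implicits._

case class TestData(x: Int, y: Int, z: Int)
val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()

// No groupBy/agg: if these unions also swap y and z, caching alone is enough to
// trigger the bug; if they come back correct, the aggregation is part of the trigger.
val proj1 = frame.select(col("x"), col("y").as("value"))
val proj2 = frame.select(col("x"), col("z").as("value"))
proj1.union(proj2).show()
proj2.union(proj1).show()
{code}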

> Union produces incorrect results when caching is used
> -
>
> Key: SPARK-23614
> URL: https://issues.apache.org/jira/browse/SPARK-23614
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Morten Hornbech
>Assignee: Liang-Chi Hsieh
>Priority: Major
>  Labels: correctness
> Fix For: 2.3.1, 2.4.0
>
>
> We just upgraded from 2.2 to 2.3 and our test suite caught this error:
> {code:java}
> case class TestData(x: Int, y: Int, z: Int)
> val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
> val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
> val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
> group1.union(group2).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    2|
> // |  4|    5|
> // |  1|    2|
> // |  4|    5|
> // +---+-----+
> group2.union(group1).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    3|
> // |  4|    6|
> // |  1|    3|
> // |  4|    6|
> // +---+-----+
> {code}
> The error disappears if the first data frame is not cached or if the two 
> group bys use separate copies. I'm not sure exactly what happens inside 
> Spark, but errors that produce incorrect results rather than exceptions 
> always concern me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org