thaismonti1912 opened a new issue #2990:
URL: https://github.com/apache/iceberg/issues/2990
When inserting new records into an Iceberg table using multiple Spark
executors (EMR), we get a `java.io.IOException: No such file or directory`. See
the stack trace below.
This seems to happen only when the Spark application is deployed in
cluster mode on a cluster containing multiple core nodes.
These are the scenarios we tested:
- Spark application deployed in client mode on a cluster with one core node: works fine
- Spark application deployed in cluster mode on a cluster with one core node: works fine
- Spark application deployed in client mode on a cluster with multiple core nodes: works fine
- Spark application deployed in cluster mode on a cluster with multiple core nodes: throws `java.io.IOException: No such file or directory`
I am using S3FileIO. The problem seems to occur when Iceberg writes
to the local staging directory (which defaults to `java.io.tmpdir`) and cannot
find the directory or file. I tried setting `s3.staging-dir` to a different
directory, but that didn't work either.
I found a workaround: setting the `java.io.tmpdir` system property to a
different directory on the driver, as below:
```bash
spark-submit --verbose \
  --conf spark.driver.extraJavaOptions=-Djava.io.tmpdir=/tmp/driver/ \
  --deploy-mode cluster \
  --driver-memory 4G \
  --num-executors 8 \
  --executor-memory 2G \
  --executor-cores 1 \
  --class com.example.IcebergTest \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.my_catalog.warehouse=s3://acme-bucket-name/my-catalog/ \
  --conf spark.sql.catalog.my_catalog.type=hive \
  --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.my_catalog.uri=thrift://hive.acme.com.cloud:9083 \
  /usr/generate_dummy_records_2.12-0.1.jar --to-id 10000000 --records-in-parallel 100 --from-id 8170000
```
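In case it helps others reproduce or work around this, the same override could presumably be applied to the executor JVMs as well, and the `s3.staging-dir` property would need the catalog prefix when set through Spark. A sketch of the extra flags (the `/tmp/...` paths, and whether these flags actually avoid the error, are my own assumptions, not something I have verified on EMR):

```shell
# Hypothetical additional flags (untested): point both driver and executor
# JVM temp dirs at directories that are guaranteed to exist, since S3FileIO
# stages data locally before uploading, and pass s3.staging-dir with the
# Spark catalog prefix.
spark-submit \
  --conf spark.driver.extraJavaOptions=-Djava.io.tmpdir=/tmp/driver/ \
  --conf spark.executor.extraJavaOptions=-Djava.io.tmpdir=/tmp/executor/ \
  --conf spark.sql.catalog.my_catalog.s3.staging-dir=/tmp/iceberg-staging/
  # (remaining options as in the command above)
```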
Configuration:
- EMR version: emr-6.3.0
- Master node: 1 instance, r5.xlarge
- Core nodes: 2 instances, m5.xlarge
- Spark version: 3.1.1
Do you have any idea what is happening under the covers, or what might be
causing this? Is this a misconfiguration or a bug?
My hypothesis is that, when the driver and an executor are deployed on the
same instance, the driver somehow erases the default temp directory that the
executor still needs.
```
21/08/16 20:53:19 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Writing job aborted.
org.apache.spark.SparkException: Writing job aborted.
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriterV2.$anonfun$runCommand$1(DataFrameWriterV2.scala:196)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
	at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:149)
	at com.example.IcebergTest$.main(IcebergTest.scala:97)
	at com.example.IcebergTest.main(IcebergTest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:735)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2.0 (TID 14) (ip-172-22-2-121.ec2.internal executor 7): java.io.UncheckedIOException: Filed to create output stream for location: s3://acme-bucket/dummy_tests/test_10000000/data/partition_field=81700/00005-14-1a016409-f7b1-4f04-bb10-27989cdea703-00001.parquet
	at org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:60)
	at org.apache.iceberg.parquet.ParquetIO$ParquetOutputFile.createOrOverwrite(ParquetIO.java:153)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:293)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:259)
	at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:101)
	at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:250)
	at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:110)
	at org.apache.iceberg.spark.source.SparkAppenderFactory.newDataWriter(SparkAppenderFactory.java:139)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:310)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:303)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.openCurrent(BaseTaskWriter.java:271)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:233)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:223)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.<init>(BaseTaskWriter.java:305)
	at org.apache.iceberg.io.PartitionedWriter.write(PartitionedWriter.java:73)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No such file or directory
	at java.io.UnixFileSystem.createFileExclusively(Native Method)
	at java.io.File.createTempFile(File.java:2026)
	at org.apache.iceberg.aws.s3.S3OutputStream.newStream(S3OutputStream.java:178)
	at org.apache.iceberg.aws.s3.S3OutputStream.<init>(S3OutputStream.java:114)
	at org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:58)
	... 26 more
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
	... 34 more
Caused by: java.io.UncheckedIOException: Filed to create output stream for location: s3://acme-bucket/dummy_tests/test_10000000/data/partition_field=81700/00005-14-1a016409-f7b1-4f04-bb10-27989cdea703-00001.parquet
	at org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:60)
	at org.apache.iceberg.parquet.ParquetIO$ParquetOutputFile.createOrOverwrite(ParquetIO.java:153)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:293)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:259)
	at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:101)
	at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:250)
	at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:110)
	at org.apache.iceberg.spark.source.SparkAppenderFactory.newDataWriter(SparkAppenderFactory.java:139)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:310)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:303)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.openCurrent(BaseTaskWriter.java:271)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:233)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:223)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.<init>(BaseTaskWriter.java:305)
	at org.apache.iceberg.io.PartitionedWriter.write(PartitionedWriter.java:73)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No such file or directory
	at java.io.UnixFileSystem.createFileExclusively(Native Method)
	at java.io.File.createTempFile(File.java:2026)
	at org.apache.iceberg.aws.s3.S3OutputStream.newStream(S3OutputStream.java:178)
	at org.apache.iceberg.aws.s3.S3OutputStream.<init>(S3OutputStream.java:114)
	at org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:58)
	... 26 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]