thaismonti1912 opened a new issue #2991:
URL: https://github.com/apache/iceberg/issues/2991


   When inserting new records into an Iceberg table using multiple Spark 
executors (on EMR), we get a `java.io.IOException: No such file or directory`. See 
the stack trace below.
   
   
   It seems that this only happens when the Spark application is deployed in 
cluster mode, on a cluster containing multiple core nodes.
   
   These are the scenarios:
   
   - Spark application deployed in client mode on a cluster with one core node (works fine)
   - Spark application deployed in cluster mode on a cluster with one core node (works fine)
   - Spark application deployed in client mode on a cluster with multiple core nodes (works fine)
   - Spark application deployed in cluster mode on a cluster with multiple core nodes (throws `java.io.IOException: No such file or directory`)
   
   I am using S3FileIO. The problem seems to happen when Iceberg writes 
to the staging directory (which defaults to `java.io.tmpdir`) and cannot find the 
directory or file. I tried setting `s3.staging-dir` to a different directory, 
but that didn't work either.
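
   For reference, I passed the staging directory as a catalog property, roughly as shown below (the path is just an example; please correct me if this is not the right way to set it):

   ```bash
   # Attempted override of the S3FileIO staging directory, added to the
   # spark-submit command further down; it did not resolve the error.
   --conf spark.sql.catalog.my_catalog.s3.staging-dir=/tmp/iceberg-staging/
   ```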
   
   I found a workaround: setting the `java.io.tmpdir` property to a different 
directory on the driver, as shown below:
   
   ```bash
   spark-submit --verbose \
       --conf spark.driver.extraJavaOptions=-Djava.io.tmpdir=/tmp/driver/ \
       --deploy-mode cluster \
       --driver-memory 4G \
       --num-executors 8 \
       --executor-memory 2G \
       --executor-cores 1 \
       --class com.example.IcebergTest \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.my_catalog.warehouse=s3://acme-bucket-name/my-catalog/ \
       --conf spark.sql.catalog.my_catalog.type=hive \
       --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
       --conf spark.sql.catalog.my_catalog.uri=thrift://hive.acme.com.cloud:9083 \
       /usr/generate_dummy_records_2.12-0.1.jar --to-id 10000000 --records-in-parallel 100 --from-id 8170000
   ```
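
   I have not tested whether the same override is also needed on the executor side (the exception itself is raised in an executor task), but the analogous flag would be:

   ```bash
   # Untested assumption: executor-side counterpart of the driver workaround above.
   # YARN may still override java.io.tmpdir for executor containers, so this might be a no-op.
   --conf spark.executor.extraJavaOptions=-Djava.io.tmpdir=/tmp/executor/
   ```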
   
   Configuration:
   - EMR version: emr-6.3.0
     - Master node: 1 instance of r5.xlarge
     - Core nodes: 2 instances of m5.xlarge
   - Spark version: 3.1.1
   
   Do you have any idea what is happening under the covers or what might be 
causing this?
   Is this a misconfiguration or a bug?
   
   My hypothesis is that when the driver and a worker (executor) are deployed on the 
same instance, the driver somehow erases the default temp directory.
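
   If it helps to check this, one thing I can look at on the core node that hosts both the driver and an executor while the job runs is whether the temp directories still exist; something along these lines (the paths are what I believe are the EMR defaults and may differ):

   ```bash
   # Ad-hoc check on the core node running both the ApplicationMaster (driver) and an executor:
   # does the directory that java.io.tmpdir resolves to still exist?
   ls -ld /tmp                                   # JVM default temp dir
   # YARN container temp dirs (EMR keeps container dirs under /mnt/yarn by default)
   sudo find /mnt/yarn/usercache -maxdepth 6 -type d -name tmp 2>/dev/null
   ```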
   
   
   ```
   21/08/16 20:53:19 ERROR ApplicationMaster: User class threw exception: 
org.apache.spark.SparkException: Writing job aborted.
   org.apache.spark.SparkException: Writing job aborted.
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
        at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
        at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at 
org.apache.spark.sql.DataFrameWriterV2.$anonfun$runCommand$1(DataFrameWriterV2.scala:196)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
        at 
org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:149)
        at com.example.IcebergTest$.main(IcebergTest.scala:97)
        at com.example.IcebergTest.main(IcebergTest.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:735)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 
in stage 2.0 (TID 14) (ip-172-22-2-121.ec2.internal executor 7): 
java.io.UncheckedIOException: Filed to create output stream for location: 
s3://acme-bucket/dummy_tests/test_10000000/data/partition_field=81700/00005-14-1a016409-f7b1-4f04-bb10-27989cdea703-00001.parquet
        at 
org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:60)
        at 
org.apache.iceberg.parquet.ParquetIO$ParquetOutputFile.createOrOverwrite(ParquetIO.java:153)
        at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:293)
        at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:259)
        at 
org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:101)
        at 
org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:250)
        at 
org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:110)
        at 
org.apache.iceberg.spark.source.SparkAppenderFactory.newDataWriter(SparkAppenderFactory.java:139)
        at 
org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:310)
        at 
org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:303)
        at 
org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.openCurrent(BaseTaskWriter.java:271)
        at 
org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:233)
        at 
org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:223)
        at 
org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.<init>(BaseTaskWriter.java:305)
        at 
org.apache.iceberg.io.PartitionedWriter.write(PartitionedWriter.java:73)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.IOException: No such file or directory
        at java.io.UnixFileSystem.createFileExclusively(Native Method)
        at java.io.File.createTempFile(File.java:2026)
        at 
org.apache.iceberg.aws.s3.S3OutputStream.newStream(S3OutputStream.java:178)
        at 
org.apache.iceberg.aws.s3.S3OutputStream.<init>(S3OutputStream.java:114)
        at 
org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:58)
        ... 26 more
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
        ... 34 more
   Caused by: java.io.UncheckedIOException: Filed to create output stream for 
location: 
s3://acme-bucket/dummy_tests/test_10000000/data/partition_field=81700/00005-14-1a016409-f7b1-4f04-bb10-27989cdea703-00001.parquet
        at 
org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:60)
        at 
org.apache.iceberg.parquet.ParquetIO$ParquetOutputFile.createOrOverwrite(ParquetIO.java:153)
        at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:293)
        at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:259)
        at 
org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:101)
        at 
org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:250)
        at 
org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:110)
        at 
org.apache.iceberg.spark.source.SparkAppenderFactory.newDataWriter(SparkAppenderFactory.java:139)
        at 
org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:310)
        at 
org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.newWriter(BaseTaskWriter.java:303)
        at 
org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.openCurrent(BaseTaskWriter.java:271)
        at 
org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:233)
        at 
org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:223)
        at 
org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.<init>(BaseTaskWriter.java:305)
        at 
org.apache.iceberg.io.PartitionedWriter.write(PartitionedWriter.java:73)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.IOException: No such file or directory
        at java.io.UnixFileSystem.createFileExclusively(Native Method)
        at java.io.File.createTempFile(File.java:2026)
        at 
org.apache.iceberg.aws.s3.S3OutputStream.newStream(S3OutputStream.java:178)
        at 
org.apache.iceberg.aws.s3.S3OutputStream.<init>(S3OutputStream.java:114)
        at 
org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:58)
        ... 26 more
   ```

