andrei-ionescu opened a new issue #3564:
URL: https://github.com/apache/iceberg/issues/3564


   Timezone handling in Spark 3 leads to an `Already closed files for partition` error.
   
   To reproduce, follow these steps:
   
   First, download the source file: [data-dimension-vehicle-20210609T222533Z-4cols-100K.csv](https://github.com/apache/iceberg/files/7546106/data-dimension-vehicle-20210609T222533Z-4cols-100K.csv) and `cd` to the directory where it was downloaded.
   
   Start Spark 3.1.2 shell setting the timezone to `Europe/Bucharest` with:
   ```
   spark-shell --master "local[8]" --driver-memory 4g \
     --packages "org.apache.iceberg:iceberg-spark3-runtime:0.10.0,org.apache.iceberg:iceberg-core:0.10.0,org.apache.iceberg:iceberg-common:0.10.0,org.apache.iceberg:iceberg-api:0.10.0,org.apache.iceberg:iceberg-parquet:0.10.0,org.apache.iceberg:iceberg-spark3:0.10.0,org.apache.iceberg:iceberg-bundled-guava:0.10.0" \
     --conf "spark.driver.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=Europe/Bucharest" \
     --conf "spark.executor.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=Europe/Bucharest"
   ```
   
   Copy and paste the following code into the shell:
   ```
   import org.apache.iceberg.{PartitionSpec => IcebergPartitionSpec}
   import org.apache.iceberg.hadoop.HadoopTables
   import org.apache.iceberg.spark.SparkSchemaUtil
   import org.apache.spark.sql._
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.functions._
   import scala.collection.JavaConverters._
   import spark.implicits._
   
   val dst = "/tmp/tmp_iceberg_issue"
   val src = "data-dimension-vehicle-20210609T222533Z-4cols-100K.csv"
   
   val schema = StructType(Seq(
     StructField("licence_code", StringType, true), 
     StructField("vehicle_make", StringType, true), 
     StructField("fuel_type", StringType, true), 
     StructField("dimension_load_date", TimestampType, true)))
   
   val ds = spark
     .read
     .format("csv")
     .schema(schema)
     .load(src)
   
   val iceSchema = SparkSchemaUtil.convert(ds.schema)
   val iceSpec = IcebergPartitionSpec.builderFor(iceSchema)
     .day("dimension_load_date", "load_date")
     .identity("fuel_type")
     .build()
   
   val newTbls = new HadoopTables(spark.sessionState.newHadoopConf)
     .create(iceSchema, iceSpec, Map.empty[String, String].asJava, dst)
   
   ds
     .sort(to_date(col("dimension_load_date")), col("fuel_type"))
     .write
     .format("iceberg")
     .mode("overwrite")
     .save(dst)
   ```
   
   The write fails with an `Already closed files for partition` exception:
   ```
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
     ... 97 more
   Caused by: java.lang.IllegalStateException: Already closed files for partition: load_date=2021-06-09/fuel_type=Diesel
     at org.apache.iceberg.io.PartitionedWriter.write(PartitionedWriter.java:69)
     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
     at org.apache.spark.scheduler.Task.run(Task.scala:131)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
     at java.base/java.lang.Thread.run(Thread.java:834)
   ```
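   The exception is raised by Iceberg's `PartitionedWriter`, which assumes incoming rows are clustered by partition key: it closes the current file whenever the key changes and refuses to reopen a partition it has already finished. A simplified sketch of that invariant (hypothetical class and names, not Iceberg's actual code):
   
   ```
   import java.util.HashSet;
   import java.util.Set;
   
   public class ClusteredWriterSketch {
       private final Set<String> completedPartitions = new HashSet<>();
       private String currentPartition;
   
       public void write(String partitionKey) {
           if (!partitionKey.equals(currentPartition)) {
               if (currentPartition != null) {
                   completedPartitions.add(currentPartition); // "close" the previous file
               }
               if (completedPartitions.contains(partitionKey)) {
                   // Mirrors the message seen in the stack trace above
                   throw new IllegalStateException(
                       "Already closed files for partition: " + partitionKey);
               }
               currentPartition = partitionKey;
           }
       }
   
       public static void main(String[] args) {
           ClusteredWriterSketch w = new ClusteredWriterSketch();
           w.write("load_date=2021-06-09/fuel_type=Diesel");
           w.write("load_date=2021-06-10/fuel_type=Diesel");
           try {
               // A row that sorts later by local time but maps back to the earlier UTC day:
               w.write("load_date=2021-06-09/fuel_type=Diesel");
           } catch (IllegalStateException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```
   
   When the sort key and the partition transform agree, each key appears in one contiguous run and the error never triggers.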
   
   The root cause is a mismatch between Iceberg's date/time partition transforms and Spark's date/time functions. The sort step recommended in Iceberg's documentation ([Writing to partitioned tables](https://iceberg.apache.org/#spark-writes/#writing-to-partitioned-tables)) as the way to avoid the `Already closed files for partition` error relies on Spark functions such as `to_date`, which interpret timestamps in the timezone of the nodes they run on, while Iceberg's date/time partition transforms always use UTC. Rows near a day boundary therefore sort under one date but are written to the partition of another, so the writer revisits a partition it has already closed.
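   The mismatch can be shown with plain `java.time` (a JVM-level illustration, not Iceberg code; the timestamp is made up): in summer Europe/Bucharest is UTC+3, so an instant late in the UTC day of 2021-06-09 already belongs to 2021-06-10 in local time.
   
   ```
   import java.time.Instant;
   import java.time.LocalDate;
   import java.time.ZoneId;
   
   public class DayBoundaryMismatch {
       public static void main(String[] args) {
           // An instant shortly before midnight UTC on 2021-06-09.
           Instant ts = Instant.parse("2021-06-09T22:30:00Z");
   
           // Iceberg's day transform buckets by the UTC calendar day.
           LocalDate utcDay = ts.atZone(ZoneId.of("UTC")).toLocalDate();
   
           // Spark's to_date uses the JVM/session timezone instead.
           LocalDate localDay = ts.atZone(ZoneId.of("Europe/Bucharest")).toLocalDate();
   
           System.out.println(utcDay);   // 2021-06-09
           System.out.println(localDay); // 2021-06-10
       }
   }
   ```
   
   Sorting by `to_date` therefore interleaves rows across the two UTC-day partitions.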
   
   There is a workaround: set the JVM timezone to `UTC` so that the timezone Spark uses for date/time functions matches the one Iceberg uses.
   
   Starting Spark with the following options makes the write succeed:
   
   ```
   spark-shell --master "local[8]" --driver-memory 4g \
     --packages "org.apache.iceberg:iceberg-spark3-runtime:0.10.0,org.apache.iceberg:iceberg-core:0.10.0,org.apache.iceberg:iceberg-common:0.10.0,org.apache.iceberg:iceberg-api:0.10.0,org.apache.iceberg:iceberg-parquet:0.10.0,org.apache.iceberg:iceberg-spark3:0.10.0,org.apache.iceberg:iceberg-bundled-guava:0.10.0" \
     --conf "spark.driver.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=UTC" \
     --conf "spark.executor.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=UTC"
   ```
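   For completeness, the JVM default timezone can also be forced programmatically (a sketch: this only affects the local JVM and must run before anything caches the default zone, so on a real cluster the `--conf` options above remain the safer route):
   
   ```
   import java.util.TimeZone;
   
   public class ForceUtc {
       public static void main(String[] args) {
           // Programmatic equivalent of -Duser.timezone=UTC for this JVM.
           TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
           System.out.println(TimeZone.getDefault().getID()); // UTC
       }
   }
   ```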
   
   Note that setting only the session timezone, e.g. `spark.conf.set("spark.sql.session.timeZone", "UTC")` (the config takes a timezone ID, not a boolean), is not always reliable.
   
   

