Hi,
We would like to use the Iceberg Spark data source
(IcebergSource) to write Kafka-sourced streaming DataFrames.
In tests, we can successfully create a partitioned table and write to it
when using a MemoryStream source, but when using a Kafka source:
*spark.readStream.format("kafka")*
and writing to iceberg:
*dataFrame.writeStream
  .format("catwalk-iceberg")
  .outputMode(OutputMode.Append)
  .trigger(Trigger.Once)
  .option("path", uri.toString)
  .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
  .start
  .awaitTermination*
we get this exception:
Caused by: java.lang.IllegalStateException: Already closed file for
partition: happened_at_day=2000-01-01
  at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
  at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
Before I dig deeper, has anyone gotten this to work?
Thanks!