paulpaul1076 commented on issue #9679: URL: https://github.com/apache/iceberg/issues/9679#issuecomment-1936239049
@nastra where should I upload the data for you? I will upload it, then you can register the table in your catalog. I used the Hive catalog, but I don't think it matters. Anyway, this data is also easy to reproduce; I just made a Spark streaming job that wrote random data:

```java
package org.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.*;

public class StreamingSparkPartitioned {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder().getOrCreate();

        spark.sql("ALTER TABLE " + args[0]
                + " SET TBLPROPERTIES('compatibility.snapshot-id-inheritance.enabled'='true')");
        System.out.println("TBL PROPS SHOW:");
        spark.sql("SHOW TBLPROPERTIES " + args[0]
                + "('compatibility.snapshot-id-inheritance.enabled')").show();

        // Bucket rows into three event types based on value % 3.
        Column expression = when(expr("value % 3 = 1"), "stupid_event")
                .otherwise(when(expr("value % 3 = 2"), "smart_event")
                        .otherwise("neutral_event"));

        DataStreamWriter<Row> streamingDF = spark.readStream()
                .format("rate-micro-batch")
                .option("rowsPerBatch", "300000")
                .load()
                .withColumn("event_type", expression)
                .withColumn("date", current_timestamp().cast("date"))
                .withColumn("some_number", rand().multiply(4))
                .withColumn("date2", expr("date_sub(date, cast(some_number as int))"))
                .withColumn("random_str", split(lit("ABCDEFGHIJK"), ""))
                .withColumn("random_str2", repeat(concat_ws("", shuffle(col("random_str"))), 500))
                .writeStream()
                .option("checkpointLocation", "s3a://obs-zdp-warehouse-stage-mz/test/checkpoint_2");

        streamingDF
                .format("iceberg")
                .outputMode("append")
                .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
                .option("fanout-enabled", "true")
                .partitionBy("date", "date2", "event_type")
                .toTable(args[0])
                .awaitTermination();
    }
}
```

It streamed for a while, generating a few days' worth of data (around 2k files), then I started running this:

```java
spark.sql("CALL iceberg_catalog.system.rewrite_data_files(table => 'stage.test_partitioned')");
```

or this:

```java
spark.sql("CALL iceberg_catalog.system.rewrite_data_files(table => 'stage.test_partitioned', options => map('partial-progress.enabled', 'true'))");
```

for compaction, and it fails with that content length exception.
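For what it's worth, the `when`/`otherwise` expression in the job is just a modulo bucket, so the generated `event_type` distribution is easy to reproduce without Spark at all. A plain-Java equivalent (class and method names here are mine, not part of the job):

```java
public class EventTypeDemo {
    // Mirrors the streaming job's expression:
    // value % 3 == 1 -> stupid_event, value % 3 == 2 -> smart_event, else neutral_event
    static String eventType(long value) {
        long m = value % 3;
        if (m == 1) {
            return "stupid_event";
        }
        if (m == 2) {
            return "smart_event";
        }
        return "neutral_event";
    }

    public static void main(String[] args) {
        // Print the bucket for the first few rate-source values.
        for (long v = 0; v < 6; v++) {
            System.out.println(v + " -> " + eventType(v));
        }
    }
}
```

So roughly a third of the rows land in each event type, which means every micro-batch writes into all three `event_type` partitions (hence `fanout-enabled`).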