paulpaul1076 commented on issue #9679:
URL: https://github.com/apache/iceberg/issues/9679#issuecomment-1936239049

   @nastra where should I upload the data for you? I will upload it, and then you can register the table in your catalog. I used a Hive catalog, but I don't think it matters.
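
   For reference, I believe registering it would just be a call to Iceberg's `register_table` procedure against the uploaded metadata file. A rough sketch (the metadata path is a placeholder for wherever the data ends up; the catalog and table names are the ones from my compaction calls below):

   ```
   // Hypothetical: register the uploaded table metadata in your catalog.
   // The metadata file location below is a placeholder, not a real path.
   spark.sql("CALL iceberg_catalog.system.register_table(" +
           "table => 'stage.test_partitioned', " +
           "metadata_file => 's3a://<bucket>/<path>/metadata/<version>.metadata.json')");
   ```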
   
   Anyway, this data is also easy to reproduce; I just made a Spark Structured Streaming job that wrote random data:
   
   ```
   package org.example;

   import org.apache.spark.sql.Column;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   import org.apache.spark.sql.streaming.DataStreamWriter;
   import org.apache.spark.sql.streaming.StreamingQueryException;
   import org.apache.spark.sql.streaming.Trigger;

   import static org.apache.spark.sql.functions.*;

   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class StreamingSparkPartitioned {
       public static void main(String[] args) throws TimeoutException, StreamingQueryException {
           SparkSession spark = SparkSession.builder()
                   .getOrCreate();

           // args[0] is the fully qualified table name; enable snapshot-id inheritance on it.
           spark.sql("ALTER TABLE " + args[0] + " SET TBLPROPERTIES('compatibility.snapshot-id-inheritance.enabled'='true')");
           System.out.println("TBL PROPS SHOW:");
           spark.sql("SHOW TBLPROPERTIES " + args[0] + "('compatibility.snapshot-id-inheritance.enabled')").show();

           // Derive a low-cardinality event_type column from the rate source's value column.
           Column expression = when(expr("value % 3 = 1"), "stupid_event").otherwise(
                                   when(expr("value % 3 = 2"), "smart_event")
                                   .otherwise("neutral_event"));

           // Generate 300k random rows per micro-batch and add the partition and payload columns.
           DataStreamWriter<Row> streamingDF = spark.readStream()
                   .format("rate-micro-batch")
                   .option("rowsPerBatch", "300000")
                   .load()
                   .withColumn("event_type", expression)
                   .withColumn("date", current_timestamp().cast("date"))
                   .withColumn("some_number", rand().multiply(4))
                   .withColumn("date2", expr("date_sub(date, cast(some_number as int))"))
                   .withColumn("random_str", split(lit("ABCDEFGHIJK"), ""))
                   .withColumn("random_str2", repeat(concat_ws("", shuffle(col("random_str"))), 500))
                   .writeStream()
                   .option("checkpointLocation", "s3a://obs-zdp-warehouse-stage-mz/test/checkpoint_2");

           // Append to the Iceberg table once a minute, partitioned by date, date2 and event_type.
           streamingDF
                   .format("iceberg")
                   .outputMode("append")
                   .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
                   .option("fanout-enabled", "true")
                   .partitionBy("date", "date2", "event_type")
                   .toTable(args[0])
                   .awaitTermination();
       }
   }
   ```
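
   For context, the target table is just a partitioned Iceberg table in the Hive catalog; roughly, its DDL would look like the sketch below, inferred from the job above rather than copied from the real table (timestamp and value come from the rate-micro-batch source, the rest from the withColumn calls):

   ```
   // Approximate DDL for the target table, reconstructed from the job above; a sketch, not the real definition.
   spark.sql("CREATE TABLE iceberg_catalog.stage.test_partitioned (" +
           "timestamp TIMESTAMP, " +   // produced by the rate-micro-batch source
           "value BIGINT, " +          // produced by the rate-micro-batch source
           "event_type STRING, " +
           "date DATE, " +
           "some_number DOUBLE, " +
           "date2 DATE, " +
           "random_str ARRAY<STRING>, " +
           "random_str2 STRING) " +
           "USING iceberg " +
           "PARTITIONED BY (date, date2, event_type)");
   ```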
   
   I let it stream for a while; it generated data for a few days, around 2k files or so, and then I started running this:
   
   ```
   spark.sql("CALL iceberg_catalog.system.rewrite_data_files(table => 'stage.test_partitioned')");
   ```
   
   or this:
   
   ```
   spark.sql("CALL iceberg_catalog.system.rewrite_data_files(table => 'stage.test_partitioned', options => map('partial-progress.enabled', 'true'))");
   ```
   
   for compaction, and it fails with that content length exception.
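
   (In case a smaller repro is useful: assuming the procedure's where argument is available in this version, the same call can also be scoped to a single partition. The sketch below just reuses one of the generated event_type values.)

   ```
   // Same compaction call, scoped to one event_type partition via the where argument.
   // The predicate value is only an example taken from the generator job above.
   spark.sql("CALL iceberg_catalog.system.rewrite_data_files(" +
           "table => 'stage.test_partitioned', " +
           "where => 'event_type = ''stupid_event''', " +
           "options => map('partial-progress.enabled', 'true'))");
   ```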

