mskyrim opened a new issue, #35417:
URL: https://github.com/apache/beam/issues/35417

   ### What happened?
   
   We are using an Apache Beam pipeline (v2.62.0) to ingest data read in 
Protobuf format, transform it into a Beam schema (dynamically), and then write 
it to an Iceberg table using IcebergIO.writeRows as the final step of the 
pipeline.
   We have noticed an issue when writing to Iceberg tables that are partitioned 
by the meta_processing_time field using the following specification:
    
`(PartitionSpec.builderFor(contractIcebergSchema).month("meta_processing_time").build())`
 
   Although the Parquet data files are correctly written under the expected 
monthly partition folders (e.g., path/2025-06/filename.parquet), querying the 
table in Trino returns inconsistent partition metadata: 
   `select "$partition", "$path", * from iceberg_table`
   
   The $partition metadata field incorrectly shows a value of "1970-07", while:
   
   - the $path value correctly references the meta_processing_time month 
(e.g., 'path/2025-06/filename.parquet')
   - the actual meta_processing_time column in the data contains the correct 
timestamp value (e.g., '2025-06-24 11:06:06.187 +00:00')
   
   After running the following query in Trino:
   
   `ALTER TABLE table_name EXECUTE optimize`
   
   the $partition value is corrected and shows the expected value '2025-06'.
   We have upgraded to the latest Apache Beam version as well as the Iceberg 
core library, but the issue persists.
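   
   For reference, here is a minimal sketch of the arithmetic behind these two 
values. It assumes only what the Iceberg spec says about the `month` transform, 
namely that it stores the number of months since 1970-01. The class and method 
names are hypothetical and use plain `java.time`, not Beam or Iceberg code. 
Under that assumption, '2025-06' corresponds to ordinal 665, while the observed 
'1970-07' corresponds to ordinal 6, which happens to equal the calendar month 
number of June. This is only an observation, not a confirmed root cause.

   ```java
   import java.time.Instant;
   import java.time.YearMonth;
   import java.time.ZoneOffset;
   import java.time.temporal.ChronoUnit;

   // Hypothetical helper, not part of Beam or Iceberg.
   public class MonthPartitionSketch {
     static final YearMonth EPOCH = YearMonth.of(1970, 1);

     // Months elapsed between 1970-01 and the UTC month of the timestamp.
     static int monthsSinceEpoch(Instant ts) {
       YearMonth ym = YearMonth.from(ts.atOffset(ZoneOffset.UTC));
       return (int) ChronoUnit.MONTHS.between(EPOCH, ym);
     }

     // Render a month ordinal the way a query engine would display it (yyyy-MM).
     static String render(int ordinal) {
       return EPOCH.plusMonths(ordinal).toString();
     }

     public static void main(String[] args) {
       Instant ts = Instant.parse("2025-06-24T11:06:06.187Z");
       int ordinal = monthsSinceEpoch(ts);
       System.out.println(ordinal + " -> " + render(ordinal)); // 665 -> 2025-06
       System.out.println(6 + " -> " + render(6));             // 6 -> 1970-07
     }
   }
   ```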
   
   Here is a unit test method that reproduces the issue:
   `@When(
         "I insert data into the iceberg schema {string} partitioned by {string} with the following values:")
     public void i_insert_data_into_the_iceberg_schema_with_the_following_values(
         String tableName, String partitionColumn, DataTable dataTable) {
       // Step 1: Get Iceberg table and config
       Namespace namespace = Namespace.of("platform_data");
       TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
       Table table = catalog.catalog().loadTable(tableIdentifier);
   
       if (table == null) {
         throw new IllegalStateException(
             "Iceberg table '" + tableName + "' does not exist in the catalog.");
       }
   
       // Build Beam schema
       Schema beamSchema =
           Schema.builder()
               .addInt32Field("id")
               .addStringField("name")
               .addDateTimeField(partitionColumn)
               .build();
   
       // Convert DataTable to Beam Row
       List<Row> rows =
           dataTable.asMaps(String.class, String.class).stream()
               .map(
                   map -> {
                     return Row.withSchema(beamSchema)
                         .addValues(
                             Integer.parseInt(map.get("id")),
                             map.get("name"),
                             org.joda.time.Instant.now())
                         .build();
                   })
               .collect(Collectors.toList());
   
       // Step 2: Create Beam pipeline and write to Iceberg
       PipelineOptions options = PipelineOptionsFactory.create();
       options.setRunner(DirectRunner.class);
       Pipeline pipeline = Pipeline.create(options);
   
       rows.forEach(
           row -> {
             log.info("Row created_at = {}", row.getDateTime(partitionColumn));
           });
   
       PCollection<Row> input =
           pipeline.apply("CreateRows", Create.of(rows)).setRowSchema(beamSchema);
   
       IcebergCatalogConfig icebergCatalogConfig =
           IcebergHiveCatalogPerEnv.getBeamCatalogConfig();
   
       input.apply(new IcebergWriteTransform(icebergCatalogConfig, tableIdentifier));
   
       pipeline.run().waitUntilFinish();
     }`
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [x] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [x] Component: Google Cloud Dataflow Runner


