barunkumaracharya commented on issue #37524:
URL: https://github.com/apache/beam/issues/37524#issuecomment-3885280175

I created a table using Spark SQL. This was my table creation command:
**CREATE TABLE catalog.default.table1 (time TIMESTAMP_NTZ, name STRING)**
   
Then I inserted one row using Spark SQL:
**INSERT INTO catalog.default.table1 SELECT CAST(CAST(1770816992L AS TIMESTAMP) AS TIMESTAMP_NTZ), 'abdcd'**
   
Then I tried reading the data via Apache Beam with this example:
   
```java
package com.sample.iceberg.job;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class IcebergToFile {

    // POJO mirroring the table: a Joda Instant for the TIMESTAMP_NTZ column
    // and a String for the name column.
    @Data
    @DefaultSchema(JavaFieldSchema.class)
    private static class myPojo {
        @Nullable Instant time;
        @Nullable String name;
    }

    public static void main(String[] args) {
        // Hadoop catalog backed by GCS.
        Map<String, Object> catalogProps = new HashMap<>();
        catalogProps.put("type", "hadoop");
        catalogProps.put("warehouse", "gs://gs-bucket/warehouse/");

        // Hadoop filesystem configuration for the GCS connector.
        Map<String, Object> configProps = new HashMap<>();
        configProps.put("fs.gs.auth.service.account.enable", "true");
        configProps.put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
        configProps.put("fs.gs.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE");
        configProps.put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
        configProps.put("fs.gs.auth.service.account.json.keyfile", "/fileToJsonKeyFile.json");

        Map<String, Object> tableConfig = new HashMap<>();
        tableConfig.put("catalog_name", "catalog");
        tableConfig.put("table", "default.table1");
        tableConfig.put("catalog_properties", catalogProps);
        tableConfig.put("config_properties", configProps);

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);
        options.setJobName("sample iceberg job");
        Pipeline pipeline = Pipeline.create(options);

        PCollection<Row> tableData = pipeline
                .apply(Managed.read(Managed.ICEBERG).withConfig(tableConfig))
                .getSinglePCollection();
        // This is the step that throws, at pipeline-construction time.
        PCollection<myPojo> myPojoCollection = tableData.apply("convert", Convert.fromRows(myPojo.class));

        pipeline.run().waitUntilFinish();
    }
}
```
   
   
   
   
The error that I get when running this program is:
   
```
[main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[main] INFO org.apache.iceberg.CatalogUtil - Loading custom FileIO implementation: org.apache.iceberg.hadoop.HadoopFileIO
Feb 11, 2026 9:04:20 PM com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics updateMinMaxStats
INFO: Detected potential high latency for operation op_open. latencyMs=622; previousMaxLatencyMs=0; operationCount=1; context=gs://gs-bucket/warehouse/default/table1/metadata/version-hint.text
[main] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by catalog: catalog.default.table1
[main] INFO org.apache.iceberg.CatalogUtil - Loading custom FileIO implementation: org.apache.iceberg.hadoop.HadoopFileIO
[main] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by catalog: catalog.default.table1
Exception in thread "main" java.lang.RuntimeException: Cannot convert between types that don't have equivalent schemas. input schema: Fields:
Field{name=time, description=, type=LOGICAL_TYPE<beam:logical_type:datetime:v1>, options={{}}}
Field{name=name, description=, type=STRING, options={{}}}
Encoding positions:
{name=1, time=0}
Options:{{}}UUID: 07b4b693-67c4-4bf4-aa9b-02bab33de9f2 output schema: Fields:
Field{name=time, description=, type=DATETIME, options={{}}}
Field{name=name, description=, type=STRING, options={{}}}
Encoding positions:
{name=1, time=0}
Options:{{}}UUID: 2f7c9d70-ac9b-4516-bab8-c9546b259883
    at org.apache.beam.sdk.schemas.utils.ConvertHelpers.getConvertedSchemaInformation(ConvertHelpers.java:137)
    at org.apache.beam.sdk.schemas.transforms.Convert$ConvertTransform.expand(Convert.java:118)
    at org.apache.beam.sdk.schemas.transforms.Convert$ConvertTransform.expand(Convert.java:97)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:507)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:365)
    at com.sample.iceberg.job.IcebergToFile.main(IcebergToFile.java:60)
```
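From the error, the mismatch seems to be that the Iceberg source emits the `time` column as the `beam:logical_type:datetime:v1` logical type (i.e. `SqlTypes.DATETIME`, whose Java representation is `java.time.LocalDateTime`), while `JavaFieldSchema` infers a plain `DATETIME` field type for the Joda `Instant` in my POJO, so `Convert.fromRows` rejects the two schemas as non-equivalent. As a workaround sketch (not a confirmed fix; the `formatTable` name and the output formatting here are just illustrative), reading the logical-type value straight off the `Row` avoids the conversion entirely:

```java
import java.time.LocalDateTime;

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

// Drop-in replacement for the "convert" step in main() above: instead of
// Convert.fromRows(myPojo.class), map each Row to a String by hand.
static PCollection<String> formatTable(PCollection<Row> tableData) {
    return tableData.apply("formatTable",
            MapElements.into(TypeDescriptors.strings())
                    .via((Row row) -> {
                        // The TIMESTAMP_NTZ column arrives as the SqlTypes.DATETIME
                        // logical type (id beam:logical_type:datetime:v1 in the
                        // error above), represented in Java as LocalDateTime.
                        LocalDateTime time =
                                row.getLogicalTypeValue("time", LocalDateTime.class);
                        String name = row.getString("name");
                        return name + " @ " + time;
                    }));
}
```

This only sidesteps the schema-equivalence check; a proper fix would presumably let the POJO conversion map the logical type to a matching field type.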
   
@ahmedabu98, for your reference.

