barunkumaracharya commented on issue #37524:
URL: https://github.com/apache/beam/issues/37524#issuecomment-3885280175
I created a table using Spark SQL — this was my table creation command:
**CREATE TABLE catalog.default.table1 (time TIMESTAMP_NTZ, name STRING)**
Then, I inserted one row using Spark SQL:
**INSERT INTO catalog.default.table1 SELECT CAST(CAST(1770816992L AS
TIMESTAMP) AS TIMESTAMP_NTZ), 'abdcd'**
Then, I tried reading the data via Apache Beam using this example:
`package com.sample.iceberg.job;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class IcebergToFile {

  /**
   * Schema-mapped POJO mirroring the Iceberg table's columns (time, name).
   * Fields are {@code @Nullable} because Iceberg columns are optional by default.
   */
  @Data
  @DefaultSchema(JavaFieldSchema.class)
  private static class MyPojo {
    @Nullable Instant time;
    @Nullable String name;
  }

  public IcebergToFile() {
  }

  /**
   * Reads rows from an Iceberg table on GCS via the Managed Iceberg source and
   * converts them to {@link MyPojo}. Runs on the DirectRunner.
   */
  public static void main(String[] args) {
    // Catalog-level configuration: a Hadoop catalog rooted in a GCS warehouse.
    Map<String, Object> tableConfig = new HashMap<>();
    Map<String, Object> catalogProps = new HashMap<>();
    catalogProps.put("type", "hadoop");
    catalogProps.put("warehouse", "gs://gs-bucket/warehouse/");

    // Hadoop filesystem configuration so HadoopFileIO can talk to gs:// paths
    // with service-account JSON-keyfile authentication.
    Map<String, Object> configProps = new HashMap<>();
    configProps.put("fs.gs.auth.service.account.enable", "true");
    configProps.put("fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
    configProps.put("fs.gs.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE");
    configProps.put("fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
    configProps.put("fs.gs.auth.service.account.json.keyfile",
        "/fileToJsonKeyFile.json");

    tableConfig.put("catalog_name", "catalog");
    tableConfig.put("table", "default.table1");
    tableConfig.put("catalog_properties", catalogProps);
    tableConfig.put("config_properties", configProps);

    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    options.setJobName("sample iceberg job");

    Pipeline pipeline = Pipeline.create(options);
    PCollection<Row> tableData =
        pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(tableConfig)).getSinglePCollection();
    // NOTE(review): the reported schema-mismatch exception is thrown here during
    // pipeline construction (Convert.expand), before the pipeline ever runs.
    PCollection<MyPojo> myPojoCollection = tableData.apply("convert",
        Convert.fromRows(MyPojo.class));
    // In a real job, attach a sink to myPojoCollection here; it is otherwise unused.

    // BUG FIX: the original built the pipeline but never executed it. Without
    // run(), nothing would be read even once the conversion error is resolved.
    pipeline.run().waitUntilFinish();
  }
}`
The error that I get while running this program is this:
`[main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
[main] INFO org.apache.iceberg.CatalogUtil - Loading custom FileIO
implementation: org.apache.iceberg.hadoop.HadoopFileIO
Feb 11, 2026 9:04:20 PM com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics
updateMinMaxStats
INFO: Detected potential high latency for operation op_open. latencyMs=622;
previousMaxLatencyMs=0; operationCount=1;
context=gs://gs-bucket/warehouse/default/table1/metadata/version-hint.text
[main] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by
catalog: catalog.default.table1
[main] INFO org.apache.iceberg.CatalogUtil - Loading custom FileIO
implementation: org.apache.iceberg.hadoop.HadoopFileIO
[main] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by
catalog: catalog.default.table1
Exception in thread "main" java.lang.RuntimeException: Cannot convert
between types that don't have equivalent schemas. input schema: Fields:
Field{name=time, description=,
type=LOGICAL_TYPE<beam:logical_type:datetime:v1>, options={{}}}
Field{name=name, description=, type=STRING, options={{}}}
Encoding positions:
{name=1, time=0}
Options:{{}}UUID: 07b4b693-67c4-4bf4-aa9b-02bab33de9f2 output schema: Fields:
Field{name=time, description=, type=DATETIME, options={{}}}
Field{name=name, description=, type=STRING, options={{}}}
Encoding positions:
{name=1, time=0}
Options:{{}}UUID: 2f7c9d70-ac9b-4516-bab8-c9546b259883
at
org.apache.beam.sdk.schemas.utils.ConvertHelpers.getConvertedSchemaInformation(ConvertHelpers.java:137)
at
org.apache.beam.sdk.schemas.transforms.Convert$ConvertTransform.expand(Convert.java:118)
at
org.apache.beam.sdk.schemas.transforms.Convert$ConvertTransform.expand(Convert.java:97)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:507)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:365)
at com.sample.iceberg.job.IcebergToFile.main(IcebergToFile.java:60)
`
@ahmedabu98 For your reference
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]