hugokitano opened a new issue, #1202:
URL: https://github.com/apache/iceberg-rust/issues/1202
### Apache Iceberg Rust version
0.4.0 (latest version)
### Describe the bug
The apache rust map writer requires unique metadata fields for every Field.
The complex type `map` contains multiple Fields that must have metadata
assigned, yet upon writing a record batch, the metadata for the outer `Map`
field is lost
### To Reproduce
```
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
run_iceberg_write().await
}
async fn run_iceberg_write() -> Result<(), Box<dyn std::error::Error>> {
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
// Create an in-memory file IO
let file_io = iceberg::io::FileIOBuilder::new("memory").build()?;
let namespace_name = "test_namespace";
let table_name = "test_table";
// Create the key and value fields WITH metadata
let mut key_metadata = HashMap::new();
key_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string());
let key_field = Field::new("key", arrow_schema::DataType::Int64, false)
.with_metadata(key_metadata);
let mut value_metadata = HashMap::new();
value_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string());
let value_field = Field::new("value", arrow_schema::DataType::Float64,
true)
.with_metadata(value_metadata);
// Create the key_value field with metadata
let mut key_value_metadata = HashMap::new();
key_value_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string());
let key_value_field = Field::new(
"struct",
arrow_schema::DataType::Struct(vec![
Arc::new(key_field.clone()),
Arc::new(value_field.clone()),
].into()),
false
).with_metadata(key_value_metadata.clone());
// Create the map field with metadata
let mut map_metadata = HashMap::new();
map_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string());
let map_field = Field::new(
"map",
arrow_schema::DataType::Map(Arc::new(key_value_field.clone()),
false),
true
).with_metadata(map_metadata);
// Create schema with the map field
let schema = ArrowSchema::new(vec![map_field]);
// Create test data for the map
let map_keys = Int64Array::from(vec![101, 102, 201, 202]);
let map_values = Float64Array::from(vec![Some(10.1), Some(10.2),
Some(20.1), Some(20.2)]);
// Create the struct array
let map_struct_array = StructArray::new(
arrow_schema::Fields::from(vec![
Arc::new(key_field.clone()),
Arc::new(value_field.clone()),
]),
vec![Arc::new(map_keys), Arc::new(map_values)],
None
);
// Define offsets for the map
let map_offsets =
OffsetBuffer::<i32>::new(arrow_buffer::ScalarBuffer::from(vec![0, 2, 4]));
// Create the map array
let map_array = arrow_array::MapArray::new(
Arc::new(key_value_field.clone()),
map_offsets,
map_struct_array,
None,
false
);
// Create record batch
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(map_array) as ArrayRef],
)?;
println!("batch with schema {}", batch.schema());
// Create an in-memory catalog
let catalog =
iceberg_catalog_memory::MemoryCatalog::new(file_io.clone(),
Some("memory://warehouse".to_string()));
// Create the namespace and table
let namespace = NamespaceIdent::new(namespace_name.to_string());
catalog.create_namespace(&namespace, HashMap::new()).await?;
let table_ident = TableIdent::new(namespace.clone(),
table_name.to_string());
// Convert schema to Iceberg schema
let iceberg_schema = iceberg::spec::Schema::try_from(&schema)?;
let table = catalog.create_table(&namespace, TableCreation::builder()
.name(table_name.to_string())
.schema(iceberg_schema.clone())
.properties(HashMap::new())
.build()).await?;
// Create location generator
let location_generator =
DefaultLocationGenerator::new(table.metadata().clone())?;
// Create file name generator
let writer_id = format!("file-{}", Uuid::new_v4());
let file_name_generator = DefaultFileNameGenerator::new(
writer_id,
None,
iceberg::spec::DataFileFormat::Parquet
);
// Create Parquet writer builder
let parquet_props =
parquet::file::properties::WriterProperties::builder().build();
let parquet_writer_builder = ParquetWriterBuilder::new(
parquet_props,
Arc::new(iceberg_schema),
file_io.clone(),
location_generator,
file_name_generator,
);
// Create data file writer
let data_file_writer_builder =
DataFileWriterBuilder::new(parquet_writer_builder, None);
let mut writer = data_file_writer_builder.build().await?;
// Write the batch
writer.write(batch).await?;
// Close the writer and get data files
let data_files = writer.close().await?;
// Create a transaction to append data files
let tx = Transaction::new(&table);
let mut fast_append = tx.fast_append(None, vec![])?;
fast_append.add_data_files(data_files)?;
fast_append.apply().await?;
println!("Successfully wrote table!");
Ok(())
}
```
produces the output
```
batch with schema Field { name: "map", data_type: Map(Field { name:
"struct", data_type: Struct([Field { name: "key", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"}
}, Field { name: "value", data_type: Float64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"}
}, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "4"} }
Error: Unexpected => Failed to write using parquet writer.
Source: Arrow: Incompatible type. Field 'map' has type Map(Field { name:
"key_value", data_type: Struct([Field { name: "key", data_type: Int64,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "1"} }, Field { name: "value", data_type: Float64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "2"} }]), nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} }, false), array has type Map(Field { name: "struct",
data_type: Struct([Field { name: "key", data_type: Int64, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
Field { name: "value", data_type: Float64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"}
}, false)
```
Notice metadata value "4" is missing from the Map datatype in the error
message, but is definitely present in the record batch.
### Expected behavior
_No response_
### Willingness to contribute
None
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]