kumarpritam863 commented on PR #15396:
URL: https://github.com/apache/iceberg/pull/15396#issuecomment-4183124967
@annurahar
The current code does traverse into nested schemas, if you look closely. For the case you described:
```
orders (
order_id: int
customer: struct
- customer_id: int
- customer_details: struct (NEW) <-- Not detected
- name: string
- email: string
)
```
When `customer_details` is added, the traversal first looks for `order_id`, finds it in the table schema, converts the value from Connect to Iceberg, and then moves on to `customer`. When it reaches `customer`:
```
NestedField tableField = lookupStructField(recordField.name(), schema, structFieldId);
```
This finds the NestedField, because `customer` does exist in the table, and proceeds to convertValue for `customer`. But since `customer` is a struct, look at the convertValue method:
```
private Object convertValue(
    Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) {
  if (value == null) {
    return null;
  }
  switch (type.typeId()) {
    case STRUCT:
      return convertStructValue(value, type.asStructType(), fieldId, schemaUpdateConsumer);
    case LIST:
      return convertListValue(value, type.asListType(), schemaUpdateConsumer);
    case MAP:
      return convertMapValue(value, type.asMapType(), schemaUpdateConsumer);
    case INTEGER:
      return convertInt(value);
    case LONG:
      return convertLong(value);
    case FLOAT:
      return convertFloat(value);
    case DOUBLE:
      return convertDouble(value);
    case DECIMAL:
      return convertDecimal(value, (DecimalType) type);
    case BOOLEAN:
      return convertBoolean(value);
    case STRING:
      return convertString(value);
    case UUID:
      return convertUUID(value);
    case BINARY:
      return convertBase64Binary(value);
    case FIXED:
      return ByteBuffers.toByteArray(convertBase64Binary(value));
    case DATE:
      return convertDateValue(value);
    case TIME:
      return convertTimeValue(value);
    case TIMESTAMP:
      return convertTimestampValue(value, (TimestampType) type);
  }
  throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
}
```
It calls "convertStructValue" again, this time with the struct's schema (here the schema of `customer`) and with `customer` as the parent.
convertStructValue then iterates over the fields of `customer`: it first finds `customer_id`, which is in the table, so its value is simply converted; for `customer_details` the NestedField lookup returns null and this block gets activated:
```
if (tableField == null) {
  // add the column if schema evolution is on, otherwise skip the value
  if (schemaUpdateConsumer != null) {
    String parentFieldName =
        structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
    Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
    schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
  }
}
```
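The recursive walk described above can be sketched without any Iceberg dependencies. This is only an illustration, not the connector's actual code: a nested `Map` stands in for a struct schema, and the method collects the `(parent, name)` pairs that would be handed to `schemaUpdateConsumer.addColumn(...)` (class and method names here are made up for the sketch):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the recursion: a schema is a name -> type map, where a nested
// Map models a struct. findNewColumns() compares an incoming record schema
// against the table schema and reports fields the table lacks, at any depth.
public class NestedEvolutionSketch {

  @SuppressWarnings("unchecked")
  static List<String> findNewColumns(
      Map<String, Object> recordSchema, Map<String, Object> tableSchema, String parent) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, Object> field : recordSchema.entrySet()) {
      Object tableField = tableSchema.get(field.getKey()); // lookupStructField analogue
      if (tableField == null) {
        // new column: the parent name tells schema evolution where to add it
        missing.add((parent == null ? "" : parent + ".") + field.getKey());
      } else if (field.getValue() instanceof Map && tableField instanceof Map) {
        // STRUCT case of convertValue: recurse with this struct as the parent
        String childParent = parent == null ? field.getKey() : parent + "." + field.getKey();
        missing.addAll(findNewColumns(
            (Map<String, Object>) field.getValue(), (Map<String, Object>) tableField, childParent));
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // record schema: orders(order_id, customer(customer_id, customer_details(name, email)))
    Map<String, Object> details = new LinkedHashMap<>();
    details.put("name", "string");
    details.put("email", "string");
    Map<String, Object> customerRecord = new LinkedHashMap<>();
    customerRecord.put("customer_id", "int");
    customerRecord.put("customer_details", details); // NEW nested struct
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("order_id", "int");
    record.put("customer", customerRecord);

    // table schema: orders(order_id, customer(customer_id))
    Map<String, Object> customerTable = new LinkedHashMap<>();
    customerTable.put("customer_id", "int");
    Map<String, Object> table = new LinkedHashMap<>();
    table.put("order_id", "int");
    table.put("customer", customerTable);

    // prints [customer.customer_details]: the new struct IS detected, one level down
    System.out.println(findNewColumns(record, table, null));
  }
}
```

Because the STRUCT branch recurses with the struct as the new parent, the same logic applies at every level of nesting, which is why `customer_details` is found even though it sits inside `customer`.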
The case above is fairly shallow nesting. We run kafka-connect-iceberg at large scale with schemas that reference other schemas, and even when a referenced schema evolves, our tables evolve to the new state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]