mxm commented on code in PR #12298: URL: https://github.com/apache/iceberg/pull/12298#discussion_r2227988694
########## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTransformer.java: ########## @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.Deque; +import java.util.List; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.RowTransformer; +import org.apache.iceberg.flink.data.FlinkSchemaVisitor; +import org.apache.iceberg.relocated.com.google.common.collect.Queues; +import 
org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +class RowDataTransformer implements RowTransformer<RowData> { + private static final int ROOT_POSITION = -1; + private final PositionalGetter<RowData, RowData> getter; + + RowDataTransformer(RowType rowType, Types.StructType struct) { + this.getter = + (PositionalGetter<RowData, RowData>) + RowDataVisitor.visit(rowType, new Schema(struct.fields()), new RowDataVisitor()); + } + + @Override + public RowData transform(RowData data) { + return getter.get(data, ROOT_POSITION); + } + + private interface PositionalGetter<T, I> { + T get(I data, int pos); + } + + private static class RowDataAccessor implements RowData { + private final PositionalGetter<?, RowData>[] getters; + private RowData rowData = null; + + private RowDataAccessor(PositionalGetter<?, RowData>[] getters) { + this.getters = getters; + } + + @Override + public int getArity() { + return rowData.getArity(); + } + + @Override + public RowKind getRowKind() { + return rowData.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public boolean isNullAt(int pos) { + return rowData.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return (boolean) getters[pos].get(rowData, pos); + } + + @Override + public byte getByte(int pos) { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public short getShort(int pos) { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public int getInt(int pos) { + return (int) getters[pos].get(rowData, pos); + } + + @Override + public long getLong(int pos) { + return (long) getters[pos].get(rowData, pos); + } + + @Override + public float getFloat(int pos) { + return (float) getters[pos].get(rowData, pos); + } + + @Override + public double getDouble(int pos) { + return (double) 
getters[pos].get(rowData, pos); + } + + @Override + public StringData getString(int pos) { + return (StringData) getters[pos].get(rowData, pos); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return (DecimalData) getters[pos].get(rowData, pos); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return (TimestampData) getters[pos].get(rowData, pos); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public byte[] getBinary(int pos) { + return (byte[]) getters[pos].get(rowData, pos); + } + + @Override + public ArrayData getArray(int pos) { + return (ArrayData) getters[pos].get(rowData, pos); + } + + @Override + public MapData getMap(int pos) { + return (MapData) getters[pos].get(rowData, pos); + } + + @Override + public RowData getRow(int pos, int numFields) { + return (RowData) getters[pos].get(rowData, pos); + } + } + + private static class ArrayDataAccessor implements ArrayData { + private final PositionalGetter<?, ArrayData> getter; + private ArrayData arrayData = null; + + private ArrayDataAccessor(PositionalGetter<?, ArrayData> getter) { + this.getter = getter; + } + + @Override + public int size() { + return arrayData.size(); + } + + @Override + public boolean[] toBooleanArray() { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public byte[] toByteArray() { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public short[] toShortArray() { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public int[] toIntArray() { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public long[] toLongArray() { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + 
@Override + public float[] toFloatArray() { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public double[] toDoubleArray() { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public boolean isNullAt(int pos) { + return arrayData.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return (boolean) getter.get(arrayData, pos); + } + + @Override + public byte getByte(int pos) { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public short getShort(int pos) { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public int getInt(int pos) { + return (int) getter.get(arrayData, pos); + } + + @Override + public long getLong(int pos) { + return (long) getter.get(arrayData, pos); + } + + @Override + public float getFloat(int pos) { + return (float) getter.get(arrayData, pos); + } + + @Override + public double getDouble(int pos) { + return (double) getter.get(arrayData, pos); + } + + @Override + public StringData getString(int pos) { + return (StringData) getter.get(arrayData, pos); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return (DecimalData) getter.get(arrayData, pos); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return (TimestampData) getter.get(arrayData, pos); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported in RowDataAccessor"); + } + + @Override + public byte[] getBinary(int pos) { + return (byte[]) getter.get(arrayData, pos); + } + + @Override + public ArrayData getArray(int pos) { + return (ArrayData) getter.get(arrayData, pos); + } + + @Override + public MapData getMap(int pos) { + return (MapData) getter.get(arrayData, pos); + } + + @Override + public RowData getRow(int pos, int numFields) 
{ + return (RowData) getter.get(arrayData, pos); + } + } + + private static class MapDataAccessor implements MapData { + private final ArrayDataAccessor key; + private final ArrayDataAccessor value; + + private MapDataAccessor(ArrayDataAccessor key, ArrayDataAccessor value) { + this.key = key; + this.value = value; + } + + @Override + public int size() { + return key.size(); + } + + @Override + public ArrayData keyArray() { + return key; + } + + @Override + public ArrayData valueArray() { + return value; + } + } + + private static class RowDataVisitor extends FlinkSchemaVisitor<PositionalGetter<?, ?>> { + private final Deque<Boolean> isInList = Queues.newArrayDeque(); + + private RowDataVisitor() { + isInList.push(false); + } + + @Override + public void beforeStruct(Types.StructType type) { + isInList.push(false); + super.beforeStruct(type); + } + + @Override + public void afterStruct(Types.StructType type) { + isInList.pop(); + super.afterStruct(type); + } + + @Override + public void beforeListElement(Types.NestedField elementField) { + isInList.push(true); + super.beforeListElement(elementField); + } + + @Override + public void afterListElement(Types.NestedField elementField) { + isInList.pop(); + super.afterListElement(elementField); + } + + @Override + public void beforeMapKey(Types.NestedField keyField) { + isInList.push(true); + super.beforeMapKey(keyField); + } + + @Override + public void afterMapKey(Types.NestedField keyField) { + isInList.pop(); + super.afterMapKey(keyField); + } + + @Override + public void beforeMapValue(Types.NestedField valueField) { + isInList.push(true); + super.beforeMapValue(valueField); + } + + @Override + public void afterMapValue(Types.NestedField valueField) { + isInList.pop(); + super.afterMapValue(valueField); + } + + @Override + public PositionalGetter<RowData, ?> record( + Types.StructType iStruct, + List<PositionalGetter<?, ?>> results, + List<LogicalType> fieldTypes) { + RowDataAccessor accessor = new 
RowDataAccessor(results.toArray(new PositionalGetter[0])); + boolean isInListFlag = Boolean.TRUE.equals(isInList.peek()); + return ((data, pos) -> { + if (isInListFlag) { + accessor.rowData = ((ArrayData) data).getRow(pos, iStruct.fields().size()); + } else { + if (pos == ROOT_POSITION) { + accessor.rowData = (RowData) data; + } else { + accessor.rowData = ((RowData) data).getRow(pos, iStruct.fields().size()); + } + } + + return accessor; + }); + } + + @Override + public PositionalGetter<ArrayData, ?> list( + Types.ListType iList, PositionalGetter<?, ?> getter, LogicalType elementType) { + ArrayDataAccessor accessor = new ArrayDataAccessor((PositionalGetter<?, ArrayData>) getter); + boolean isInListFlag = Boolean.TRUE.equals(isInList.peek()); + return (data, pos) -> { + accessor.arrayData = + isInListFlag ? ((ArrayData) data).getArray(pos) : ((RowData) data).getArray(pos); + return accessor; + }; + } + + @Override + public PositionalGetter<MapData, ?> map( + Types.MapType iMap, + PositionalGetter<?, ?> keyGetter, + PositionalGetter<?, ?> valueGetter, + LogicalType keyType, + LogicalType valueType) { + MapDataAccessor mapDataAccessor = + new MapDataAccessor( + new ArrayDataAccessor((PositionalGetter<?, ArrayData>) keyGetter), + new ArrayDataAccessor((PositionalGetter<?, ArrayData>) valueGetter)); + boolean isInListFlag = Boolean.TRUE.equals(isInList.peek()); + return (data, pos) -> { + if (isInListFlag) { + mapDataAccessor.key.arrayData = ((ArrayData) data).getMap(pos).keyArray(); + mapDataAccessor.value.arrayData = ((ArrayData) data).getMap(pos).valueArray(); + } else { + mapDataAccessor.key.arrayData = ((RowData) data).getMap(pos).keyArray(); + mapDataAccessor.value.arrayData = ((RowData) data).getMap(pos).valueArray(); + } + + return mapDataAccessor; + }; + } + + @Override + public PositionalGetter<?, ?> primitive(Type.PrimitiveType type, LogicalType logicalType) { + if (Boolean.TRUE.equals(isInList.peek())) { + return arrayPrimitive(type, logicalType); + } else 
{ + return rowDataPrimitive(type, logicalType); + } + } + + private PositionalGetter<?, RowData> rowDataPrimitive(Type type, LogicalType logicalType) { + switch (type.typeId()) { + case BOOLEAN: + return RowData::getBoolean; + case INTEGER: + if (logicalType.getTypeRoot() == LogicalTypeRoot.TINYINT) { + return (row, pos) -> (int) row.getByte(pos); + } else if (logicalType.getTypeRoot() == LogicalTypeRoot.SMALLINT) { + return (row, pos) -> (int) row.getShort(pos); + } + + return RowData::getInt; + case LONG: + return RowData::getLong; + case FLOAT: + return RowData::getFloat; + case DOUBLE: + return RowData::getDouble; + case STRING: + return RowData::getString; + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return (row, pos) -> + row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()); + case DATE: + case TIME: + return RowData::getInt; + case TIMESTAMP: + case TIMESTAMP_NANO: + switch (logicalType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (row, pos) -> + row.getTimestamp(pos, ((TimestampType) logicalType).getPrecision()); + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return (row, pos) -> + row.getTimestamp(pos, ((LocalZonedTimestampType) logicalType).getPrecision()); + default: + throw new UnsupportedOperationException( + "Not supported logical type: " + logicalType.getTypeRoot()); + } + case UUID: + case BINARY: + case FIXED: + return RowData::getBinary; Review Comment: Would it make sense to separate the boilerplate mapping code from the custom type mapping that we see here? It is hard to understand what actually changed. Ideally, we would only supply mappings for the types that changed and leave all others untouched. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org