This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7e1fe395d [flink] Support updating row type nested in array/map in
Flink (#4528)
7e1fe395d is described below
commit 7e1fe395df0831a9d65bf4b6126562252a7a7bf2
Author: tsreaper <[email protected]>
AuthorDate: Thu Nov 14 18:36:02 2024 +0800
[flink] Support updating row type nested in array/map in Flink (#4528)
---
...CastFieldGetter.java => CastElementGetter.java} | 22 ++-
.../org/apache/paimon/casting/CastFieldGetter.java | 3 +
.../org/apache/paimon/casting/CastedArray.java | 201 +++++++++++++++++++++
.../java/org/apache/paimon/casting/CastedMap.java | 70 +++++++
.../java/org/apache/paimon/casting/CastedRow.java | 2 -
.../apache/paimon/schema/SchemaEvolutionUtil.java | 99 ++++++----
.../org/apache/paimon/schema/SchemaManager.java | 44 ++++-
.../apache/paimon/schema/SchemaManagerTest.java | 50 ++++-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 41 ++++-
.../apache/paimon/flink/SchemaChangeITCase.java | 66 ++++++-
10 files changed, 533 insertions(+), 65 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
b/paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java
similarity index 64%
copy from
paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
copy to
paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java
index 02168300a..b8a91f572 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java
@@ -18,22 +18,24 @@
package org.apache.paimon.casting;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalArray;
-/**
- * Get field value from row with given pos and cast it according to specific
{@link CastExecutor}.
- */
-public class CastFieldGetter {
- private final InternalRow.FieldGetter fieldGetter;
+/** Get element from array and cast it according to specific {@link
CastExecutor}. */
+public class CastElementGetter {
+
+ private final InternalArray.ElementGetter elementGetter;
private final CastExecutor<Object, Object> castExecutor;
- public CastFieldGetter(InternalRow.FieldGetter fieldGetter,
CastExecutor<?, ?> castExecutor) {
- this.fieldGetter = fieldGetter;
+ @SuppressWarnings("unchecked")
+ public CastElementGetter(
+ InternalArray.ElementGetter elementGetter, CastExecutor<?, ?>
castExecutor) {
+ this.elementGetter = elementGetter;
this.castExecutor = (CastExecutor<Object, Object>) castExecutor;
}
- public <V> V getFieldOrNull(InternalRow row) {
- Object value = fieldGetter.getFieldOrNull(row);
+ @SuppressWarnings("unchecked")
+ public <V> V getElementOrNull(InternalArray array, int pos) {
+ Object value = elementGetter.getElementOrNull(array, pos);
return value == null ? null : (V) castExecutor.cast(value);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
b/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
index 02168300a..208ef5f30 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
@@ -24,14 +24,17 @@ import org.apache.paimon.data.InternalRow;
* Get field value from row with given pos and cast it according to specific
{@link CastExecutor}.
*/
public class CastFieldGetter {
+
private final InternalRow.FieldGetter fieldGetter;
private final CastExecutor<Object, Object> castExecutor;
+ @SuppressWarnings("unchecked")
public CastFieldGetter(InternalRow.FieldGetter fieldGetter,
CastExecutor<?, ?> castExecutor) {
this.fieldGetter = fieldGetter;
this.castExecutor = (CastExecutor<Object, Object>) castExecutor;
}
+ @SuppressWarnings("unchecked")
public <V> V getFieldOrNull(InternalRow row) {
Object value = fieldGetter.getFieldOrNull(row);
return value == null ? null : (V) castExecutor.cast(value);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
new file mode 100644
index 000000000..778b11d1f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+
+/**
+ * An implementation of {@link InternalArray} which provides a casted view of
the underlying {@link
+ * InternalArray}.
+ *
+ * <p>It reads data from underlying {@link InternalArray} according to source
logical type and casts
+ * it with specific {@link CastExecutor}.
+ */
+public class CastedArray implements InternalArray {
+
+ private final CastElementGetter castElementGetter;
+ private InternalArray array;
+
+ protected CastedArray(CastElementGetter castElementGetter) {
+ this.castElementGetter = castElementGetter;
+ }
+
+ /**
+ * Creates a {@link CastedArray} which casts elements with the given {@link CastElementGetter}.
+ *
+ * <p>The underlying {@link InternalArray} must be set with {@link #replaceArray} before use;
+ * that method replaces the array in place and does not return a new object, for performance reasons.
+ */
+ public static CastedArray from(CastElementGetter castElementGetter) {
+ return new CastedArray(castElementGetter);
+ }
+
+ public CastedArray replaceArray(InternalArray array) {
+ this.array = array;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return array.size();
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] result = new boolean[size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = castElementGetter.getElementOrNull(array, i);
+ }
+ return result;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] result = new byte[size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = castElementGetter.getElementOrNull(array, i);
+ }
+ return result;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] result = new short[size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = castElementGetter.getElementOrNull(array, i);
+ }
+ return result;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] result = new int[size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = castElementGetter.getElementOrNull(array, i);
+ }
+ return result;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] result = new long[size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = castElementGetter.getElementOrNull(array, i);
+ }
+ return result;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] result = new float[size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = castElementGetter.getElementOrNull(array, i);
+ }
+ return result;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] result = new double[size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = castElementGetter.getElementOrNull(array, i);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return castElementGetter.getElementOrNull(array, pos) == null;
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ return castElementGetter.getElementOrNull(array, pos);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java
b/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java
new file mode 100644
index 000000000..4068407ca
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+
+/**
+ * An implementation of {@link InternalMap} which provides a casted view of
the underlying {@link
+ * InternalMap}.
+ *
+ * <p>It reads data from underlying {@link InternalMap} according to source
logical type and casts
+ * it with specific {@link CastExecutor}.
+ */
+public class CastedMap implements InternalMap {
+
+ private final CastedArray castedValueArray;
+ private InternalMap map;
+
+ protected CastedMap(CastElementGetter castValueGetter) {
+ this.castedValueArray = CastedArray.from(castValueGetter);
+ }
+
+ /**
+ * Creates a {@link CastedMap} which casts map values with the given {@link CastElementGetter}.
+ *
+ * <p>The underlying {@link InternalMap} must be set with {@link #replaceMap} before use;
+ * that method replaces the map in place and does not return a new object, for performance reasons.
+ */
+ public static CastedMap from(CastElementGetter castValueGetter) {
+ return new CastedMap(castValueGetter);
+ }
+
+ public CastedMap replaceMap(InternalMap map) {
+ this.castedValueArray.replaceArray(map.valueArray());
+ this.map = map;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public InternalArray keyArray() {
+ return map.keyArray();
+ }
+
+ @Override
+ public InternalArray valueArray() {
+ return castedValueArray;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
index 25c574425..f9216d10b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
@@ -34,8 +34,6 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
*
* <p>It reads data from underlying {@link InternalRow} according to source
logical type and casts
* it with specific {@link CastExecutor}.
- *
- * <p>Note: This class supports only top-level castings, not nested castings.
*/
public class CastedRow implements InternalRow {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index b5d730707..0ae2798c2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -19,10 +19,15 @@
package org.apache.paimon.schema;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.casting.CastElementGetter;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.casting.CastFieldGetter;
+import org.apache.paimon.casting.CastedArray;
+import org.apache.paimon.casting.CastedMap;
import org.apache.paimon.casting.CastedRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
@@ -31,7 +36,6 @@ import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.ProjectedRow;
@@ -372,6 +376,7 @@ public class SchemaEvolutionUtil {
List<DataField> tableFields, List<DataField> dataFields, int[]
indexMapping) {
CastFieldGetter[] converterMapping = new
CastFieldGetter[tableFields.size()];
boolean castExist = false;
+
for (int i = 0; i < tableFields.size(); i++) {
int dataIndex = indexMapping == null ? i : indexMapping[i];
if (dataIndex < 0) {
@@ -380,53 +385,39 @@ public class SchemaEvolutionUtil {
} else {
DataField tableField = tableFields.get(i);
DataField dataField = dataFields.get(dataIndex);
- if (dataField.type().equalsIgnoreNullable(tableField.type())) {
- // Create getter with index i and projected row data will
convert to underlying
- // data
- converterMapping[i] =
- new CastFieldGetter(
-
InternalRowUtils.createNullCheckingFieldGetter(
- dataField.type(), i),
- CastExecutors.identityCastExecutor());
- } else {
- // TODO support column type evolution in nested type
- checkState(
- !(tableField.type() instanceof MapType
- || dataField.type() instanceof ArrayType
- || dataField.type() instanceof
MultisetType),
- "Only support column type evolution in atomic and
row data type.");
-
- CastExecutor<?, ?> castExecutor;
- if (tableField.type() instanceof RowType
- && dataField.type() instanceof RowType) {
- castExecutor =
- createRowCastExecutor(
- (RowType) dataField.type(), (RowType)
tableField.type());
- } else {
- castExecutor = CastExecutors.resolve(dataField.type(),
tableField.type());
- }
- checkNotNull(
- castExecutor,
- "Cannot cast from type "
- + dataField.type()
- + " to type "
- + tableField.type());
-
- // Create getter with index i and projected row data will
convert to underlying
- // data
- converterMapping[i] =
- new CastFieldGetter(
-
InternalRowUtils.createNullCheckingFieldGetter(
- dataField.type(), i),
- castExecutor);
+ if (!dataField.type().equalsIgnoreNullable(tableField.type()))
{
castExist = true;
}
+
+ // Create getter with index i and projected row data will
convert to underlying data
+ converterMapping[i] =
+ new CastFieldGetter(
+
InternalRowUtils.createNullCheckingFieldGetter(dataField.type(), i),
+ createCastExecutor(dataField.type(),
tableField.type()));
}
}
return castExist ? converterMapping : null;
}
+ private static CastExecutor<?, ?> createCastExecutor(DataType inputType,
DataType targetType) {
+ if (targetType.equalsIgnoreNullable(inputType)) {
+ return CastExecutors.identityCastExecutor();
+ } else if (inputType instanceof RowType && targetType instanceof
RowType) {
+ return createRowCastExecutor((RowType) inputType, (RowType)
targetType);
+ } else if (inputType instanceof ArrayType && targetType instanceof
ArrayType) {
+ return createArrayCastExecutor((ArrayType) inputType, (ArrayType)
targetType);
+ } else if (inputType instanceof MapType && targetType instanceof
MapType) {
+ return createMapCastExecutor((MapType) inputType, (MapType)
targetType);
+ } else {
+ return checkNotNull(
+ CastExecutors.resolve(inputType, targetType),
+ "Cannot cast from type %s to type %s",
+ inputType,
+ targetType);
+ }
+ }
+
private static CastExecutor<InternalRow, InternalRow>
createRowCastExecutor(
RowType inputType, RowType targetType) {
int[] indexMapping = createIndexMapping(targetType.getFields(),
inputType.getFields());
@@ -446,4 +437,32 @@ public class SchemaEvolutionUtil {
return value;
};
}
+
+ private static CastExecutor<InternalArray, InternalArray>
createArrayCastExecutor(
+ ArrayType inputType, ArrayType targetType) {
+ CastElementGetter castElementGetter =
+ new CastElementGetter(
+
InternalArray.createElementGetter(inputType.getElementType()),
+ createCastExecutor(
+ inputType.getElementType(),
targetType.getElementType()));
+
+ CastedArray castedArray = CastedArray.from(castElementGetter);
+ return castedArray::replaceArray;
+ }
+
+ private static CastExecutor<InternalMap, InternalMap>
createMapCastExecutor(
+ MapType inputType, MapType targetType) {
+ checkState(
+ inputType.getKeyType().equals(targetType.getKeyType()),
+ "Cannot cast map type %s to map type %s, because they have
different key types.",
+ inputType.getKeyType(),
+ targetType.getKeyType());
+ CastElementGetter castElementGetter =
+ new CastElementGetter(
+
InternalArray.createElementGetter(inputType.getValueType()),
+ createCastExecutor(inputType.getValueType(),
targetType.getValueType()));
+
+ CastedMap castedMap = CastedMap.from(castElementGetter);
+ return castedMap::replaceMap;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 86e365a88..a84348810 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -37,9 +37,11 @@ import
org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
import org.apache.paimon.schema.SchemaChange.UpdateComment;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.MapType;
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
@@ -636,17 +638,17 @@ public class SchemaManager implements Serializable {
continue;
}
+ String fullFieldName =
+ String.join(".",
Arrays.asList(updateFieldNames).subList(0, depth + 1));
List<DataField> nestedFields =
- new ArrayList<>(
- ((org.apache.paimon.types.RowType)
field.type()).getFields());
+ new ArrayList<>(extractRowType(field.type(),
fullFieldName).getFields());
updateIntermediateColumn(nestedFields, depth + 1);
newFields.set(
i,
new DataField(
field.id(),
field.name(),
- new org.apache.paimon.types.RowType(
- field.type().isNullable(),
nestedFields),
+ wrapNewRowType(field.type(), nestedFields),
field.description()));
return;
}
@@ -656,6 +658,40 @@ public class SchemaManager implements Serializable {
String.join(".",
Arrays.asList(updateFieldNames).subList(0, depth + 1)));
}
+ private RowType extractRowType(DataType type, String fullFieldName) {
+ switch (type.getTypeRoot()) {
+ case ROW:
+ return (RowType) type;
+ case ARRAY:
+ return extractRowType(((ArrayType) type).getElementType(),
fullFieldName);
+ case MAP:
+ return extractRowType(((MapType) type).getValueType(),
fullFieldName);
+ default:
+ throw new IllegalArgumentException(
+ fullFieldName + " is not a structured type.");
+ }
+ }
+
+ private DataType wrapNewRowType(DataType type, List<DataField>
nestedFields) {
+ switch (type.getTypeRoot()) {
+ case ROW:
+ return new RowType(type.isNullable(), nestedFields);
+ case ARRAY:
+ return new ArrayType(
+ type.isNullable(),
+ wrapNewRowType(((ArrayType)
type).getElementType(), nestedFields));
+ case MAP:
+ MapType mapType = (MapType) type;
+ return new MapType(
+ type.isNullable(),
+ mapType.getKeyType(),
+ wrapNewRowType(mapType.getValueType(),
nestedFields));
+ default:
+ throw new IllegalStateException(
+ "Trying to wrap a row type in " + type + ". This
is unexpected.");
+ }
+ }
+
protected abstract void updateLastColumn(List<DataField> newFields,
String fieldName)
throws Catalog.ColumnNotExistException,
Catalog.ColumnAlreadyExistException;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 088cb72f9..f0d654369 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -690,7 +691,7 @@ public class SchemaManagerTest {
SchemaChange updateColumnType =
SchemaChange.updateColumnType(
- new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(),
true);
+ new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(),
false);
manager.commitChanges(updateColumnType);
innerType =
@@ -708,8 +709,53 @@ public class SchemaManagerTest {
SchemaChange middleColumnNotExistUpdateColumnType =
SchemaChange.updateColumnType(
- new String[] {"v", "invalid", "f1"},
DataTypes.BIGINT(), true);
+ new String[] {"v", "invalid", "f1"},
DataTypes.BIGINT(), false);
assertThatCode(() ->
manager.commitChanges(middleColumnNotExistUpdateColumnType))
.hasMessageContaining("Column v.invalid does not exist");
}
+
+ @Test
+ public void testUpdateRowTypeInArrayAndMap() throws Exception {
+ RowType innerType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.INT()),
+ new DataField(3, "f2", DataTypes.BIGINT()));
+ RowType outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(
+ 1, "v", new ArrayType(new
MapType(DataTypes.INT(), innerType))));
+
+ Schema schema =
+ new Schema(
+ outerType.getFields(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+ manager.createTable(schema);
+
+ SchemaChange addColumn =
+ SchemaChange.addColumn(
+ new String[] {"v", "f3"},
+ DataTypes.STRING(),
+ null,
+ SchemaChange.Move.first("f3"));
+ SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v",
"f2"});
+ SchemaChange updateColumnType =
+ SchemaChange.updateColumnType(new String[] {"v", "f1"},
DataTypes.BIGINT(), false);
+ manager.commitChanges(addColumn, dropColumn, updateColumnType);
+
+ innerType =
+ RowType.of(
+ new DataField(4, "f3", DataTypes.STRING()),
+ new DataField(2, "f1", DataTypes.BIGINT()));
+ outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(
+ 1, "v", new ArrayType(new
MapType(DataTypes.INT(), innerType))));
+
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ae30fa569..09fc0328e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -673,14 +673,13 @@ public class FlinkCatalog extends AbstractCatalog {
org.apache.paimon.types.DataType oldType,
org.apache.paimon.types.DataType newType,
List<SchemaChange> schemaChanges) {
+ String joinedNames = String.join(".", fieldNames);
if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
Preconditions.checkArgument(
newType.getTypeRoot() == DataTypeRoot.ROW,
- "Column "
- + String.join(".", fieldNames)
- + " can only be updated to row type, and cannot be
updated to "
- + newType
- + " type");
+ "Column %s can only be updated to row type, and cannot be
updated to %s type",
+ joinedNames,
+ newType.getTypeRoot());
org.apache.paimon.types.RowType oldRowType =
(org.apache.paimon.types.RowType) oldType;
org.apache.paimon.types.RowType newRowType =
(org.apache.paimon.types.RowType) newType;
@@ -699,7 +698,7 @@ public class FlinkCatalog extends AbstractCatalog {
lastIdx < idx,
"Order of existing fields in column %s must be
kept the same. "
+ "However, field %s and %s have changed
their orders.",
- String.join(".", fieldNames),
+ joinedNames,
lastFieldName,
name);
lastIdx = idx;
@@ -751,6 +750,36 @@ public class FlinkCatalog extends AbstractCatalog {
fullFieldNames, oldField.type(), field.type(),
schemaChanges);
}
}
+ } else if (oldType.getTypeRoot() == DataTypeRoot.ARRAY) {
+ Preconditions.checkArgument(
+ newType.getTypeRoot() == DataTypeRoot.ARRAY,
+ "Column %s can only be updated to array type, and cannot
be updated to %s type",
+ joinedNames,
+ newType);
+ generateNestedColumnUpdates(
+ fieldNames,
+ ((org.apache.paimon.types.ArrayType)
oldType).getElementType(),
+ ((org.apache.paimon.types.ArrayType)
newType).getElementType(),
+ schemaChanges);
+ } else if (oldType.getTypeRoot() == DataTypeRoot.MAP) {
+ Preconditions.checkArgument(
+ newType.getTypeRoot() == DataTypeRoot.MAP,
+ "Column %s can only be updated to map type, and cannot be
updated to %s type",
+ joinedNames,
+ newType);
+ org.apache.paimon.types.MapType oldMapType =
(org.apache.paimon.types.MapType) oldType;
+ org.apache.paimon.types.MapType newMapType =
(org.apache.paimon.types.MapType) newType;
+ Preconditions.checkArgument(
+ oldMapType.getKeyType().equals(newMapType.getKeyType()),
+ "Cannot update key type of column %s from %s type to %s
type",
+ joinedNames,
+ oldMapType.getKeyType(),
+ newMapType.getKeyType());
+ generateNestedColumnUpdates(
+ fieldNames,
+ oldMapType.getValueType(),
+ newMapType.getValueType(),
+ schemaChanges);
} else {
if (!oldType.equalsIgnoreNullable(newType)) {
schemaChanges.add(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index a2ef1d5c8..a8e833215 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -1133,6 +1134,69 @@ public class SchemaChangeITCase extends
CatalogITCaseBase {
assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2
INT, f3 STRING))"))
.hasRootCauseMessage(
- "Column v.f2 can only be updated to row type, and
cannot be updated to INT type");
+ "Column v.f2 can only be updated to row type, and
cannot be updated to INTEGER type");
+ }
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateRowInArrayAndMap(String formatType) {
+ sql(
+ "CREATE TABLE T "
+ + "( k INT, v1 ARRAY<ROW(f1 INT, f2 STRING)>, v2
MAP<INT, ROW(f1 STRING, f2 INT)>, PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( 'bucket' = '1', 'file.format' = '"
+ + formatType
+ + "' )");
+ sql(
+ "INSERT INTO T VALUES "
+ + "(1, ARRAY[ROW(100, 'apple'), ROW(101, 'banana')],
MAP[100, ROW('cat', 1000), 101, ROW('dog', 1001)]), "
+ + "(2, ARRAY[ROW(200, 'pear'), ROW(201, 'grape')],
MAP[200, ROW('tiger', 2000), 201, ROW('wolf', 2001)])");
+
+ Map<Integer, Row> map1 = new HashMap<>();
+ map1.put(100, Row.of("cat", 1000));
+ map1.put(101, Row.of("dog", 1001));
+ Map<Integer, Row> map2 = new HashMap<>();
+ map2.put(200, Row.of("tiger", 2000));
+ map2.put(201, Row.of("wolf", 2001));
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, new Row[] {Row.of(100, "apple"), Row.of(101,
"banana")}, map1),
+ Row.of(2, new Row[] {Row.of(200, "pear"), Row.of(201,
"grape")}, map2));
+
+ sql(
+ "ALTER TABLE T MODIFY (v1 ARRAY<ROW(f1 BIGINT, f2 STRING, f3
STRING)>, v2 MAP<INT, ROW(f3 DOUBLE, f2 INT)>)");
+ sql(
+ "INSERT INTO T VALUES "
+ + "(1, ARRAY[ROW(1000000000000, 'apple', 'A'),
ROW(1000000000001, 'banana', 'B')], MAP[100, ROW(1000.0, 1000), 101,
ROW(1001.0, 1001)]), "
+ + "(3, ARRAY[ROW(3000000000000, 'mango', 'M'),
ROW(3000000000001, 'cherry', 'C')], MAP[300, ROW(3000.0, 3000), 301,
ROW(3001.0, 3001)])");
+
+ map1.clear();
+ map1.put(100, Row.of(1000.0, 1000));
+ map1.put(101, Row.of(1001.0, 1001));
+ map2.clear();
+ map2.put(200, Row.of(null, 2000));
+ map2.put(201, Row.of(null, 2001));
+ Map<Integer, Row> map3 = new HashMap<>();
+ map3.put(300, Row.of(3000.0, 3000));
+ map3.put(301, Row.of(3001.0, 3001));
+ assertThat(sql("SELECT v2, v1, k FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ map1,
+ new Row[] {
+ Row.of(1000000000000L, "apple", "A"),
+ Row.of(1000000000001L, "banana", "B")
+ },
+ 1),
+ Row.of(
+ map2,
+ new Row[] {Row.of(200L, "pear", null),
Row.of(201L, "grape", null)},
+ 2),
+ Row.of(
+ map3,
+ new Row[] {
+ Row.of(3000000000000L, "mango", "M"),
+ Row.of(3000000000001L, "cherry", "C")
+ },
+ 3));
}
}