AHeise commented on code in PR #28194:
URL: https://github.com/apache/flink/pull/28194#discussion_r3269427382
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -86,8 +112,105 @@ protected MaterializedTableChangeHandler
getHandlerWithChanges() {
return handler;
}
+ @VisibleForTesting
+ public void validateChanges() {
+ final List<TableChange> changes = getTableChanges();
+ final boolean isQueryChange =
+
changes.stream().anyMatch(ModifyDefinitionQuery.class::isInstance);
+ final List<Column> oldColumns =
oldTable.getResolvedSchema().getColumns();
+ final Map<String, Integer> columnIndex =
+ IntStream.range(0, oldColumns.size())
+ .boxed()
+ .collect(
+ Collectors.toMap(
+ i -> oldColumns.get(i).getName(),
Function.identity()));
+ final List<String> errors = new ArrayList<>();
+ for (final TableChange change : changes) {
+ if (change instanceof DropColumn) {
+ checkDroppedColumn((DropColumn) change, oldColumns,
columnIndex, errors);
+ } else if (isQueryChange && change instanceof
ModifyColumnPosition) {
+ checkPositionChange((ModifyColumnPosition) change, oldColumns,
columnIndex, errors);
+ } else if (isQueryChange && change instanceof
ModifyPhysicalColumnType) {
+ checkPhysicalTypeChange(
+ (ModifyPhysicalColumnType) change, oldColumns,
columnIndex, errors);
+ }
+ }
+ if (!errors.isEmpty()) {
+ throw new ValidationException(String.join("\n", errors));
+ }
+ }
+
+ private void checkDroppedColumn(
+ DropColumn change,
+ List<Column> oldColumns,
+ Map<String, Integer> columnIndex,
+ List<String> errors) {
+ final Integer idx = columnIndex.get(change.getColumnName());
Review Comment:
Fixed.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -86,8 +112,105 @@ protected MaterializedTableChangeHandler
getHandlerWithChanges() {
return handler;
}
+ @VisibleForTesting
+ public void validateChanges() {
+ final List<TableChange> changes = getTableChanges();
+ final boolean isQueryChange =
+
changes.stream().anyMatch(ModifyDefinitionQuery.class::isInstance);
+ final List<Column> oldColumns =
oldTable.getResolvedSchema().getColumns();
+ final Map<String, Integer> columnIndex =
+ IntStream.range(0, oldColumns.size())
+ .boxed()
+ .collect(
+ Collectors.toMap(
+ i -> oldColumns.get(i).getName(),
Function.identity()));
+ final List<String> errors = new ArrayList<>();
+ for (final TableChange change : changes) {
+ if (change instanceof DropColumn) {
+ checkDroppedColumn((DropColumn) change, oldColumns,
columnIndex, errors);
+ } else if (isQueryChange && change instanceof
ModifyColumnPosition) {
+ checkPositionChange((ModifyColumnPosition) change, oldColumns,
columnIndex, errors);
+ } else if (isQueryChange && change instanceof
ModifyPhysicalColumnType) {
+ checkPhysicalTypeChange(
+ (ModifyPhysicalColumnType) change, oldColumns,
columnIndex, errors);
+ }
+ }
+ if (!errors.isEmpty()) {
+ throw new ValidationException(String.join("\n", errors));
+ }
+ }
+
+ private void checkDroppedColumn(
+ DropColumn change,
+ List<Column> oldColumns,
+ Map<String, Integer> columnIndex,
+ List<String> errors) {
+ final Integer idx = columnIndex.get(change.getColumnName());
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org