This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit d6874a8b8c363a71d44182db626c62322ad02e41
Author: thesumery <[email protected]>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800

    [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)
---
 .../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         Transaction transaction = table.newTransaction();
         if (table.schema().sameSchema(oldSchema)) {
             List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-            if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-                SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
-                LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+            if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+                // If can not handle this schema update, should not push data into next operator
+                return;
             }
+            SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
         }
         transaction.commitTransaction();
         handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
         boolean canHandle = true;
         for (TableChange tableChange : tableChanges) {
-            if (tableChange instanceof AddColumn) {
-                canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy());
-            } else {
-                if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy())) {
-                    LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-                            tableId, tableChange);
-                }
+            canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+                    multipleSinkOption.getSchemaUpdatePolicy());
+            if (!(tableChange instanceof AddColumn)) {
                 // todo:currently iceberg can only handle addColumn, so always return false
+                LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+                        tableId, tableChange);
                 canHandle = false;
             }
+            if (!canHandle) {
+                blacklist.add(tableId);
+                break;
+            }
         }
-        if (!canHandle) {
-            blacklist.add(tableId);
-        }
+
         return canHandle;
     }
 }
