This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 84244672399e21585217589e205d881e05562cf8
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Dec 23 18:44:13 2024 +0800

    [FLINK-36690][cdc-runtime] Fix schema operator hanging under extreme parallelized pressure

    This closes #3680
---
 .../runtime/operators/schema/SchemaOperator.java | 725 ---------------------
 .../schema/regular/SchemaCoordinator.java | 499 ++++++++++++++
 .../SchemaCoordinatorProvider.java} | 53 +-
 .../operators/schema/regular/SchemaOperator.java | 296 +++++++++
 .../{ => regular}/SchemaOperatorFactory.java | 18 +-
 .../{ => regular}/event/SchemaChangeRequest.java | 23 +-
 .../{ => regular}/event/SchemaChangeResponse.java | 72 +-
 .../schema/{ => regular}/SchemaEvolveTest.java | 214 +++---
 .../schema/{ => regular}/SchemaOperatorTest.java | 60 +-
 9 files changed, 1063 insertions(+), 897 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
deleted file mode 100644
index 6ada032d5..000000000
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.flink.cdc.runtime.operators.schema; - -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.cdc.common.annotation.Internal; -import org.apache.flink.cdc.common.annotation.VisibleForTesting; -import org.apache.flink.cdc.common.data.DecimalData; -import org.apache.flink.cdc.common.data.LocalZonedTimestampData; -import org.apache.flink.cdc.common.data.RecordData; -import org.apache.flink.cdc.common.data.StringData; -import org.apache.flink.cdc.common.data.TimestampData; -import org.apache.flink.cdc.common.data.ZonedTimestampData; -import org.apache.flink.cdc.common.event.DataChangeEvent; -import org.apache.flink.cdc.common.event.DropTableEvent; -import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.FlushEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEventType; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; -import org.apache.flink.cdc.common.route.RouteRule; -import org.apache.flink.cdc.common.schema.Column; -import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.common.schema.Selectors; -import org.apache.flink.cdc.common.types.DataType; -import org.apache.flink.cdc.common.types.DataTypeFamily; -import org.apache.flink.cdc.common.types.DataTypeRoot; -import org.apache.flink.cdc.common.types.DecimalType; -import org.apache.flink.cdc.common.utils.ChangeEventUtils; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; -import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse; -import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; -import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse; -import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics; -import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; -import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; -import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; -import org.apache.flink.runtime.operators.coordination.CoordinationResponse; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.util.SerializedValue; - -import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; -import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader; -import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.Serializable; -import 
java.math.BigDecimal; -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; - -/** - * The operator will evolve schemas in {@link SchemaRegistry} for incoming {@link - * SchemaChangeEvent}s and block the stream for tables before their schema changes finish. - */ -@Internal -public class SchemaOperator extends AbstractStreamOperator<Event> - implements OneInputStreamOperator<Event, Event>, Serializable { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class); - private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1); - - private final List<RouteRule> routingRules; - - private final String timezone; - - /** - * Storing route source table selector, sink table name (before symbol replacement), and replace - * symbol in a tuple. - */ - private transient List<Tuple3<Selectors, String, String>> routes; - - private transient TaskOperatorEventGateway toCoordinator; - private transient SchemaEvolutionClient schemaEvolutionClient; - private transient LoadingCache<TableId, Schema> originalSchema; - private transient LoadingCache<TableId, Schema> evolvedSchema; - private transient LoadingCache<TableId, Boolean> schemaDivergesMap; - - /** - * Storing mapping relations between upstream tableId (source table) mapping to downstream - * tableIds (sink tables). - */ - private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache; - - private final long rpcTimeOutInMillis; - private final SchemaChangeBehavior schemaChangeBehavior; - - private transient SchemaOperatorMetrics schemaOperatorMetrics; - private transient int subTaskId; - - @VisibleForTesting - public SchemaOperator(List<RouteRule> routingRules) { - this.routingRules = routingRules; - this.chainingStrategy = ChainingStrategy.ALWAYS; - this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis(); - this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE; - this.timezone = "UTC"; - } - - @VisibleForTesting - public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) { - this.routingRules = routingRules; - this.chainingStrategy = ChainingStrategy.ALWAYS; - this.rpcTimeOutInMillis = rpcTimeOut.toMillis(); - this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE; - this.timezone = "UTC"; - } - - @VisibleForTesting - public SchemaOperator( - List<RouteRule> routingRules, - Duration rpcTimeOut, - SchemaChangeBehavior schemaChangeBehavior) { - this.routingRules = routingRules; - this.chainingStrategy = ChainingStrategy.ALWAYS; - this.rpcTimeOutInMillis = rpcTimeOut.toMillis(); - this.schemaChangeBehavior = schemaChangeBehavior; - this.timezone = "UTC"; - } - - public SchemaOperator( - List<RouteRule> routingRules, - Duration rpcTimeOut, - SchemaChangeBehavior schemaChangeBehavior, - String timezone) { - this.routingRules = routingRules; - this.chainingStrategy = ChainingStrategy.ALWAYS; - this.rpcTimeOutInMillis = rpcTimeOut.toMillis(); - this.schemaChangeBehavior = schemaChangeBehavior; - this.timezone = timezone; - } - - @Override - public void open() throws Exception { - super.open(); - schemaOperatorMetrics = - new 
SchemaOperatorMetrics( - getRuntimeContext().getMetricGroup(), schemaChangeBehavior); - subTaskId = getRuntimeContext().getIndexOfThisSubtask(); - } - - @Override - public void setup( - StreamTask<?, ?> containingTask, - StreamConfig config, - Output<StreamRecord<Event>> output) { - super.setup(containingTask, config, output); - this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway(); - routes = - routingRules.stream() - .map( - rule -> { - String tableInclusions = rule.sourceTable; - Selectors selectors = - new Selectors.SelectorsBuilder() - .includeTables(tableInclusions) - .build(); - return new Tuple3<>( - selectors, rule.sinkTable, rule.replaceSymbol); - }) - .collect(Collectors.toList()); - schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, getOperatorID()); - evolvedSchema = - CacheBuilder.newBuilder() - .expireAfterAccess(CACHE_EXPIRE_DURATION) - .build( - new CacheLoader<TableId, Schema>() { - @Override - public Schema load(TableId tableId) { - return getLatestEvolvedSchema(tableId); - } - }); - originalSchema = - CacheBuilder.newBuilder() - .expireAfterAccess(CACHE_EXPIRE_DURATION) - .build( - new CacheLoader<TableId, Schema>() { - @Override - public Schema load(TableId tableId) throws Exception { - return getLatestOriginalSchema(tableId); - } - }); - schemaDivergesMap = - CacheBuilder.newBuilder() - .expireAfterAccess(CACHE_EXPIRE_DURATION) - .build( - new CacheLoader<TableId, Boolean>() { - @Override - public Boolean load(TableId tableId) throws Exception { - return checkSchemaDiverges(tableId); - } - }); - tableIdMappingCache = - CacheBuilder.newBuilder() - .expireAfterAccess(CACHE_EXPIRE_DURATION) - .build( - new CacheLoader<TableId, List<TableId>>() { - @Override - public List<TableId> load(TableId tableId) { - return getRoutedTables(tableId); - } - }); - } - - /** - * This method is guaranteed to not be called concurrently with other methods of the operator. - */ - @Override - public void processElement(StreamRecord<Event> streamRecord) - throws InterruptedException, TimeoutException, ExecutionException { - Event event = streamRecord.getValue(); - if (event instanceof SchemaChangeEvent) { - processSchemaChangeEvents((SchemaChangeEvent) event); - } else if (event instanceof DataChangeEvent) { - processDataChangeEvents(streamRecord, (DataChangeEvent) event); - } else { - throw new RuntimeException("Unknown event type in Stream record: " + event); - } - } - - private void processSchemaChangeEvents(SchemaChangeEvent event) - throws InterruptedException, TimeoutException, ExecutionException { - TableId tableId = event.tableId(); - LOG.info( - "{}> Table {} received SchemaChangeEvent {} and start to be blocked.", - subTaskId, - tableId, - event); - handleSchemaChangeEvent(tableId, event); - - if (event instanceof DropTableEvent) { - // Update caches unless event is a Drop table event. In that case, no schema will be - // available / necessary. 
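The setup() method of the deleted operator above wires four Guava LoadingCaches (original schema, evolved schema, a schema-divergence flag, and the table-ID routing map) that expire entries a day after their last access and fall back to a coordinator lookup on a miss. Below is a minimal, self-contained sketch of that caching pattern, assuming a hypothetical SchemaLookup interface and a string-typed schema as stand-ins for the coordinator RPC and the CDC Schema class, and using plain Guava instead of Flink's shaded copy.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.concurrent.TimeUnit;

/** Sketch: lazily loaded, access-expiring schema cache (stand-in types, not the CDC API). */
public class SchemaCacheSketch {

    /** Hypothetical lookup that would normally go through the coordinator RPC. */
    interface SchemaLookup {
        String loadSchema(String tableId) throws Exception;
    }

    static LoadingCache<String, String> buildCache(SchemaLookup lookup) {
        return CacheBuilder.newBuilder()
                // Entries untouched for a day are dropped and re-fetched on the next access.
                .expireAfterAccess(1, TimeUnit.DAYS)
                .build(
                        new CacheLoader<String, String>() {
                            @Override
                            public String load(String tableId) throws Exception {
                                return lookup.loadSchema(tableId);
                            }
                        });
    }

    public static void main(String[] args) throws Exception {
        LoadingCache<String, String> cache =
                buildCache(tableId -> "schema-of-" + tableId); // pretend this is the RPC result
        System.out.println(cache.get("db.orders")); // first access triggers the loader
        System.out.println(cache.get("db.orders")); // second access is served from the cache
    }
}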
- return; - } - - originalSchema.put(tableId, getLatestOriginalSchema(tableId)); - schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId)); - - List<TableId> optionalRoutedTable = getRoutedTables(tableId); - if (!optionalRoutedTable.isEmpty()) { - tableIdMappingCache - .get(tableId) - .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed))); - } else { - evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId)); - } - } - - private void processDataChangeEvents(StreamRecord<Event> streamRecord, DataChangeEvent event) { - TableId tableId = event.tableId(); - List<TableId> optionalRoutedTable = getRoutedTables(tableId); - if (!optionalRoutedTable.isEmpty()) { - optionalRoutedTable.forEach( - evolvedTableId -> { - output.collect( - new StreamRecord<>( - normalizeSchemaChangeEvents(event, evolvedTableId, false))); - }); - } else if (Boolean.FALSE.equals(schemaDivergesMap.getIfPresent(tableId))) { - output.collect(new StreamRecord<>(normalizeSchemaChangeEvents(event, true))); - } else { - output.collect(streamRecord); - } - } - - private DataChangeEvent normalizeSchemaChangeEvents( - DataChangeEvent event, boolean tolerantMode) { - return normalizeSchemaChangeEvents(event, event.tableId(), tolerantMode); - } - - private DataChangeEvent normalizeSchemaChangeEvents( - DataChangeEvent event, TableId renamedTableId, boolean tolerantMode) { - try { - Schema originalSchema = this.originalSchema.get(event.tableId()); - Schema evolvedTableSchema = evolvedSchema.get(renamedTableId); - if (originalSchema.equals(evolvedTableSchema)) { - return ChangeEventUtils.recreateDataChangeEvent(event, renamedTableId); - } - switch (event.op()) { - case INSERT: - return DataChangeEvent.insertEvent( - renamedTableId, - regenerateRecordData( - event.after(), - originalSchema, - evolvedTableSchema, - tolerantMode), - event.meta()); - case UPDATE: - return DataChangeEvent.updateEvent( - renamedTableId, - regenerateRecordData( - event.before(), - originalSchema, - evolvedTableSchema, - tolerantMode), - regenerateRecordData( - event.after(), - originalSchema, - evolvedTableSchema, - tolerantMode), - event.meta()); - case DELETE: - return DataChangeEvent.deleteEvent( - renamedTableId, - regenerateRecordData( - event.before(), - originalSchema, - evolvedTableSchema, - tolerantMode), - event.meta()); - case REPLACE: - return DataChangeEvent.replaceEvent( - renamedTableId, - regenerateRecordData( - event.after(), - originalSchema, - evolvedTableSchema, - tolerantMode), - event.meta()); - default: - throw new IllegalArgumentException( - String.format("Unrecognized operation type \"%s\"", event.op())); - } - } catch (Exception e) { - throw new IllegalStateException("Unable to fill null for empty columns", e); - } - } - - private RecordData regenerateRecordData( - RecordData recordData, - Schema originalSchema, - Schema routedTableSchema, - boolean tolerantMode) { - // Regenerate record data - List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(); - for (Column column : routedTableSchema.getColumns()) { - String columnName = column.getName(); - int columnIndex = originalSchema.getColumnNames().indexOf(columnName); - if (columnIndex == -1) { - fieldGetters.add(new NullFieldGetter()); - } else { - RecordData.FieldGetter fieldGetter = - RecordData.createFieldGetter( - originalSchema.getColumn(columnName).get().getType(), columnIndex); - // Check type compatibility, ignoring nullability - if (originalSchema - .getColumn(columnName) - .get() - .getType() - .nullable() - 
.equals(column.getType().nullable())) { - fieldGetters.add(fieldGetter); - } else { - fieldGetters.add( - new TypeCoercionFieldGetter( - originalSchema.getColumn(columnName).get().getType(), - column.getType(), - fieldGetter, - tolerantMode, - timezone)); - } - } - } - BinaryRecordDataGenerator recordDataGenerator = - new BinaryRecordDataGenerator( - routedTableSchema.getColumnDataTypes().toArray(new DataType[0])); - return recordDataGenerator.generate( - fieldGetters.stream() - .map(fieldGetter -> fieldGetter.getFieldOrNull(recordData)) - .toArray()); - } - - private List<TableId> getRoutedTables(TableId originalTableId) { - return routes.stream() - .filter(route -> route.f0.isMatch(originalTableId)) - .map(route -> resolveReplacement(originalTableId, route)) - .collect(Collectors.toList()); - } - - private TableId resolveReplacement( - TableId originalTable, Tuple3<Selectors, String, String> route) { - if (route.f2 != null) { - return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName())); - } - return TableId.parse(route.f1); - } - - private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) - throws InterruptedException, TimeoutException { - - if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION - && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) { - // CreateTableEvent should be applied even in EXCEPTION mode - throw new RuntimeException( - String.format( - "Refused to apply schema change event %s in EXCEPTION mode.", - schemaChangeEvent)); - } - - // The request will block if another schema change event is being handled - SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); - if (response.isAccepted()) { - LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId); - output.collect(new StreamRecord<>(new FlushEvent(tableId))); - List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents(); - schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size()); - - // The request will block until flushing finished in each sink writer - SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(); - List<SchemaChangeEvent> finishedSchemaChangeEvents = - schemaEvolveResponse.getFinishedSchemaChangeEvents(); - - // Update evolved schema changes based on apply results - finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); - } else if (response.isDuplicate()) { - LOG.info( - "{}> Schema change event {} has been handled in another subTask already.", - subTaskId, - schemaChangeEvent); - } else if (response.isIgnored()) { - LOG.info( - "{}> Schema change event {} has been ignored. 
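The regenerateRecordData method above rebuilds every record against the routed (evolved) schema: columns the upstream table still has are copied through field getters, columns it lacks are filled with null, and mismatched types go through the coercing getter. The following sketch shows only the pad-and-reorder part under deliberately simplified types (Object[] rows and string column names); it is an illustration of the idea, not the BinaryRecordDataGenerator code path.

import java.util.Arrays;
import java.util.List;

/** Sketch: pad/reorder a row to a target column list, nulling columns the source lacks. */
public class RecordRegenerationSketch {

    static Object[] regenerate(Object[] row, List<String> sourceColumns, List<String> targetColumns) {
        Object[] out = new Object[targetColumns.size()];
        for (int i = 0; i < targetColumns.size(); i++) {
            int sourceIndex = sourceColumns.indexOf(targetColumns.get(i));
            // Column unknown upstream -> fill with null; otherwise copy the value through.
            out[i] = sourceIndex == -1 ? null : row[sourceIndex];
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("id", "name");
        List<String> target = Arrays.asList("id", "name", "address"); // evolved sink schema
        System.out.println(Arrays.toString(regenerate(new Object[] {1, "Alice"}, source, target)));
        // -> [1, Alice, null]
    }
}

Type coercion for columns whose types changed is a separate concern, sketched after the coercion getter below.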
No schema evolution needed.", - subTaskId, - schemaChangeEvent); - } else { - throw new IllegalStateException("Unexpected response status " + response); - } - } - - private SchemaChangeResponse requestSchemaChange( - TableId tableId, SchemaChangeEvent schemaChangeEvent) - throws InterruptedException, TimeoutException { - long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; - while (true) { - SchemaChangeResponse response = - sendRequestToCoordinator( - new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); - if (response.isRegistryBusy()) { - if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) { - LOG.info( - "{}> Schema Registry is busy now, waiting for next request...", - subTaskId); - Thread.sleep(1000); - } else { - throw new TimeoutException("TimeOut when requesting schema change"); - } - } else { - return response; - } - } - } - - private SchemaChangeResultResponse requestSchemaChangeResult() - throws InterruptedException, TimeoutException { - CoordinationResponse coordinationResponse = - sendRequestToCoordinator(new SchemaChangeResultRequest()); - long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; - while (coordinationResponse instanceof SchemaChangeProcessingResponse) { - if (System.currentTimeMillis() < nextRpcTimeOutMillis) { - Thread.sleep(1000); - coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest()); - } else { - throw new TimeoutException("TimeOut when requesting release upstream"); - } - } - return ((SchemaChangeResultResponse) coordinationResponse); - } - - private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> - RESPONSE sendRequestToCoordinator(REQUEST request) { - try { - CompletableFuture<CoordinationResponse> responseFuture = - toCoordinator.sendRequestToCoordinator( - getOperatorID(), new SerializedValue<>(request)); - return CoordinationResponseUtils.unwrap(responseFuture.get()); - } catch (Exception e) { - throw new IllegalStateException( - "Failed to send request to coordinator: " + request.toString(), e); - } - } - - private Schema getLatestEvolvedSchema(TableId tableId) { - try { - Optional<Schema> optionalSchema = schemaEvolutionClient.getLatestEvolvedSchema(tableId); - if (!optionalSchema.isPresent()) { - throw new IllegalStateException( - String.format("Schema doesn't exist for table \"%s\"", tableId)); - } - return optionalSchema.get(); - } catch (Exception e) { - throw new IllegalStateException( - String.format("Unable to get latest schema for table \"%s\"", tableId), e); - } - } - - private Schema getLatestOriginalSchema(TableId tableId) { - try { - Optional<Schema> optionalSchema = - schemaEvolutionClient.getLatestOriginalSchema(tableId); - if (!optionalSchema.isPresent()) { - throw new IllegalStateException( - String.format("Schema doesn't exist for table \"%s\"", tableId)); - } - return optionalSchema.get(); - } catch (Exception e) { - throw new IllegalStateException( - String.format("Unable to get latest schema for table \"%s\"", tableId), e); - } - } - - private Boolean checkSchemaDiverges(TableId tableId) { - try { - return getLatestEvolvedSchema(tableId).equals(getLatestOriginalSchema(tableId)); - } catch (IllegalStateException e) { - // schema fetch failed, regard it as diverged - return true; - } - } - - private static class NullFieldGetter implements RecordData.FieldGetter { - @Nullable - @Override - public Object getFieldOrNull(RecordData recordData) { - return null; - } - } - - private static class 
TypeCoercionFieldGetter implements RecordData.FieldGetter { - private final DataType originalType; - private final DataType destinationType; - private final RecordData.FieldGetter originalFieldGetter; - private final boolean tolerantMode; - private final String timezone; - - public TypeCoercionFieldGetter( - DataType originalType, - DataType destinationType, - RecordData.FieldGetter originalFieldGetter, - boolean tolerantMode, - String timezone) { - this.originalType = originalType; - this.destinationType = destinationType; - this.originalFieldGetter = originalFieldGetter; - this.tolerantMode = tolerantMode; - this.timezone = timezone; - } - - private Object fail(IllegalArgumentException e) throws IllegalArgumentException { - if (tolerantMode) { - return null; - } - throw e; - } - - @Nullable - @Override - public Object getFieldOrNull(RecordData recordData) { - Object originalField = originalFieldGetter.getFieldOrNull(recordData); - if (originalField == null) { - return null; - } - if (destinationType.is(DataTypeRoot.BIGINT)) { - if (originalField instanceof Byte) { - // TINYINT - return ((Byte) originalField).longValue(); - } else if (originalField instanceof Short) { - // SMALLINT - return ((Short) originalField).longValue(); - } else if (originalField instanceof Integer) { - // INT - return ((Integer) originalField).longValue(); - } else if (originalField instanceof Long) { - // BIGINT - return originalField; - } else { - return fail( - new IllegalArgumentException( - String.format( - "Cannot fit type \"%s\" into a BIGINT column. " - + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column", - originalField.getClass()))); - } - } else if (destinationType instanceof DecimalType) { - DecimalType decimalType = (DecimalType) destinationType; - BigDecimal decimalValue; - if (originalField instanceof Byte) { - decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0); - } else if (originalField instanceof Short) { - decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0); - } else if (originalField instanceof Integer) { - decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0); - } else if (originalField instanceof Long) { - decimalValue = BigDecimal.valueOf((Long) originalField, 0); - } else if (originalField instanceof DecimalData) { - decimalValue = ((DecimalData) originalField).toBigDecimal(); - } else { - return fail( - new IllegalArgumentException( - String.format( - "Cannot fit type \"%s\" into a DECIMAL column. " - + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column", - originalField.getClass()))); - } - return decimalValue != null - ? DecimalData.fromBigDecimal( - decimalValue, decimalType.getPrecision(), decimalType.getScale()) - : null; - } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) { - if (originalField instanceof Float) { - // FLOAT - return ((Float) originalField).doubleValue(); - } else { - return fail( - new IllegalArgumentException( - String.format( - "Cannot fit type \"%s\" into a DOUBLE column. " - + "Currently only FLOAT can be accepted by a DOUBLE column", - originalField.getClass()))); - } - } else if (destinationType.is(DataTypeRoot.VARCHAR)) { - if (originalField instanceof StringData) { - return originalField; - } else { - return fail( - new IllegalArgumentException( - String.format( - "Cannot fit type \"%s\" into a STRING column. 
" - + "Currently only CHAR / VARCHAR can be accepted by a STRING column", - originalField.getClass()))); - } - } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) - && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { - // For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData has no - // difference in its internal representation, so there's no need to do any precision - // conversion. - return originalField; - } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE) - && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) { - return originalField; - } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) - && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { - return originalField; - } else if (destinationType.is(DataTypeFamily.TIMESTAMP) - && originalType.is(DataTypeFamily.TIMESTAMP)) { - return castToTimestamp(originalField, timezone); - } else { - return fail( - new IllegalArgumentException( - String.format( - "Column type \"%s\" doesn't support type coercion", - destinationType))); - } - } - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - // Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement - // is guaranteed not to be mixed together. - } - - private static TimestampData castToTimestamp(Object object, String timezone) { - if (object == null) { - return null; - } - if (object instanceof LocalZonedTimestampData) { - return TimestampData.fromLocalDateTime( - LocalDateTime.ofInstant( - ((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone))); - } else if (object instanceof ZonedTimestampData) { - return TimestampData.fromLocalDateTime( - LocalDateTime.ofInstant( - ((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone))); - } else { - throw new IllegalArgumentException( - String.format( - "Unable to implicitly coerce object `%s` as a TIMESTAMP.", object)); - } - } -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java new file mode 100644 index 000000000..23edeccb4 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.schema.regular; + +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaMergingUtils; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry; +import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent; +import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest; +import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse; +import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.wrap; + +/** Coordinator node for {@link SchemaOperator}. Registry actor in regular topology. */ +public class SchemaCoordinator extends SchemaRegistry { + private static final Logger LOG = LoggerFactory.getLogger(SchemaCoordinator.class); + + /** Executor service to execute schema change. */ + private final ExecutorService schemaChangeThreadPool; + + /** + * Atomic flag indicating if current RequestHandler could accept more schema changes for now. + */ + private transient RequestStatus schemaChangeStatus; + + /** Sink writers which have sent flush success events for the request. */ + private transient ConcurrentHashMap<Integer, Set<Integer>> flushedSinkWriters; + + /** Currently handling request's completable future. 
*/ + private transient CompletableFuture<CoordinationResponse> pendingResponseFuture; + + // Static fields + public SchemaCoordinator( + String operatorName, + OperatorCoordinator.Context context, + ExecutorService coordinatorExecutor, + MetadataApplier metadataApplier, + List<RouteRule> routes, + SchemaChangeBehavior schemaChangeBehavior, + Duration rpcTimeout) { + super( + context, + operatorName, + coordinatorExecutor, + metadataApplier, + routes, + schemaChangeBehavior, + rpcTimeout); + this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); + } + + @Override + public void start() throws Exception { + super.start(); + this.flushedSinkWriters = new ConcurrentHashMap<>(); + this.schemaChangeStatus = RequestStatus.IDLE; + } + + @Override + public void close() throws Exception { + super.close(); + if (schemaChangeThreadPool != null && !schemaChangeThreadPool.isShutdown()) { + schemaChangeThreadPool.shutdownNow(); + } + } + + @Override + protected void snapshot(CompletableFuture<byte[]> resultFuture) throws Exception { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + // Serialize SchemaManager + int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion(); + out.writeInt(schemaManagerSerializerVersion); + byte[] serializedSchemaManager; + serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager); + out.writeInt(serializedSchemaManager.length); + out.write(serializedSchemaManager); + + // Length-bit for SchemaDerivation, which is no longer necessary. + out.writeInt(0); + resultFuture.complete(baos.toByteArray()); + } + } + + @Override + protected void restore(byte[] checkpointData) throws Exception { + try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData); + DataInputStream in = new DataInputStream(bais)) { + int schemaManagerSerializerVersion = in.readInt(); + + switch (schemaManagerSerializerVersion) { + case 0: + { + int length = in.readInt(); + byte[] serializedSchemaManager = new byte[length]; + in.readFully(serializedSchemaManager); + schemaManager = + SchemaManager.SERIALIZER.deserialize( + schemaManagerSerializerVersion, serializedSchemaManager); + break; + } + case 1: + case 2: + { + int length = in.readInt(); + byte[] serializedSchemaManager = new byte[length]; + in.readFully(serializedSchemaManager); + schemaManager = + SchemaManager.SERIALIZER.deserialize( + schemaManagerSerializerVersion, serializedSchemaManager); + consumeUnusedSchemaDerivationBytes(in); + break; + } + default: + throw new IOException( + "Unrecognized serialization version " + schemaManagerSerializerVersion); + } + } + } + + @Override + protected void handleCustomCoordinationRequest( + CoordinationRequest request, CompletableFuture<CoordinationResponse> responseFuture) { + if (request instanceof SchemaChangeRequest) { + handleSchemaChangeRequest((SchemaChangeRequest) request, responseFuture); + } else { + throw new UnsupportedOperationException( + "Unknown coordination request type: " + request); + } + } + + @Override + protected void handleFlushSuccessEvent(FlushSuccessEvent event) { + int sinkSubtask = event.getSinkSubTaskId(); + int sourceSubtask = event.getSourceSubTaskId(); + LOG.info( + "Sink subtask {} succeed flushing from source subTask {}.", + sinkSubtask, + sourceSubtask); + if (!flushedSinkWriters.containsKey(sourceSubtask)) { + flushedSinkWriters.put(sourceSubtask, ConcurrentHashMap.newKeySet()); + } + flushedSinkWriters.get(sourceSubtask).add(sinkSubtask); + 
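The snapshot() and restore() methods above frame the coordinator checkpoint as a serializer version, a length-prefixed SchemaManager payload, and (for older serializer versions) a derivation section that is read back and discarded. A compact sketch of that versioned, length-prefixed framing with plain java.io streams follows; the payload here is an opaque byte array rather than a real SchemaManager, and the version constant is made up for the illustration.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Sketch: version + length-prefixed checkpoint framing, mirroring snapshot()/restore(). */
public class VersionedSnapshotSketch {

    static final int CURRENT_VERSION = 2;

    static byte[] snapshot(byte[] serializedSchemaManager) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(CURRENT_VERSION);                // serializer version first
            out.writeInt(serializedSchemaManager.length); // then a length prefix
            out.write(serializedSchemaManager);           // then the payload itself
            out.writeInt(0);                              // legacy derivation section kept empty
            out.flush();
            return baos.toByteArray();
        }
    }

    static byte[] restore(byte[] checkpoint) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(checkpoint))) {
            int version = in.readInt();
            switch (version) {
                case 0:
                case 1:
                case 2:
                    byte[] payload = new byte[in.readInt()];
                    in.readFully(payload);
                    // Older versions also carry derivation state after this; it is read and dropped.
                    return payload;
                default:
                    throw new IOException("Unrecognized serialization version " + version);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] state = "schema-manager-state".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(restore(snapshot(state)), StandardCharsets.UTF_8));
    }
}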
LOG.info( + "Currently flushed sink writers for source task {} are: {}", + sourceSubtask, + flushedSinkWriters.get(sourceSubtask)); + } + + @Override + protected void handleUnrecoverableError(String taskDescription, Throwable t) { + super.handleUnrecoverableError(taskDescription, t); + + // There's a pending future, release it exceptionally before quitting + if (pendingResponseFuture != null) { + pendingResponseFuture.completeExceptionally(t); + } + } + + /** + * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing. + * + * @param request the received SchemaChangeRequest + */ + public void handleSchemaChangeRequest( + SchemaChangeRequest request, CompletableFuture<CoordinationResponse> responseFuture) { + + // We use subTaskId to identify each schema change request + int subTaskId = request.getSubTaskId(); + + if (schemaChangeStatus == RequestStatus.IDLE) { + if (activeSinkWriters.size() < currentParallelism) { + LOG.info( + "Not all active sink writers have been registered. Current {}, expected {}.", + activeSinkWriters.size(), + currentParallelism); + responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush())); + return; + } + + if (!activeSinkWriters.equals(flushedSinkWriters.get(subTaskId))) { + LOG.info( + "Not all active sink writers have completed flush. Flushed writers: {}, expected: {}.", + flushedSinkWriters.get(subTaskId), + activeSinkWriters); + responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush())); + return; + } + + LOG.info( + "All sink writers have flushed for subTaskId {}. Switching to APPLYING state and starting schema evolution...", + subTaskId); + flushedSinkWriters.remove(subTaskId); + schemaChangeStatus = RequestStatus.APPLYING; + pendingResponseFuture = responseFuture; + startSchemaChangesEvolve(request, responseFuture); + } else { + responseFuture.complete(wrap(SchemaChangeResponse.busy())); + } + } + + private void startSchemaChangesEvolve( + SchemaChangeRequest request, CompletableFuture<CoordinationResponse> responseFuture) { + SchemaChangeEvent originalEvent = request.getSchemaChangeEvent(); + TableId originalTableId = originalEvent.tableId(); + Schema currentUpstreamSchema = + schemaManager.getLatestOriginalSchema(originalTableId).orElse(null); + + List<SchemaChangeEvent> deducedSchemaChangeEvents = new ArrayList<>(); + + // For redundant schema change events (possibly coming from duplicate emitted + // CreateTableEvents in snapshot stage), we just skip them. + if (!SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) { + schemaManager.applyOriginalSchemaChange(originalEvent); + deducedSchemaChangeEvents.addAll(deduceEvolvedSchemaChanges(originalEvent)); + } else { + LOG.info( + "Schema change event {} is redundant for current schema {}, just skip it.", + originalEvent, + currentUpstreamSchema); + } + + LOG.info( + "All sink subtask have flushed for table {}. 
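The handleSchemaChangeRequest method above only leaves the IDLE state once every registered sink writer has acknowledged a flush for the requesting source subtask; until then the caller is told to keep waiting, and requests arriving while another change is APPLYING get a busy answer. The sketch below compresses that gate into one small class; the Reply values and the containsAll check are simplified stand-ins for the coordinator's actual bookkeeping.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: single-request gate that waits for all sink writers to flush before applying. */
public class SchemaChangeGateSketch {

    enum Status { IDLE, APPLYING }
    enum Reply { BUSY, WAITING_FOR_FLUSH, ACCEPTED }

    private final Set<Integer> activeSinkWriters = ConcurrentHashMap.newKeySet();
    private final ConcurrentHashMap<Integer, Set<Integer>> flushedSinkWriters = new ConcurrentHashMap<>();
    private volatile Status status = Status.IDLE;

    void registerSinkWriter(int sinkSubtask) {
        activeSinkWriters.add(sinkSubtask);
    }

    /** Called when a sink subtask reports a successful flush triggered by a source subtask. */
    void onFlushSuccess(int sourceSubtask, int sinkSubtask) {
        flushedSinkWriters
                .computeIfAbsent(sourceSubtask, k -> ConcurrentHashMap.newKeySet())
                .add(sinkSubtask);
    }

    /** Called when a source subtask asks to start a schema change. */
    synchronized Reply requestSchemaChange(int sourceSubtask) {
        if (status != Status.IDLE) {
            return Reply.BUSY; // another subtask's change is still being applied
        }
        Set<Integer> flushed = flushedSinkWriters.getOrDefault(sourceSubtask, Set.of());
        if (activeSinkWriters.isEmpty() || !flushed.containsAll(activeSinkWriters)) {
            return Reply.WAITING_FOR_FLUSH; // keep polling until every writer has flushed
        }
        flushedSinkWriters.remove(sourceSubtask);
        status = Status.APPLYING;
        return Reply.ACCEPTED;
    }

    /** Called once the metadata applier finishes, releasing the gate for the next request. */
    synchronized void finishSchemaChange() {
        status = Status.IDLE;
    }

    public static void main(String[] args) {
        SchemaChangeGateSketch gate = new SchemaChangeGateSketch();
        gate.registerSinkWriter(0);
        System.out.println(gate.requestSchemaChange(3)); // WAITING_FOR_FLUSH
        gate.onFlushSuccess(3, 0);
        System.out.println(gate.requestSchemaChange(3)); // ACCEPTED
        System.out.println(gate.requestSchemaChange(4)); // BUSY
    }
}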
Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}", + request.getTableId().toString(), + request, + deducedSchemaChangeEvents.stream() + .map(SchemaChangeEvent::toString) + .collect(Collectors.joining("\n\t"))); + schemaChangeThreadPool.submit( + () -> { + try { + applySchemaChange(originalEvent, deducedSchemaChangeEvents); + } catch (Throwable t) { + failJob( + "Schema change applying task", + new FlinkRuntimeException( + "Failed to apply schema change event.", t)); + throw t; + } + }); + } + + private List<SchemaChangeEvent> deduceEvolvedSchemaChanges(SchemaChangeEvent event) { + LOG.info("Step 1 - Start deducing evolved schema change for {}", event); + + TableId originalTableId = event.tableId(); + List<SchemaChangeEvent> deducedSchemaChangeEvents = new ArrayList<>(); + Set<TableId> originalTables = schemaManager.getAllOriginalTables(); + + // First, grab all affected evolved tables. + Set<TableId> affectedEvolvedTables = + SchemaDerivator.getAffectedEvolvedTables( + router, Collections.singleton(originalTableId)); + LOG.info("Step 2 - Affected downstream tables are: {}", affectedEvolvedTables); + + // For each affected table, we need to... + for (TableId evolvedTableId : affectedEvolvedTables) { + Schema currentEvolvedSchema = + schemaManager.getLatestEvolvedSchema(evolvedTableId).orElse(null); + LOG.info( + "Step 3.1 - For to-be-evolved table {} with schema {}...", + evolvedTableId, + currentEvolvedSchema); + + // ... reversely look up this affected sink table's upstream dependency + Set<TableId> upstreamDependencies = + SchemaDerivator.reverseLookupDependingUpstreamTables( + router, evolvedTableId, originalTables); + Preconditions.checkArgument( + !upstreamDependencies.isEmpty(), + "An affected sink table's upstream dependency cannot be empty."); + LOG.info("Step 3.2 - upstream dependency tables are: {}", upstreamDependencies); + + List<SchemaChangeEvent> rawSchemaChangeEvents = new ArrayList<>(); + if (upstreamDependencies.size() == 1) { + // If it's a one-by-one routing rule, we can simply forward it to downstream sink. + SchemaChangeEvent rawEvent = event.copy(evolvedTableId); + rawSchemaChangeEvents.add(rawEvent); + LOG.info( + "Step 3.3 - It's an one-by-one routing and could be forwarded as {}.", + rawEvent); + } else { + Set<Schema> toBeMergedSchemas = + SchemaDerivator.reverseLookupDependingUpstreamSchemas( + router, evolvedTableId, schemaManager); + LOG.info("Step 3.3 - Upstream dependency schemas are: {}.", toBeMergedSchemas); + + // We're in a table routing mode now, so we need to infer a widest schema for all + // upstream tables. + Schema mergedSchema = currentEvolvedSchema; + for (Schema toBeMergedSchema : toBeMergedSchemas) { + mergedSchema = + SchemaMergingUtils.getLeastCommonSchema(mergedSchema, toBeMergedSchema); + } + LOG.info("Step 3.4 - Deduced widest schema is: {}.", mergedSchema); + + // Detect what schema changes we need to apply to get expected sink table. + List<SchemaChangeEvent> rawEvents = + SchemaMergingUtils.getSchemaDifference( + evolvedTableId, currentEvolvedSchema, mergedSchema); + LOG.info( + "Step 3.5 - It's an many-to-one routing and causes schema changes: {}.", + rawEvents); + + rawSchemaChangeEvents.addAll(rawEvents); + } + + // Finally, we normalize schema change events, including rewriting events by current + // schema change behavior configuration, dropping explicitly excluded schema change + // event types. 
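For many-to-one routes, deduceEvolvedSchemaChanges above widens the current sink schema into a least common schema across all depending upstream tables and then derives schema change events from the difference. The toy sketch below shows that merge-then-diff idea over a simplified column-name-to-type map with a made-up widening rule; the real logic lives in SchemaMergingUtils and works on full CDC schemas.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: widen a sink schema over several upstream schemas, then emit the differences. */
public class SchemaMergeSketch {

    /** Toy widening rule: identical types stay, INT and BIGINT merge to BIGINT, else STRING. */
    static String widen(String a, String b) {
        if (a.equals(b)) {
            return a;
        }
        if ((a.equals("INT") && b.equals("BIGINT")) || (a.equals("BIGINT") && b.equals("INT"))) {
            return "BIGINT";
        }
        return "STRING";
    }

    /** Merge an upstream schema into the running "least common" schema. */
    static Map<String, String> merge(Map<String, String> current, Map<String, String> upstream) {
        Map<String, String> merged = new LinkedHashMap<>(current);
        upstream.forEach((col, type) -> merged.merge(col, type, SchemaMergeSketch::widen));
        return merged;
    }

    /** Diff old vs merged schema into human-readable "schema change events". */
    static List<String> difference(Map<String, String> before, Map<String, String> after) {
        List<String> changes = new ArrayList<>();
        after.forEach(
                (col, type) -> {
                    if (!before.containsKey(col)) {
                        changes.add("AddColumn " + col + " " + type);
                    } else if (!before.get(col).equals(type)) {
                        changes.add("AlterColumnType " + col + " " + before.get(col) + " -> " + type);
                    }
                });
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> sink = new LinkedHashMap<>(Map.of("id", "INT", "name", "STRING"));
        Map<String, String> upstreamA = Map.of("id", "BIGINT", "name", "STRING");
        Map<String, String> upstreamB = Map.of("id", "INT", "address", "STRING");
        Map<String, String> merged = merge(merge(sink, upstreamA), upstreamB);
        System.out.println(difference(sink, merged));
        // e.g. [AlterColumnType id INT -> BIGINT, AddColumn address STRING]
    }
}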
+ List<SchemaChangeEvent> normalizedEvents = + SchemaDerivator.normalizeSchemaChangeEvents( + currentEvolvedSchema, rawSchemaChangeEvents, behavior, metadataApplier); + LOG.info( + "Step 4 - After being normalized with {} behavior, final schema change events are: {}", + behavior, + normalizedEvents); + + deducedSchemaChangeEvents.addAll(normalizedEvents); + } + + return deducedSchemaChangeEvents; + } + + /** Applies the schema change to the external system. */ + private void applySchemaChange( + SchemaChangeEvent originalEvent, List<SchemaChangeEvent> deducedSchemaChangeEvents) { + if (SchemaChangeBehavior.EXCEPTION.equals(behavior)) { + if (deducedSchemaChangeEvents.stream() + .anyMatch(evt -> !(evt instanceof CreateTableEvent))) { + SchemaChangeEvent unacceptableSchemaChangeEvent = + deducedSchemaChangeEvents.stream() + .filter(evt -> !(evt instanceof CreateTableEvent)) + .findAny() + .get(); + throw new SchemaEvolveException( + unacceptableSchemaChangeEvent, + "Unexpected schema change events occurred in EXCEPTION mode. Job will fail now."); + } + } + + // Tries to apply it to external system + List<SchemaChangeEvent> appliedSchemaChangeEvents = new ArrayList<>(); + for (SchemaChangeEvent event : deducedSchemaChangeEvents) { + if (applyAndUpdateEvolvedSchemaChange(event)) { + appliedSchemaChangeEvents.add(event); + } + } + + Map<TableId, Schema> refreshedEvolvedSchemas = new HashMap<>(); + + // We need to retrieve all possibly modified evolved schemas and refresh SchemaOperator's + // local cache since it might have been altered by another SchemaOperator instance. + // SchemaChangeEvents doesn't need to be emitted to downstream (since it might be broadcast + // from other SchemaOperators) though. + for (TableId tableId : router.route(originalEvent.tableId())) { + refreshedEvolvedSchemas.put( + tableId, schemaManager.getLatestEvolvedSchema(tableId).orElse(null)); + } + + // And returns all successfully applied schema change events to SchemaOperator. + pendingResponseFuture.complete( + wrap( + SchemaChangeResponse.success( + appliedSchemaChangeEvents, refreshedEvolvedSchemas))); + pendingResponseFuture = null; + + Preconditions.checkState( + schemaChangeStatus == RequestStatus.APPLYING, + "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + + schemaChangeStatus); + schemaChangeStatus = RequestStatus.IDLE; + LOG.info("SchemaChangeStatus switched from APPLYING to IDLE."); + } + + private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) { + try { + metadataApplier.applySchemaChange(schemaChangeEvent); + schemaManager.applyEvolvedSchemaChange(schemaChangeEvent); + LOG.info( + "Successfully applied schema change event {} to external system.", + schemaChangeEvent); + return true; + } catch (Throwable t) { + if (shouldIgnoreException(t)) { + LOG.warn( + "Failed to apply schema change {}, but keeps running in tolerant mode. 
Caused by: {}", + schemaChangeEvent, + t); + return false; + } else { + throw t; + } + } + } + + // ------------------------- + // Utilities + // ------------------------- + + private boolean shouldIgnoreException(Throwable throwable) { + // In IGNORE mode, will never try to apply schema change events + // In EVOLVE and LENIENT mode, such failure will not be tolerated + // In EXCEPTION mode, an exception will be thrown once captured + return (throwable instanceof UnsupportedSchemaChangeEventException) + && (SchemaChangeBehavior.TRY_EVOLVE.equals(behavior)); + } + + /** + * {@code IDLE}: Initial idling state, ready for requests. <br> + * {@code APPLYING}: When schema change application finishes (successfully or with exceptions) + */ + private enum RequestStatus { + IDLE, + APPLYING + } + + /** + * Before Flink CDC 3.3, we store routing rules into {@link SchemaCoordinator}'s state, which + * turns out to be unnecessary since data stream topology might change after stateful restarts, + * and stale routing status is both unnecessary and erroneous. This function consumes these + * bytes from the state, but never returns them. + */ + private void consumeUnusedSchemaDerivationBytes(DataInputStream in) throws IOException { + TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; + int derivationMappingSize = in.readInt(); + Map<TableId, Set<TableId>> derivationMapping = new HashMap<>(derivationMappingSize); + for (int i = 0; i < derivationMappingSize; i++) { + // Routed table ID + TableId routedTableId = + tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); + // Original table IDs + int numOriginalTables = in.readInt(); + Set<TableId> originalTableIds = new HashSet<>(numOriginalTables); + for (int j = 0; j < numOriginalTables; j++) { + TableId originalTableId = + tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); + originalTableIds.add(originalTableId); + } + derivationMapping.put(routedTableId, originalTableIds); + } + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java similarity index 53% copy from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java copy to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java index 367f65597..253b52cac 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java @@ -15,50 +15,67 @@ * limitations under the License. 
*/ -package org.apache.flink.cdc.runtime.operators.schema; +package org.apache.flink.cdc.runtime.operators.schema.regular; import org.apache.flink.cdc.common.annotation.Internal; -import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider; +import org.apache.flink.cdc.runtime.operators.schema.common.CoordinatorExecutorThreadFactory; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; -import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import java.time.Duration; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; -/** Factory to create {@link SchemaOperator}. */ +/** Provider of {@link SchemaCoordinator}. */ @Internal -public class SchemaOperatorFactory extends SimpleOperatorFactory<Event> - implements CoordinatedOperatorFactory<Event>, OneInputStreamOperatorFactory<Event, Event> { - +public class SchemaCoordinatorProvider implements OperatorCoordinator.Provider { private static final long serialVersionUID = 1L; + private final OperatorID operatorID; + private final String operatorName; private final MetadataApplier metadataApplier; private final List<RouteRule> routingRules; private final SchemaChangeBehavior schemaChangeBehavior; + private final Duration rpcTimeout; - public SchemaOperatorFactory( + public SchemaCoordinatorProvider( + OperatorID operatorID, + String operatorName, MetadataApplier metadataApplier, List<RouteRule> routingRules, - Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior, - String timezone) { - super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior, timezone)); + Duration rpcTimeout) { + this.operatorID = operatorID; + this.operatorName = operatorName; this.metadataApplier = metadataApplier; this.routingRules = routingRules; this.schemaChangeBehavior = schemaChangeBehavior; + this.rpcTimeout = rpcTimeout; + } + + @Override + public OperatorID getOperatorId() { + return operatorID; } @Override - public OperatorCoordinator.Provider getCoordinatorProvider( - String operatorName, OperatorID operatorID) { - return new SchemaRegistryProvider( - operatorID, operatorName, metadataApplier, routingRules, schemaChangeBehavior); + public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception { + CoordinatorExecutorThreadFactory coordinatorThreadFactory = + new CoordinatorExecutorThreadFactory( + "schema-evolution-coordinator", context.getUserCodeClassloader()); + ExecutorService coordinatorExecutor = + Executors.newSingleThreadExecutor(coordinatorThreadFactory); + return new SchemaCoordinator( + operatorName, + context, + coordinatorExecutor, + metadataApplier, + routingRules, + schemaChangeBehavior, + rpcTimeout); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java new file mode 100644 index 000000000..9ea79d729 --- /dev/null +++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.schema.regular; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator; +import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter; +import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics; +import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest; +import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.SerializedValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; + +/** + * The operator will evolve schemas in {@link + * org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator} for incoming {@link + * SchemaChangeEvent}s and block the stream for tables before 
their schema changes finish. + */ +@Internal +public class SchemaOperator extends AbstractStreamOperator<Event> + implements OneInputStreamOperator<Event, Event>, Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class); + + // Final fields that are set in constructor + private final String timezone; + private final Duration rpcTimeout; + private final SchemaChangeBehavior schemaChangeBehavior; + private final List<RouteRule> routingRules; + + // Transient fields that are set during open() + private transient int subTaskId; + private transient TaskOperatorEventGateway toCoordinator; + private transient SchemaOperatorMetrics schemaOperatorMetrics; + private transient volatile Map<TableId, Schema> originalSchemaMap; + private transient volatile Map<TableId, Schema> evolvedSchemaMap; + private transient TableIdRouter router; + private transient SchemaDerivator derivator; + + @VisibleForTesting + public SchemaOperator(List<RouteRule> routingRules) { + this(routingRules, DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + } + + @VisibleForTesting + public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) { + this(routingRules, rpcTimeOut, SchemaChangeBehavior.EVOLVE); + } + + @VisibleForTesting + public SchemaOperator( + List<RouteRule> routingRules, + Duration rpcTimeOut, + SchemaChangeBehavior schemaChangeBehavior) { + this(routingRules, rpcTimeOut, schemaChangeBehavior, "UTC"); + } + + public SchemaOperator( + List<RouteRule> routingRules, + Duration rpcTimeOut, + SchemaChangeBehavior schemaChangeBehavior, + String timezone) { + this.chainingStrategy = ChainingStrategy.ALWAYS; + this.rpcTimeout = rpcTimeOut; + this.schemaChangeBehavior = schemaChangeBehavior; + this.timezone = timezone; + this.routingRules = routingRules; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<Event>> output) { + super.setup(containingTask, config, output); + this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway(); + } + + @Override + public void open() throws Exception { + super.open(); + this.schemaOperatorMetrics = + new SchemaOperatorMetrics( + getRuntimeContext().getMetricGroup(), schemaChangeBehavior); + this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.originalSchemaMap = new HashMap<>(); + this.evolvedSchemaMap = new HashMap<>(); + this.router = new TableIdRouter(routingRules); + this.derivator = new SchemaDerivator(); + } + + /** + * This method is guaranteed to not be called concurrently with other methods of the operator. 
+ */ + @Override + public void processElement(StreamRecord<Event> streamRecord) throws Exception { + Event event = streamRecord.getValue(); + if (event instanceof SchemaChangeEvent) { + handleSchemaChangeEvent((SchemaChangeEvent) event); + } else if (event instanceof DataChangeEvent) { + handleDataChangeEvent((DataChangeEvent) event); + } else { + throw new RuntimeException("Unknown event type in Stream record: " + event); + } + } + + private void handleSchemaChangeEvent(SchemaChangeEvent originalEvent) throws Exception { + // First, update original schema map unconditionally and it will never fail + TableId tableId = originalEvent.tableId(); + originalSchemaMap.compute( + tableId, + (tId, schema) -> SchemaUtils.applySchemaChangeEvent(schema, originalEvent)); + schemaOperatorMetrics.increaseSchemaChangeEvents(1); + + // First, send FlushEvent or it might be blocked later + LOG.info("{}> Sending the FlushEvent.", subTaskId); + output.collect(new StreamRecord<>(new FlushEvent(subTaskId))); + + // Then, queue to request schema change to SchemaCoordinator. + SchemaChangeResponse response = requestSchemaChange(tableId, originalEvent); + + if (response.isSuccess()) { + LOG.info("{}> Successfully requested schema change.", subTaskId); + LOG.info( + "{}> Finished schema change events: {}", + subTaskId, + response.getAppliedSchemaChangeEvents()); + LOG.info("{}> Refreshed evolved schemas: {}", subTaskId, response.getEvolvedSchemas()); + + // After this request got successfully applied to DBMS, we can... + List<SchemaChangeEvent> finishedSchemaChangeEvents = + response.getAppliedSchemaChangeEvents(); + + // Update local evolved schema map's cache + evolvedSchemaMap.putAll(response.getEvolvedSchemas()); + + // and emit the finished event to downstream + for (SchemaChangeEvent finishedEvent : finishedSchemaChangeEvents) { + output.collect(new StreamRecord<>(finishedEvent)); + } + + schemaOperatorMetrics.increaseFinishedSchemaChangeEvents( + finishedSchemaChangeEvents.size()); + } else if (response.isDuplicate()) { + LOG.info( + "{}> Schema change event {} has been handled in another subTask already.", + subTaskId, + originalEvent); + + schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1); + } else if (response.isIgnored()) { + LOG.info( + "{}> Schema change event {} has been ignored. 
No schema evolution needed.", + subTaskId, + originalEvent); + + schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1); + } else { + throw new IllegalStateException("Unexpected response status: " + response); + } + } + + private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) { + TableId tableId = dataChangeEvent.tableId(); + + // First, we obtain the original schema corresponding to this data change event + Schema originalSchema = originalSchemaMap.get(dataChangeEvent.tableId()); + + // Then, for each routing terminus, coerce data records to the expected schema + for (TableId sinkTableId : router.route(tableId)) { + Schema evolvedSchema = evolvedSchemaMap.get(sinkTableId); + DataChangeEvent coercedDataRecord = + derivator + .coerceDataRecord( + timezone, + DataChangeEvent.route(dataChangeEvent, sinkTableId), + originalSchema, + evolvedSchema) + .orElseThrow( + () -> + new IllegalStateException( + String.format( + "Unable to coerce data record from %s (schema: %s) to %s (schema: %s)", + tableId, + originalSchema, + sinkTableId, + evolvedSchema))); + output.collect(new StreamRecord<>(coercedDataRecord)); + } + } + + private SchemaChangeResponse requestSchemaChange( + TableId tableId, SchemaChangeEvent schemaChangeEvent) + throws InterruptedException, TimeoutException { + long deadline = System.currentTimeMillis() + rpcTimeout.toMillis(); + while (true) { + SchemaChangeResponse response = + sendRequestToCoordinator( + new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); + if (System.currentTimeMillis() < deadline) { + if (response.isRegistryBusy()) { + LOG.info( + "{}> Schema Registry is busy now, waiting for next request...", + subTaskId); + Thread.sleep(1000); + } else if (response.isWaitingForFlush()) { + LOG.info( + "{}> Schema change event has not collected enough flush success events from writers, waiting...", + subTaskId); + Thread.sleep(1000); + } else { + return response; + } + } else { + throw new TimeoutException("Timeout when requesting schema change."); + } + } + } + + private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> + RESPONSE sendRequestToCoordinator(REQUEST request) { + try { + CompletableFuture<CoordinationResponse> responseFuture = + toCoordinator.sendRequestToCoordinator( + getOperatorID(), new SerializedValue<>(request)); + return CoordinationResponseUtils.unwrap( + responseFuture.get(rpcTimeout.toMillis(), TimeUnit.MILLISECONDS)); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to send request to coordinator: " + request.toString(), e); + } + } + + /** Visible for mocking in test cases. 
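
The requestSchemaChange method above implements a bounded retry: the same SchemaChangeRequest is re-sent while the coordinator answers with a transient status (registry busy, or still waiting for sink flush acknowledgements), and a TimeoutException is raised once the RPC deadline passes. Reduced to plain Java with hypothetical names and no Flink types, the control flow looks roughly like this (a sketch, not part of the patch):

    import java.time.Duration;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    final class RetryUntilSettled {
        /** Re-send a request until the response is no longer transient, or the deadline passes. */
        static <R> R request(Supplier<R> sendOnce, Predicate<R> isTransient, Duration timeout)
                throws InterruptedException, TimeoutException {
            long deadline = System.currentTimeMillis() + timeout.toMillis();
            while (true) {
                R response = sendOnce.get();                 // one coordination round trip
                if (System.currentTimeMillis() >= deadline) {
                    throw new TimeoutException("Timeout when requesting schema change.");
                }
                if (!isTransient.test(response)) {
                    return response;                         // terminal: success, duplicate, or ignored
                }
                Thread.sleep(1000);                          // transient: back off and re-send
            }
        }
        // Example predicate mirroring the operator: r -> r.isRegistryBusy() || r.isWaitingForFlush()
    }

Checking the deadline after every round trip keeps a subtask from spinning forever on transient responses, while any terminal response that arrives before the deadline returns immediately.
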
*/ + @VisibleForTesting + protected int getCurrentTimestamp() { + return (int) Instant.now().getEpochSecond(); + } + + @VisibleForTesting + public void registerInitialSchema(TableId tableId, Schema schema) { + originalSchemaMap.put(tableId, schema); + evolvedSchemaMap.put(tableId, schema); + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java similarity index 84% rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java rename to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java index 367f65597..630acc904 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.operators.schema; +package org.apache.flink.cdc.runtime.operators.schema.regular; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; @@ -42,23 +41,30 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event> private final MetadataApplier metadataApplier; private final List<RouteRule> routingRules; private final SchemaChangeBehavior schemaChangeBehavior; + private final Duration rpcTimeout; public SchemaOperatorFactory( MetadataApplier metadataApplier, List<RouteRule> routingRules, - Duration rpcTimeOut, + Duration rpcTimeout, SchemaChangeBehavior schemaChangeBehavior, String timezone) { - super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior, timezone)); + super(new SchemaOperator(routingRules, rpcTimeout, schemaChangeBehavior, timezone)); this.metadataApplier = metadataApplier; this.routingRules = routingRules; this.schemaChangeBehavior = schemaChangeBehavior; + this.rpcTimeout = rpcTimeout; } @Override public OperatorCoordinator.Provider getCoordinatorProvider( String operatorName, OperatorID operatorID) { - return new SchemaRegistryProvider( - operatorID, operatorName, metadataApplier, routingRules, schemaChangeBehavior); + return new SchemaCoordinatorProvider( + operatorID, + operatorName, + metadataApplier, + routingRules, + schemaChangeBehavior, + rpcTimeout); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeRequest.java similarity index 79% rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java rename to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeRequest.java index fbc5e9c40..b2b3336b0 100644 --- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeRequest.java @@ -15,18 +15,19 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.operators.schema.event; +package org.apache.flink.cdc.runtime.operators.schema.regular.event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator; +import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import java.util.Objects; /** - * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request to change schemas. + * The request from {@link SchemaOperator} to {@link SchemaCoordinator} to request to change + * schemas. */ public class SchemaChangeRequest implements CoordinationRequest { @@ -34,8 +35,10 @@ public class SchemaChangeRequest implements CoordinationRequest { /** The sender of the request. */ private final TableId tableId; + /** The schema changes. */ private final SchemaChangeEvent schemaChangeEvent; + /** The ID of subTask that initiated the request. */ private final int subTaskId; @@ -76,4 +79,16 @@ public class SchemaChangeRequest implements CoordinationRequest { public int hashCode() { return Objects.hash(tableId, schemaChangeEvent, subTaskId); } + + @Override + public String toString() { + return "SchemaChangeRequest{" + + "tableId=" + + tableId + + ", schemaChangeEvent=" + + schemaChangeEvent + + ", subTaskId=" + + subTaskId + + '}'; + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java similarity index 58% rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java rename to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java index 63d57139b..282557552 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java @@ -15,56 +15,73 @@ * limitations under the License. 
*/ -package org.apache.flink.cdc.runtime.operators.schema.event; +package org.apache.flink.cdc.runtime.operators.schema.regular.event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator; +import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; /** - * The response for {@link SchemaChangeRequest} from {@link SchemaRegistry} to {@link + * The response for {@link SchemaChangeRequest} from {@link SchemaCoordinator} to {@link * SchemaOperator}. */ public class SchemaChangeResponse implements CoordinationResponse { private static final long serialVersionUID = 1L; /** - * Whether the SchemaOperator need to buffer data and the SchemaOperatorCoordinator need to wait - * for flushing. + * Actually finished schema change events. This will only be effective if status is {@code + * accepted}. */ - private final List<SchemaChangeEvent> schemaChangeEvents; + private final List<SchemaChangeEvent> appliedSchemaChangeEvents; + + private final Map<TableId, Schema> evolvedSchemas; private final ResponseCode responseCode; - public static SchemaChangeResponse accepted(List<SchemaChangeEvent> schemaChangeEvents) { - return new SchemaChangeResponse(schemaChangeEvents, ResponseCode.ACCEPTED); + public static SchemaChangeResponse success( + List<SchemaChangeEvent> schemaChangeEvents, Map<TableId, Schema> evolvedSchemas) { + return new SchemaChangeResponse(ResponseCode.SUCCESS, schemaChangeEvents, evolvedSchemas); } public static SchemaChangeResponse busy() { - return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.BUSY); + return new SchemaChangeResponse(ResponseCode.BUSY); } public static SchemaChangeResponse duplicate() { - return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.DUPLICATE); + return new SchemaChangeResponse(ResponseCode.DUPLICATE); } public static SchemaChangeResponse ignored() { - return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED); + return new SchemaChangeResponse(ResponseCode.IGNORED); + } + + public static SchemaChangeResponse waitingForFlush() { + return new SchemaChangeResponse(ResponseCode.WAITING_FOR_FLUSH); + } + + private SchemaChangeResponse(ResponseCode responseCode) { + this(responseCode, Collections.emptyList(), Collections.emptyMap()); } private SchemaChangeResponse( - List<SchemaChangeEvent> schemaChangeEvents, ResponseCode responseCode) { - this.schemaChangeEvents = schemaChangeEvents; + ResponseCode responseCode, + List<SchemaChangeEvent> appliedSchemaChangeEvents, + Map<TableId, Schema> evolvedSchemas) { this.responseCode = responseCode; + this.appliedSchemaChangeEvents = appliedSchemaChangeEvents; + this.evolvedSchemas = evolvedSchemas; } - public boolean isAccepted() { - return ResponseCode.ACCEPTED.equals(responseCode); + public boolean isSuccess() { + return ResponseCode.SUCCESS.equals(responseCode); } public boolean isRegistryBusy() { @@ -79,8 +96,16 @@ public class SchemaChangeResponse implements CoordinationResponse { return ResponseCode.IGNORED.equals(responseCode); } - public 
List<SchemaChangeEvent> getSchemaChangeEvents() { - return schemaChangeEvents; + public boolean isWaitingForFlush() { + return ResponseCode.WAITING_FOR_FLUSH.equals(responseCode); + } + + public List<SchemaChangeEvent> getAppliedSchemaChangeEvents() { + return appliedSchemaChangeEvents; + } + + public Map<TableId, Schema> getEvolvedSchemas() { + return evolvedSchemas; } @Override @@ -92,20 +117,20 @@ public class SchemaChangeResponse implements CoordinationResponse { return false; } SchemaChangeResponse response = (SchemaChangeResponse) o; - return Objects.equals(schemaChangeEvents, response.schemaChangeEvents) + return Objects.equals(appliedSchemaChangeEvents, response.appliedSchemaChangeEvents) && responseCode == response.responseCode; } @Override public int hashCode() { - return Objects.hash(schemaChangeEvents, responseCode); + return Objects.hash(appliedSchemaChangeEvents, responseCode); } @Override public String toString() { return "SchemaChangeResponse{" + "schemaChangeEvents=" - + schemaChangeEvents + + appliedSchemaChangeEvents + ", responseCode=" + responseCode + '}'; @@ -126,9 +151,10 @@ public class SchemaChangeResponse implements CoordinationResponse { * required. Possibly caused by LENIENT mode or merging table strategies. */ public enum ResponseCode { - ACCEPTED, + SUCCESS, BUSY, DUPLICATE, - IGNORED + IGNORED, + WAITING_FOR_FLUSH } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java similarity index 95% rename from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java rename to flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java index e1f6a94c9..53895d56c 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java @@ -15,7 +15,7 @@ * limitations under the License. 
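
The reworked SchemaChangeResponse above separates a status code from its payload: only success(...) carries the applied events and the evolved schemas, while the other factories produce empty, status-only responses. A sketch of how a coordinator-side handler might map its internal state onto these factories (the RegistryState enum and the local variables are placeholders, not part of the patch):

    import java.util.List;
    import java.util.Map;
    import org.apache.flink.cdc.common.event.SchemaChangeEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;

    final class ResponseSketch {
        /** Placeholder for whatever state machine the coordinator keeps internally. */
        enum RegistryState { APPLYING_OTHER_REQUEST, COLLECTING_FLUSH_ACKS, ALREADY_HANDLED, NO_OP, APPLIED }

        static SchemaChangeResponse toResponse(
                RegistryState state,
                List<SchemaChangeEvent> appliedEvents,
                Map<TableId, Schema> evolvedSchemas) {
            switch (state) {
                case APPLYING_OTHER_REQUEST:
                    return SchemaChangeResponse.busy();            // operator backs off and retries
                case COLLECTING_FLUSH_ACKS:
                    return SchemaChangeResponse.waitingForFlush(); // retried until sinks confirm the flush
                case ALREADY_HANDLED:
                    return SchemaChangeResponse.duplicate();       // another subtask won the race
                case NO_OP:
                    return SchemaChangeResponse.ignored();         // nothing to evolve
                case APPLIED:
                default:
                    return SchemaChangeResponse.success(appliedEvents, evolvedSchemas);
            }
        }
    }
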
*/ -package org.apache.flink.cdc.runtime.operators.schema; +package org.apache.flink.cdc.runtime.operators.schema.regular; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; @@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; @@ -37,9 +38,10 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; -import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness; +import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; @@ -53,6 +55,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** Unit tests for the {@link SchemaOperator} to handle evolved schema. 
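
For orientation, the smallest shape the reworked tests below take with the renamed harness: build a SchemaOperator, wrap it in RegularEventOperatorTestHarness, and feed events through processElement. A condensed sketch along those lines; the class name, table name, and columns are placeholders and the JUnit scaffolding is omitted:

    import java.time.Duration;
    import java.util.ArrayList;
    import org.apache.flink.cdc.common.event.CreateTableEvent;
    import org.apache.flink.cdc.common.event.Event;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
    import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import static org.assertj.core.api.Assertions.assertThat;

    class SchemaEvolveSketch {
        void run() throws Exception {
            TableId tableId = TableId.tableId("sketch_table");
            Schema schema =
                    Schema.newBuilder()
                            .physicalColumn("id", DataTypes.BIGINT())
                            .physicalColumn("name", DataTypes.STRING())
                            .build();

            SchemaOperator operator =
                    new SchemaOperator(
                            new ArrayList<>(), Duration.ofSeconds(30), SchemaChangeBehavior.EVOLVE);
            RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                    RegularEventOperatorTestHarness.withDurationAndBehavior(
                            operator, 17, Duration.ofSeconds(3), SchemaChangeBehavior.EVOLVE);
            harness.open();

            // Creating the table emits a FlushEvent followed by the forwarded CreateTableEvent.
            operator.processElement(new StreamRecord<>(new CreateTableEvent(tableId, schema)));

            // The harness tracks both the original and the evolved side of the schema.
            assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schema);
            assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schema);

            harness.close();
        }
    }
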
*/ @@ -85,8 +88,9 @@ public class SchemaEvolveTest { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndBehavior( + schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); // Test CreateTableEvent @@ -108,7 +112,7 @@ public class SchemaEvolveTest { Assertions.assertThat( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), createAndInsertDataEvents)) .isEqualTo( harness.getOutputRecords().stream() @@ -168,8 +172,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), - addColumnEvents)); + Collections.singletonList(new FlushEvent(0)), addColumnEvents)); Schema schemaV2 = Schema.newBuilder() @@ -227,7 +230,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), renameColumnEvents)); Schema schemaV3 = @@ -272,7 +275,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), alterColumnTypeEvents)); Schema schemaV4 = @@ -308,7 +311,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), dropColumnEvents)); Schema schemaV5 = @@ -342,8 +345,9 @@ public class SchemaEvolveTest { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndBehavior( + schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); // Test CreateTableEvent @@ -369,7 +373,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -425,8 +429,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), - addColumnEvents)); + Collections.singletonList(new FlushEvent(0)), addColumnEvents)); Schema schemaV2 = Schema.newBuilder() @@ -484,7 +487,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), renameColumnEvents)); Schema schemaV3 = @@ -529,7 +532,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), alterColumnTypeEvents)); Schema schemaV4 = @@ -565,7 +568,7 @@ public class 
SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), dropColumnEvents)); Schema schemaV5 = @@ -599,8 +602,9 @@ public class SchemaEvolveTest { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndBehavior( + schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); // Test CreateTableEvent @@ -626,7 +630,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -648,45 +652,25 @@ public class SchemaEvolveTest { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn( "height", DOUBLE, "Height data"))))); - Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)); + Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)) + .isExactlyInstanceOf(IllegalStateException.class) + .cause() + .isExactlyInstanceOf(ExecutionException.class) + .cause() + .isExactlyInstanceOf(FlinkRuntimeException.class) + .hasMessage("Failed to apply schema change event.") + .cause() + .isExactlyInstanceOf(SchemaEvolveException.class) + .extracting("applyingEvent", "exceptionMessage") + .containsExactly( + addColumnEvents.get(0), + "Unexpected schema change events occurred in EXCEPTION mode. 
Job will fail now."); // No schema change events should be sent to downstream - Assertions.assertThat(harness.getOutputRecords()).isEmpty(); - } - - // Test RenameColumnEvent (expected to fail) - { - List<Event> addColumnEvents = - Collections.singletonList( - new RenameColumnEvent( - tableId, ImmutableMap.of("name", "namae", "age", "toshi"))); - Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)); - - // No schema change events should be sent to downstream - Assertions.assertThat(harness.getOutputRecords()).isEmpty(); - } - - // Test AlterColumnTypeEvent (expected to fail) - { - List<Event> addColumnEvents = - Collections.singletonList( - new AlterColumnTypeEvent( - tableId, ImmutableMap.of("score", BIGINT, "toshi", FLOAT))); - Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)); - - // No schema change events should be sent to downstream - Assertions.assertThat(harness.getOutputRecords()).isEmpty(); - } - - // Test DropColumnEvent (expected to fail) - { - List<Event> addColumnEvents = - Collections.singletonList( - new DropColumnEvent(tableId, Arrays.asList("score", "height"))); - Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)); - - // No schema change events should be sent to downstream - Assertions.assertThat(harness.getOutputRecords()).isEmpty(); + Assertions.assertThat(harness.getOutputRecords()) + .map(StreamRecord::getValue) + .map(e -> e.getClass().getName()) + .containsExactly("org.apache.flink.cdc.common.event.FlushEvent"); } harness.close(); @@ -708,8 +692,9 @@ public class SchemaEvolveTest { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndBehavior( + schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); // Test CreateTableEvent @@ -735,7 +720,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -787,7 +772,7 @@ public class SchemaEvolveTest { List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + new FlushEvent(0), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 4, STRING, "Derrida", SMALLINT, (short) 20)), @@ -855,7 +840,7 @@ public class SchemaEvolveTest { List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + new FlushEvent(0), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 6, STRING, null, SMALLINT, (short) 22)), @@ -904,7 +889,7 @@ public class SchemaEvolveTest { List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + new FlushEvent(0), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 8, STRING, null, SMALLINT, null)), DataChangeEvent.insertEvent( @@ -945,7 +930,7 @@ public class SchemaEvolveTest { List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + new FlushEvent(0), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 12, STRING, null, DOUBLE, null)), DataChangeEvent.insertEvent( @@ -987,8 +972,8 @@ public class SchemaEvolveTest { SchemaOperator schemaOperator = new SchemaOperator(new 
ArrayList<>(), Duration.ofSeconds(30), behavior); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>( + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndFineGrainedBehaviorWithError( schemaOperator, 17, Duration.ofSeconds(3), @@ -1021,7 +1006,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1040,7 +1025,19 @@ public class SchemaEvolveTest { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn( "height", DOUBLE, "Height data"))))); - processEvent(schemaOperator, addColumnEvents); + Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)) + .isExactlyInstanceOf(IllegalStateException.class) + .cause() + .isExactlyInstanceOf(ExecutionException.class) + .cause() + .isExactlyInstanceOf(FlinkRuntimeException.class) + .hasMessage("Failed to apply schema change event.") + .cause() + .isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class) + .extracting("applyingEvent", "exceptionMessage") + .containsExactly( + addColumnEvents.get(0), "Sink doesn't support such schema change event."); + Assertions.assertThat(harness.isJobFailed()).isEqualTo(true); Assertions.assertThat(harness.getJobFailureCause()) .cause() @@ -1072,8 +1069,8 @@ public class SchemaEvolveTest { // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>( + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndFineGrainedBehaviorWithError( schemaOperator, 17, Duration.ofSeconds(3), @@ -1108,7 +1105,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1159,7 +1156,7 @@ public class SchemaEvolveTest { processEvent(schemaOperator, addColumnEvents); List<Event> expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add(new FlushEvent(0)); expectedEvents.addAll(addColumnEvents); Assertions.assertThat( @@ -1221,7 +1218,7 @@ public class SchemaEvolveTest { processEvent(schemaOperator, renameColumnEvents); List<Event> expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add(new FlushEvent(0)); expectedEvents.addAll(renameColumnEvents); Assertions.assertThat( @@ -1266,7 +1263,7 @@ public class SchemaEvolveTest { List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + new FlushEvent(0), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1324,7 +1321,7 @@ public class SchemaEvolveTest { List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + new FlushEvent(0), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1384,8 +1381,8 @@ public class SchemaEvolveTest { // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail - EventOperatorTestHarness<SchemaOperator, Event> harness = - new 
EventOperatorTestHarness<>( + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndFineGrainedBehavior( schemaOperator, 17, Duration.ofSeconds(3), @@ -1420,7 +1417,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1471,7 +1468,8 @@ public class SchemaEvolveTest { processEvent(schemaOperator, addColumnEvents); List<Event> expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add(new FlushEvent(0)); + expectedEvents.addAll(addColumnEvents); Assertions.assertThat( @@ -1533,7 +1531,7 @@ public class SchemaEvolveTest { processEvent(schemaOperator, renameColumnEvents); List<Event> expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add(new FlushEvent(0)); expectedEvents.addAll(renameColumnEvents); Assertions.assertThat( @@ -1578,7 +1576,7 @@ public class SchemaEvolveTest { List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + new FlushEvent(0), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1634,9 +1632,11 @@ public class SchemaEvolveTest { processEvent(schemaOperator, dropColumnEvents); + FlushEvent result; + result = new FlushEvent(0); List<Event> expectedEvents = Arrays.asList( - new FlushEvent(tableId), + result, DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1693,8 +1693,9 @@ public class SchemaEvolveTest { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndBehavior( + schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); // Test CreateTableEvent @@ -1714,14 +1715,17 @@ public class SchemaEvolveTest { processEvent(schemaOperator, createAndInsertDataEvents); + FlushEvent result; + TableId tableId1 = tableId; + Event schemaChangeEvent = createAndInsertDataEvents.get(0); + result = new FlushEvent(0); Assertions.assertThat( harness.getOutputRecords().stream() .map(StreamRecord::getValue) .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), - createAndInsertDataEvents)); + Collections.singletonList(result), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1); @@ -1776,8 +1780,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), - addColumnEvents)); + Collections.singletonList(new FlushEvent(0)), addColumnEvents)); Schema schemaV2 = Schema.newBuilder() @@ -1878,14 +1881,18 @@ public class SchemaEvolveTest { SMALLINT, (short) 23))); + FlushEvent result; + int subTaskId = 0; + TableId tableId1 = tableId; + Event schemaChangeEvent = renameColumnEvents.get(0); + result = new FlushEvent(subTaskId); Assertions.assertThat( harness.getOutputRecords().stream() .map(StreamRecord::getValue) .collect(Collectors.toList())) .isEqualTo( 
ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), - lenientRenameColumnEvents)); + Collections.singletonList(result), lenientRenameColumnEvents)); Schema schemaV3 = Schema.newBuilder() @@ -1959,7 +1966,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), lenientAlterColumnTypeEvents)); Schema schemaV4 = @@ -2018,7 +2025,10 @@ public class SchemaEvolveTest { harness.getOutputRecords().stream() .map(StreamRecord::getValue) .collect(Collectors.toList())) - .isEqualTo(lenientDropColumnEvents); + .isEqualTo( + ListUtils.union( + Collections.singletonList(new FlushEvent(0)), + lenientDropColumnEvents)); Schema schemaV5 = Schema.newBuilder() @@ -2063,8 +2073,9 @@ public class SchemaEvolveTest { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDurationAndBehavior( + schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); // Test CreateTableEvent @@ -2094,14 +2105,17 @@ public class SchemaEvolveTest { processEvent(schemaOperator, createAndInsertDataEvents); + FlushEvent result; + TableId tableId1 = tableId; + Event schemaChangeEvent = createAndInsertDataEvents.get(0); + result = new FlushEvent(0); Assertions.assertThat( harness.getOutputRecords().stream() .map(StreamRecord::getValue) .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), - createAndInsertDataEvents)); + Collections.singletonList(result), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1); @@ -2142,7 +2156,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), lenientDropColumnEvents)); Schema schemaV2 = @@ -2257,7 +2271,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), lenientAddColumnEvents)); Schema schemaV3 = @@ -2375,7 +2389,7 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList(new FlushEvent(0)), lenientRenameColumnEvents)); Schema schemaV4 = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java similarity index 73% rename from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java rename to flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java index eb45a2531..9feaf29d3 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java +++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.operators.schema; +package org.apache.flink.cdc.runtime.operators.schema.regular; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -26,13 +26,13 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; -import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness; +import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.time.Duration; @@ -46,6 +46,8 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; /** Unit tests for the {@link SchemaOperator}. */ public class SchemaOperatorTest { @@ -67,14 +69,21 @@ public class SchemaOperatorTest { final OperatorID opID = new OperatorID(); final TableId tableId = TableId.tableId("testProcessElement"); final RowType rowType = DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING()); + final Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.STRING()) + .build(); List<OneInputStreamOperatorTestHarness<Event, Event>> testHarnesses = new ArrayList<>(); for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) { + SchemaOperator operator = new SchemaOperator(new ArrayList<>()); OneInputStreamOperatorTestHarness<Event, Event> testHarness = - createTestHarness(maxParallelism, parallelism, subtaskIndex, opID); + createTestHarness(maxParallelism, parallelism, subtaskIndex, opID, operator); testHarnesses.add(testHarness); testHarness.setup(EventSerializer.INSTANCE); testHarness.open(); + operator.registerInitialSchema(tableId, schema); Map<String, String> meta = new HashMap<>(); meta.put("subtask", String.valueOf(subtaskIndex)); @@ -114,15 +123,18 @@ public class SchemaOperatorTest { void testProcessSchemaChangeEventWithTimeOut() throws Exception { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(1)); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 1, Duration.ofSeconds(3)); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDuration( + schemaOperator, 1, Duration.ofSeconds(3)); harness.open(); - Assertions.assertThrowsExactly( - TimeoutException.class, - () -> - schemaOperator.processElement( - new StreamRecord<>( - new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)))); + assertThatThrownBy( + () -> + schemaOperator.processElement( + new StreamRecord<>( + new CreateTableEvent(CUSTOMERS, 
CUSTOMERS_SCHEMA)))) + .isExactlyInstanceOf(IllegalStateException.class) + .cause() + .isExactlyInstanceOf(TimeoutException.class); harness.close(); } @@ -130,22 +142,28 @@ public class SchemaOperatorTest { void testProcessSchemaChangeEventWithOutTimeOut() throws Exception { SchemaOperator schemaOperator = new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30)); - EventOperatorTestHarness<SchemaOperator, Event> harness = - new EventOperatorTestHarness<>(schemaOperator, 1, Duration.ofSeconds(3)); + RegularEventOperatorTestHarness<SchemaOperator, Event> harness = + RegularEventOperatorTestHarness.withDuration( + schemaOperator, 1, Duration.ofSeconds(3)); harness.open(); - Assertions.assertDoesNotThrow( - () -> - schemaOperator.processElement( - new StreamRecord<>( - new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)))); + assertThatCode( + () -> + schemaOperator.processElement( + new StreamRecord<>( + new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)))) + .doesNotThrowAnyException(); harness.close(); } private OneInputStreamOperatorTestHarness<Event, Event> createTestHarness( - int maxParallelism, int parallelism, int subtaskIndex, OperatorID opID) + int maxParallelism, + int parallelism, + int subtaskIndex, + OperatorID opID, + OneInputStreamOperator<Event, Event> operator) throws Exception { return new OneInputStreamOperatorTestHarness<>( - new SchemaOperator(new ArrayList<>()), + operator, maxParallelism, parallelism, subtaskIndex,