This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new c1e1aa48d [FLINK-34648][cdc] Avoid RPC timeout when applying SchemaChangeEvent to downstream external systems c1e1aa48d is described below commit c1e1aa48d93933c11ce9629d6f98041338f4d7a0 Author: Kunni <lvyanquan....@alibaba-inc.com> AuthorDate: Wed Apr 24 15:57:03 2024 +0800 [FLINK-34648][cdc] Avoid RPC timeout when applying SchemaChangeEvent to downstream external systems This closes #3128. --- .../parser/YamlPipelineDefinitionParserTest.java | 25 +++++++ .../definitions/pipeline-definition-full.yaml | 1 + .../flink/cdc/common/pipeline/PipelineOptions.java | 11 +++ .../cdc/composer/flink/FlinkPipelineComposer.java | 5 +- .../flink/translator/SchemaOperatorTranslator.java | 10 ++- .../runtime/operators/schema/SchemaOperator.java | 48 ++++++++++-- .../operators/schema/SchemaOperatorFactory.java | 7 +- .../schema/coordinator/SchemaRegistry.java | 7 ++ .../coordinator/SchemaRegistryRequestHandler.java | 86 ++++++++++++++++++---- .../schema/event/RefreshPendingListsRequest.java} | 26 ++----- .../schema/event/RefreshPendingListsResponse.java} | 25 ++----- .../event/SchemaChangeProcessingResponse.java} | 25 ++----- .../schema/event/SchemaChangeResultRequest.java} | 26 ++----- .../operators/schema/SchemaOperatorTest.java | 46 ++++++++++++ .../operators/EventOperatorTestHarness.java | 49 ++++++++++-- .../schema/CollectingMetadataApplier.java | 14 ++++ 16 files changed, 308 insertions(+), 103 deletions(-) diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index e29ea332a..863acbdd3 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -18,6 +18,7 @@ package 
org.apache.flink.cdc.cli.parser; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.composer.definition.PipelineDef; import org.apache.flink.cdc.composer.definition.RouteDef; import org.apache.flink.cdc.composer.definition.SinkDef; @@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test; import java.net.URL; import java.nio.file.Paths; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -89,6 +91,27 @@ class YamlPipelineDefinitionParserTest { .isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue()); } + @Test + void testEvaluateDefaultRpcTimeOut() throws Exception { + URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = + parser.parse( + Paths.get(resource.toURI()), + Configuration.fromMap( + ImmutableMap.<String, String>builder() + .put( + PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT + .key(), + "1h") + .build())); + assertThat( + pipelineDef + .getConfig() + .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT)) + .isEqualTo(Duration.ofSeconds(60 * 60)); + } + @Test void testValidTimeZone() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); @@ -202,6 +225,7 @@ class YamlPipelineDefinitionParserTest { .put("name", "source-database-sync-pipe") .put("parallelism", "4") .put("enable-schema-evolution", "false") + .put("schema-operator.rpc-timeout", "1 h") .build())); private final PipelineDef fullDefWithGlobalConf = @@ -262,6 +286,7 @@ class YamlPipelineDefinitionParserTest { .put("name", "source-database-sync-pipe") .put("parallelism", "4") .put("enable-schema-evolution", "false") + .put("schema-operator.rpc-timeout", "1 h") .put("foo", "bar") .build())); diff --git 
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml index 5f0f6f8f9..e06ad904b 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml @@ -56,3 +56,4 @@ pipeline: name: source-database-sync-pipe parallelism: 4 enable-schema-evolution: false + schema-operator.rpc-timeout: 1 h diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 9ffabbfe2..ed5f6e9c0 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -23,12 +23,16 @@ import org.apache.flink.cdc.common.configuration.ConfigOptions; import org.apache.flink.cdc.common.configuration.description.Description; import org.apache.flink.cdc.common.configuration.description.ListElement; +import java.time.Duration; + import static org.apache.flink.cdc.common.configuration.description.TextElement.text; /** Predefined pipeline configuration options. */ @PublicEvolving public class PipelineOptions { + public static final Duration DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT = Duration.ofMinutes(3); + public static final ConfigOption<String> PIPELINE_NAME = ConfigOptions.key("name") .stringType() @@ -85,5 +89,12 @@ public class PipelineOptions { .withDescription( "The unique ID for schema operator. 
This ID will be used for inter-operator communications and must be unique across operators."); + public static final ConfigOption<Duration> PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT = + ConfigOptions.key("schema-operator.rpc-timeout") + .durationType() + .defaultValue(DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT) + .withDescription( + "The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes."); + private PipelineOptions() {} } diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 30ce1ff5e..73910d59b 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -110,7 +110,10 @@ public class FlinkPipelineComposer implements PipelineComposer { pipelineDef .getConfig() .get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR), - pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID)); + pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID), + pipelineDef + .getConfig() + .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT)); OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java index 9513c0349..a69741c7b 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java @@ -31,6 +31,7 @@ import 
org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -40,10 +41,15 @@ public class SchemaOperatorTranslator { private final SchemaChangeBehavior schemaChangeBehavior; private final String schemaOperatorUid; + private final Duration rpcTimeOut; + public SchemaOperatorTranslator( - SchemaChangeBehavior schemaChangeBehavior, String schemaOperatorUid) { + SchemaChangeBehavior schemaChangeBehavior, + String schemaOperatorUid, + Duration rpcTimeOut) { this.schemaChangeBehavior = schemaChangeBehavior; this.schemaOperatorUid = schemaOperatorUid; + this.rpcTimeOut = rpcTimeOut; } public DataStream<Event> translate( @@ -83,7 +89,7 @@ public class SchemaOperatorTranslator { input.transform( "SchemaOperator", new EventTypeInfo(), - new SchemaOperatorFactory(metadataApplier, routingRules)); + new SchemaOperatorFactory(metadataApplier, routingRules, rpcTimeOut)); stream.uid(schemaOperatorUid).setParallelism(parallelism); return stream; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 3648a1e7a..d31fb026d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -35,15 +35,18 @@ import org.apache.flink.cdc.common.types.DataTypeRoot; import org.apache.flink.cdc.common.utils.ChangeEventUtils; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; +import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse; +import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; +import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -67,8 +70,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; + /** * The operator will evolve schemas in {@link SchemaRegistry} for incoming {@link * SchemaChangeEvent}s and block the stream for tables before their schema changes finish. 
@@ -89,9 +95,18 @@ public class SchemaOperator extends AbstractStreamOperator<Event> private transient SchemaEvolutionClient schemaEvolutionClient; private transient LoadingCache<TableId, Schema> cachedSchemas; + private final long rpcTimeOutInMillis; + public SchemaOperator(List<Tuple2<String, TableId>> routingRules) { this.routingRules = routingRules; this.chainingStrategy = ChainingStrategy.ALWAYS; + this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis(); + } + + public SchemaOperator(List<Tuple2<String, TableId>> routingRules, Duration rpcTimeOut) { + this.routingRules = routingRules; + this.chainingStrategy = ChainingStrategy.ALWAYS; + this.rpcTimeOutInMillis = rpcTimeOut.toMillis(); } @Override @@ -127,11 +142,23 @@ public class SchemaOperator extends AbstractStreamOperator<Event> }); } + @Override + public void initializeState(StateInitializationContext context) throws Exception { + if (context.isRestored()) { + // Multiple operators may appear during a restart process, + // only clear the pendingSchemaChanges when the first operator starts. + if (getRuntimeContext().getIndexOfThisSubtask() == 0) { + sendRequestToCoordinator(new RefreshPendingListsRequest()); + } + } + } + /** * This method is guaranteed to not be called concurrently with other methods of the operator. 
*/ @Override - public void processElement(StreamRecord<Event> streamRecord) { + public void processElement(StreamRecord<Event> streamRecord) + throws InterruptedException, TimeoutException { Event event = streamRecord.getValue(); // Schema changes if (event instanceof SchemaChangeEvent) { @@ -247,7 +274,8 @@ public class SchemaOperator extends AbstractStreamOperator<Event> return Optional.empty(); } - private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) { + private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) + throws InterruptedException, TimeoutException { // The request will need to send a FlushEvent or block until flushing finished SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); if (!response.getSchemaChangeEvents().isEmpty()) { @@ -267,8 +295,18 @@ public class SchemaOperator extends AbstractStreamOperator<Event> return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); } - private ReleaseUpstreamResponse requestReleaseUpstream() { - return sendRequestToCoordinator(new ReleaseUpstreamRequest()); + private void requestReleaseUpstream() throws InterruptedException, TimeoutException { + CoordinationResponse coordinationResponse = + sendRequestToCoordinator(new ReleaseUpstreamRequest()); + long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; + while (coordinationResponse instanceof SchemaChangeProcessingResponse) { + if (System.currentTimeMillis() < nextRpcTimeOutMillis) { + Thread.sleep(1000); + coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest()); + } else { + throw new TimeoutException("TimeOut when requesting release upstream"); + } + } } private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java index 7ee348e79..ecf500171 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import java.time.Duration; import java.util.List; /** Factory to create {@link SchemaOperator}. */ @@ -42,8 +43,10 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event> private final List<Tuple2<String, TableId>> routingRules; public SchemaOperatorFactory( - MetadataApplier metadataApplier, List<Tuple2<String, TableId>> routingRules) { - super(new SchemaOperator(routingRules)); + MetadataApplier metadataApplier, + List<Tuple2<String, TableId>> routingRules, + Duration rpcTimeOut) { + super(new SchemaOperator(routingRules, rpcTimeOut)); this.metadataApplier = metadataApplier; this.routingRules = routingRules; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index 82ae21b86..3361c9454 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -25,8 +25,10 @@ import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent; import org.apache.flink.cdc.runtime.operators.schema.event.GetSchemaRequest; import 
org.apache.flink.cdc.runtime.operators.schema.event.GetSchemaResponse; +import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; +import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; @@ -122,6 +124,7 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH @Override public void close() throws Exception { LOG.info("SchemaRegistry for {} closed.", operatorName); + requestHandler.close(); } @Override @@ -175,6 +178,10 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH } else if (request instanceof GetSchemaRequest) { return CompletableFuture.completedFuture( wrap(handleGetSchemaRequest(((GetSchemaRequest) request)))); + } else if (request instanceof SchemaChangeResultRequest) { + return requestHandler.getSchemaChangeResult(); + } else if (request instanceof RefreshPendingListsRequest) { + return requestHandler.refreshPendingLists(); } else { throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index f9dfdb781..1ee06a7e5 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -22,8 +22,10 @@ import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse; +import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -33,12 +35,16 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.NotThreadSafe; +import java.io.Closeable; +import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST; import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap; @@ -46,7 +52,7 @@ import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationRe /** A handler to deal with all requests and events for {@link SchemaRegistry}. 
*/ @Internal @NotThreadSafe -public class SchemaRegistryRequestHandler { +public class SchemaRegistryRequestHandler implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class); /** The {@link MetadataApplier} for every table. */ @@ -66,6 +72,13 @@ public class SchemaRegistryRequestHandler { /** Sink writers which have sent flush success events for the request. */ private final Set<Integer> flushedSinkWriters; + /** Status of the execution of current schema change request. */ + private boolean isSchemaChangeApplying; + /** Actual exception if failed to apply schema change. */ + private Exception schemaChangeException; + /** Executor service to execute schema change. */ + private final ExecutorService schemaChangeThreadPool; + public SchemaRegistryRequestHandler( MetadataApplier metadataApplier, SchemaManager schemaManager, @@ -76,17 +89,34 @@ public class SchemaRegistryRequestHandler { this.pendingSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; this.schemaDerivation = schemaDerivation; + schemaChangeThreadPool = Executors.newSingleThreadExecutor(); + isSchemaChangeApplying = false; } /** * Apply the schema change to the external system. 
* * @param tableId the table need to change schema - * @param changeEvent the schema change + * @param derivedSchemaChangeEvents list of the schema changes */ - private void applySchemaChange(TableId tableId, SchemaChangeEvent changeEvent) { - LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId); - metadataApplier.applySchemaChange(changeEvent); + private void applySchemaChange( + TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) { + isSchemaChangeApplying = true; + schemaChangeException = null; + try { + for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { + metadataApplier.applySchemaChange(changeEvent); + LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId); + } + PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0); + if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) { + startNextSchemaChangeRequest(); + } + } catch (Exception e) { + this.schemaChangeException = e; + } finally { + this.isSchemaChangeApplying = false; + } } /** @@ -131,7 +161,7 @@ public class SchemaRegistryRequestHandler { public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest() { CompletableFuture<CoordinationResponse> response = pendingSchemaChanges.get(0).getResponseFuture(); - if (response.isDone()) { + if (response.isDone() && !isSchemaChangeApplying) { startNextSchemaChangeRequest(); } else { pendingSchemaChanges.get(0).receiveReleaseRequest(); @@ -155,19 +185,25 @@ public class SchemaRegistryRequestHandler { * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about * @param sinkSubtask the sink subtask succeed flushing */ - public void flushSuccess(TableId tableId, int sinkSubtask) { + public void flushSuccess(TableId tableId, int sinkSubtask) throws InterruptedException { flushedSinkWriters.add(sinkSubtask); if (flushedSinkWriters.equals(activeSinkWriters)) { LOG.info( "All sink subtask have flushed for table {}. 
Start to apply schema change.", tableId.toString()); PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0); - waitFlushSuccess.derivedSchemaChangeEvents.forEach( - schemaChangeEvent -> applySchemaChange(tableId, schemaChangeEvent)); - waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse())); - - if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) { - startNextSchemaChangeRequest(); + schemaChangeThreadPool.submit( + () -> applySchemaChange(tableId, waitFlushSuccess.derivedSchemaChangeEvents)); + Thread.sleep(1000); + if (schemaChangeException != null) { + throw new RuntimeException("failed to apply schema change.", schemaChangeException); + } + if (isSchemaChangeApplying) { + waitFlushSuccess + .getResponseFuture() + .complete(wrap(new SchemaChangeProcessingResponse())); + } else { + waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse())); } } } @@ -200,6 +236,30 @@ public class SchemaRegistryRequestHandler { } } + public CompletableFuture<CoordinationResponse> refreshPendingLists() { + pendingSchemaChanges.clear(); + flushedSinkWriters.clear(); + return CompletableFuture.completedFuture(wrap(new RefreshPendingListsResponse())); + } + + public CompletableFuture<CoordinationResponse> getSchemaChangeResult() { + if (schemaChangeException != null) { + throw new RuntimeException("failed to apply schema change.", schemaChangeException); + } + if (isSchemaChangeApplying) { + return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse())); + } else { + return CompletableFuture.supplyAsync(() -> wrap(new ReleaseUpstreamResponse())); + } + } + + @Override + public void close() throws IOException { + if (schemaChangeThreadPool != null) { + schemaChangeThreadPool.shutdown(); + } + } + private static class PendingSchemaChange { private final SchemaChangeRequest changeRequest; private List<SchemaChangeEvent> derivedSchemaChangeEvents; diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java similarity index 52% copy from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java copy to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java index 7b403e130..a0496c935 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java @@ -15,27 +15,13 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.testutils.schema; +package org.apache.flink.cdc.runtime.operators.schema.event; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; -import java.util.ArrayList; -import java.util.List; +/** Request to refresh the pendingSchemaChanges of {@link SchemaRegistryRequestHandler}. */ +public class RefreshPendingListsRequest implements CoordinationRequest { -/** - * A {@link MetadataApplier} for testing that holds all schema change events in a list for further - * examination. 
- */ -public class CollectingMetadataApplier implements MetadataApplier { - private final List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>(); - - @Override - public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { - schemaChangeEvents.add(schemaChangeEvent); - } - - public List<SchemaChangeEvent> getSchemaChangeEvents() { - return schemaChangeEvents; - } + private static final long serialVersionUID = 1L; } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java similarity index 52% copy from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java copy to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java index 7b403e130..ff0221deb 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java @@ -15,27 +15,12 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.testutils.schema; +package org.apache.flink.cdc.runtime.operators.schema.event; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; -import java.util.ArrayList; -import java.util.List; +/** Response to refresh the pendingSchemaChanges of {@link RefreshPendingListsRequest}. */ +public class RefreshPendingListsResponse implements CoordinationResponse { -/** - * A {@link MetadataApplier} for testing that holds all schema change events in a list for further - * examination. 
- */ -public class CollectingMetadataApplier implements MetadataApplier { - private final List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>(); - - @Override - public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { - schemaChangeEvents.add(schemaChangeEvent); - } - - public List<SchemaChangeEvent> getSchemaChangeEvents() { - return schemaChangeEvents; - } + private static final long serialVersionUID = 1L; } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java similarity index 56% copy from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java copy to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java index 7b403e130..b679ab9e5 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java @@ -15,27 +15,18 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.testutils.schema; +package org.apache.flink.cdc.runtime.operators.schema.event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.sink.MetadataApplier; - -import java.util.ArrayList; -import java.util.List; +import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; /** - * A {@link MetadataApplier} for testing that holds all schema change events in a list for further - * examination. 
+ * The response for {@link SchemaChangeResultRequest} or {@link ReleaseUpstreamRequest} from {@link + * SchemaRegistry} to {@link SchemaOperator} if not apply {@link SchemaChangeEvent} in time. */ -public class CollectingMetadataApplier implements MetadataApplier { - private final List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>(); - - @Override - public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { - schemaChangeEvents.add(schemaChangeEvent); - } +public class SchemaChangeProcessingResponse implements CoordinationResponse { - public List<SchemaChangeEvent> getSchemaChangeEvents() { - return schemaChangeEvents; - } + private static final long serialVersionUID = 1L; } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java similarity index 52% copy from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java copy to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java index 7b403e130..e53762c19 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java @@ -15,27 +15,17 @@ * limitations under the License.
*/ -package org.apache.flink.cdc.runtime.testutils.schema; +package org.apache.flink.cdc.runtime.operators.schema.event; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.sink.MetadataApplier; - -import java.util.ArrayList; -import java.util.List; +import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; /** - * A {@link MetadataApplier} for testing that holds all schema change events in a list for further - * examination. + * request from {@link SchemaOperator} to {@link SchemaRegistry} for getting result of applying + * schema change. */ -public class CollectingMetadataApplier implements MetadataApplier { - private final List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>(); - - @Override - public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { - schemaChangeEvents.add(schemaChangeEvent); - } +public class SchemaChangeResultRequest implements CoordinationRequest { - public List<SchemaChangeEvent> getSchemaChangeEvents() { - return schemaChangeEvents; - } + private static final long serialVersionUID = 1L; } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java index 24ee75f48..3802d14ec 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java @@ -18,31 +18,48 @@ package org.apache.flink.cdc.runtime.operators.schema; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import 
org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; +import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link SchemaOperator}. 
*/ public class SchemaOperatorTest { + + private static final TableId CUSTOMERS = + TableId.tableId("my_company", "my_branch", "customers"); + private static final Schema CUSTOMERS_SCHEMA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("phone", DataTypes.BIGINT()) + .primaryKey("id") + .build(); + @Test void testProcessElement() throws Exception { final int maxParallelism = 4; @@ -93,6 +110,35 @@ public class SchemaOperatorTest { } } + @Test + void testProcessSchemaChangeEventWithTimeOut() throws Exception { + SchemaOperator schemaOperator = + new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(1)); + EventOperatorTestHarness<SchemaOperator, Event> harness = + new EventOperatorTestHarness<>(schemaOperator, 1, Duration.ofSeconds(3)); + harness.open(); + Assertions.assertThrowsExactly( + TimeoutException.class, + () -> + schemaOperator.processElement( + new StreamRecord<>( + new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)))); + } + + @Test + void testProcessSchemaChangeEventWithOutTimeOut() throws Exception { + SchemaOperator schemaOperator = + new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30)); + EventOperatorTestHarness<SchemaOperator, Event> harness = + new EventOperatorTestHarness<>(schemaOperator, 1, Duration.ofSeconds(3)); + harness.open(); + Assertions.assertDoesNotThrow( + () -> + schemaOperator.processElement( + new StreamRecord<>( + new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)))); + } + private OneInputStreamOperatorTestHarness<Event, Event> createTestHarness( int maxParallelism, int parallelism, int subtaskIndex, OperatorID opID) throws Exception { diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index 240eccada..b96a17b56 100644 --- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -19,10 +19,13 @@ package org.apache.flink.cdc.runtime.testutils.operators; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; +import org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.cdc.runtime.testutils.schema.CollectingMetadataApplier; import org.apache.flink.cdc.runtime.testutils.schema.TestingSchemaRegistryGateway; @@ -40,8 +43,11 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.util.OutputTag; +import org.apache.flink.util.SerializedValue; -import java.util.Collections; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; import java.util.LinkedList; /** @@ -59,6 +65,8 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex implements AutoCloseable { public static final OperatorID SCHEMA_OPERATOR_ID = new OperatorID(15213L, 15513L); + public static final OperatorID SINK_OPERATOR_ID = new OperatorID(15214L, 15514L); + private final OP operator; private final int numOutputs; private final SchemaRegistry schemaRegistry; @@ -73,8 +81,21 @@ 
public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex "SchemaOperator", new MockOperatorCoordinatorContext( SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()), - new CollectingMetadataApplier(), - Collections.emptyList()); + new CollectingMetadataApplier(null), + new ArrayList<>()); + schemaRegistryGateway = new TestingSchemaRegistryGateway(schemaRegistry); + } + + public EventOperatorTestHarness(OP operator, int numOutputs, Duration duration) { + this.operator = operator; + this.numOutputs = numOutputs; + schemaRegistry = + new SchemaRegistry( + "SchemaOperator", + new MockOperatorCoordinatorContext( + SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()), + new CollectingMetadataApplier(duration), + new ArrayList<>()); schemaRegistryGateway = new TestingSchemaRegistryGateway(schemaRegistry); } @@ -107,7 +128,9 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex operator.setup( new MockStreamTask(schemaRegistryGateway), new MockStreamConfig(new Configuration(), numOutputs), - new EventCollectingOutput<>(outputRecords)); + new EventCollectingOutput<>(outputRecords, schemaRegistryGateway)); + schemaRegistryGateway.sendOperatorEventToCoordinator( + SINK_OPERATOR_ID, new SerializedValue<>(new SinkWriterRegisterEvent(0))); } // ---------------------------------------- Helper classes --------------------------------- @@ -115,13 +138,29 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex private static class EventCollectingOutput<E extends Event> implements Output<StreamRecord<E>> { private final LinkedList<StreamRecord<E>> outputRecords; - public EventCollectingOutput(LinkedList<StreamRecord<E>> outputRecords) { + private final TestingSchemaRegistryGateway schemaRegistryGateway; + + public EventCollectingOutput( + LinkedList<StreamRecord<E>> outputRecords, + TestingSchemaRegistryGateway schemaRegistryGateway) { this.outputRecords = outputRecords; + 
this.schemaRegistryGateway = schemaRegistryGateway; } @Override public void collect(StreamRecord<E> record) { outputRecords.add(record); + Event event = record.getValue(); + if (event instanceof FlushEvent) { + try { + schemaRegistryGateway.sendOperatorEventToCoordinator( + SINK_OPERATOR_ID, + new SerializedValue<>( + new FlushSuccessEvent(0, ((FlushEvent) event).getTableId()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } @Override diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java index 7b403e130..1ebb5d449 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.testutils.schema; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.sink.MetadataApplier; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -30,9 +31,22 @@ import java.util.List; public class CollectingMetadataApplier implements MetadataApplier { private final List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>(); + private final Duration duration; + + public CollectingMetadataApplier(Duration duration) { + this.duration = duration; + } + @Override public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { schemaChangeEvents.add(schemaChangeEvent); + if (duration != null) { + try { + Thread.sleep(duration.toMillis()); + } catch (Exception ignore) { + + } + } } public List<SchemaChangeEvent> getSchemaChangeEvents() {