This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 73305fb81 [hotfix] Optimize SchemaChangeEventVisitor to reduce class
inflations (#4350)
73305fb81 is described below
commit 73305fb8103beb4671c6c54216806c820c76d006
Author: yuxiqian <[email protected]>
AuthorDate: Mon Mar 30 13:59:27 2026 +0800
[hotfix] Optimize SchemaChangeEventVisitor to reduce class inflations
(#4350)
---
.../event/visitor/AddColumnEventVisitor.java | 28 --------
.../visitor/AlterTableCommentEventVisitor.java | 28 --------
.../event/visitor/CreateTableEventVisitor.java | 28 --------
.../event/visitor/DropColumnEventVisitor.java | 28 --------
.../event/visitor/DropTableEventVisitor.java | 28 --------
.../event/visitor/RenameColumnEventVisitor.java | 28 --------
.../event/visitor/SchemaChangeEventVisitor.java | 83 +++++++++++++---------
...eTableEventVisitor.java => VisitorHandler.java} | 5 +-
...peEventVisitor.java => VoidVisitorHandler.java} | 9 +--
.../doris/sink/DorisMetadataApplier.java | 42 +++--------
.../paimon/sink/PaimonMetadataApplier.java | 42 +++--------
.../starrocks/sink/StarRocksMetadataApplier.java | 38 +++-------
.../event/SchemaChangeEventSerializer.java | 11 +--
13 files changed, 86 insertions(+), 312 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java
deleted file mode 100644
index e709f7bbe..000000000
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.AddColumnEvent;
-
-/** Visitor for {@link AddColumnEvent}s. */
-@Internal
-@FunctionalInterface
-public interface AddColumnEventVisitor<T, E extends Throwable> {
- T visit(AddColumnEvent event) throws E;
-}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java
deleted file mode 100644
index 997e4e067..000000000
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.AlterTableCommentEvent;
-
-/** Visitor for {@link AlterTableCommentEvent}s. */
-@Internal
-@FunctionalInterface
-public interface AlterTableCommentEventVisitor<T, E extends Throwable> {
- T visit(AlterTableCommentEvent event) throws E;
-}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java
deleted file mode 100644
index d20f118c9..000000000
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
-
-/** Visitor for {@link CreateTableEvent}s. */
-@Internal
-@FunctionalInterface
-public interface CreateTableEventVisitor<T, E extends Throwable> {
- T visit(CreateTableEvent event) throws E;
-}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java
deleted file mode 100644
index 1a79baa3d..000000000
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.DropColumnEvent;
-
-/** Visitor for {@link DropColumnEvent}s. */
-@Internal
-@FunctionalInterface
-public interface DropColumnEventVisitor<T, E extends Throwable> {
- T visit(DropColumnEvent event) throws E;
-}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java
deleted file mode 100644
index 074ae67ee..000000000
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.DropTableEvent;
-
-/** Visitor for {@link DropTableEvent}s. */
-@Internal
-@FunctionalInterface
-public interface DropTableEventVisitor<T, E extends Throwable> {
- T visit(DropTableEvent event) throws E;
-}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java
deleted file mode 100644
index 304452528..000000000
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.RenameColumnEvent;
-
-/** Visitor for {@link RenameColumnEvent}s. */
-@Internal
-@FunctionalInterface
-public interface RenameColumnEventVisitor<T, E extends Throwable> {
- T visit(RenameColumnEvent event) throws E;
-}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
index 8564a6cfb..0be202715 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
@@ -27,64 +27,83 @@ import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.utils.Preconditions;
+
+import javax.annotation.Nonnull;
/** Visitor clas for all {@link SchemaChangeEvent}s and returns a specific
typed object. */
@Internal
public class SchemaChangeEventVisitor {
public static <T, E extends Throwable> T visit(
SchemaChangeEvent event,
- AddColumnEventVisitor<T, E> addColumnVisitor,
- AlterColumnTypeEventVisitor<T, E> alterColumnTypeEventVisitor,
- CreateTableEventVisitor<T, E> createTableEventVisitor,
- DropColumnEventVisitor<T, E> dropColumnEventVisitor,
- DropTableEventVisitor<T, E> dropTableEventVisitor,
- RenameColumnEventVisitor<T, E> renameColumnEventVisitor,
- TruncateTableEventVisitor<T, E> truncateTableEventVisitor,
- AlterTableCommentEventVisitor<T, E> alterTableCommentEventVisitor)
+ @Nonnull VisitorHandler<AddColumnEvent, T, E> addColumnVisitor,
+ @Nonnull VisitorHandler<AlterColumnTypeEvent, T, E>
alterColumnTypeEventVisitor,
+ @Nonnull VisitorHandler<CreateTableEvent, T, E>
createTableEventVisitor,
+ @Nonnull VisitorHandler<DropColumnEvent, T, E>
dropColumnEventVisitor,
+ @Nonnull VisitorHandler<DropTableEvent, T, E>
dropTableEventVisitor,
+ @Nonnull VisitorHandler<RenameColumnEvent, T, E>
renameColumnEventVisitor,
+ @Nonnull VisitorHandler<TruncateTableEvent, T, E>
truncateTableEventVisitor,
+ @Nonnull VisitorHandler<AlterTableCommentEvent, T, E>
alterTableCommentEventVisitor)
throws E {
if (event instanceof AddColumnEvent) {
- if (addColumnVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(addColumnVisitor);
return addColumnVisitor.visit((AddColumnEvent) event);
} else if (event instanceof AlterColumnTypeEvent) {
- if (alterColumnTypeEventVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(alterColumnTypeEventVisitor);
return alterColumnTypeEventVisitor.visit((AlterColumnTypeEvent)
event);
} else if (event instanceof CreateTableEvent) {
- if (createTableEventVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(createTableEventVisitor);
return createTableEventVisitor.visit((CreateTableEvent) event);
} else if (event instanceof DropColumnEvent) {
- if (dropColumnEventVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(dropColumnEventVisitor);
return dropColumnEventVisitor.visit((DropColumnEvent) event);
} else if (event instanceof DropTableEvent) {
- if (dropTableEventVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(dropTableEventVisitor);
return dropTableEventVisitor.visit((DropTableEvent) event);
} else if (event instanceof RenameColumnEvent) {
- if (renameColumnEventVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(renameColumnEventVisitor);
return renameColumnEventVisitor.visit((RenameColumnEvent) event);
} else if (event instanceof TruncateTableEvent) {
- if (truncateTableEventVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(truncateTableEventVisitor);
return truncateTableEventVisitor.visit((TruncateTableEvent) event);
} else if (event instanceof AlterTableCommentEvent) {
- if (alterTableCommentEventVisitor == null) {
- return null;
- }
+ Preconditions.checkNotNull(alterTableCommentEventVisitor);
return
alterTableCommentEventVisitor.visit((AlterTableCommentEvent) event);
} else {
throw new IllegalArgumentException(
"Unknown schema change event type " + event.getType());
}
}
+
+ public static <E extends Throwable> void voidVisit(
+ SchemaChangeEvent event,
+ @Nonnull VoidVisitorHandler<AddColumnEvent, E> addColumnVisitor,
+ @Nonnull VoidVisitorHandler<AlterColumnTypeEvent, E>
alterColumnTypeEventVisitor,
+ @Nonnull VoidVisitorHandler<CreateTableEvent, E>
createTableEventVisitor,
+ @Nonnull VoidVisitorHandler<DropColumnEvent, E>
dropColumnEventVisitor,
+ @Nonnull VoidVisitorHandler<DropTableEvent, E>
dropTableEventVisitor,
+ @Nonnull VoidVisitorHandler<RenameColumnEvent, E>
renameColumnEventVisitor,
+ @Nonnull VoidVisitorHandler<TruncateTableEvent, E>
truncateTableEventVisitor,
+ @Nonnull VoidVisitorHandler<AlterTableCommentEvent, E>
alterTableCommentEventHandler)
+ throws E {
+ visit(
+ event,
+ wrapVoidVisitor(addColumnVisitor),
+ wrapVoidVisitor(alterColumnTypeEventVisitor),
+ wrapVoidVisitor(createTableEventVisitor),
+ wrapVoidVisitor(dropColumnEventVisitor),
+ wrapVoidVisitor(dropTableEventVisitor),
+ wrapVoidVisitor(renameColumnEventVisitor),
+ wrapVoidVisitor(truncateTableEventVisitor),
+ wrapVoidVisitor(alterTableCommentEventHandler));
+ }
+
+ private static <EVT extends SchemaChangeEvent, E extends Throwable>
+ VisitorHandler<EVT, Void, E>
wrapVoidVisitor(VoidVisitorHandler<EVT, E> handler) {
+ Preconditions.checkNotNull(handler);
+ return event -> {
+ handler.visit(event);
+ return null;
+ };
+ }
}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VisitorHandler.java
similarity index 85%
rename from
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java
rename to
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VisitorHandler.java
index 020c5e8c9..e70aebc8a 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VisitorHandler.java
@@ -18,11 +18,12 @@
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
/** Visitor for {@link TruncateTableEvent}s. */
@Internal
@FunctionalInterface
-public interface TruncateTableEventVisitor<T, E extends Throwable> {
- T visit(TruncateTableEvent event) throws E;
+public interface VisitorHandler<EVT extends SchemaChangeEvent, T, E extends
Throwable> {
+ T visit(EVT event) throws E;
}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VoidVisitorHandler.java
similarity index 76%
rename from
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java
rename to
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VoidVisitorHandler.java
index 027ddc88d..cb6818812 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VoidVisitorHandler.java
@@ -18,11 +18,12 @@
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TruncateTableEvent;
-/** Visitor for {@link AlterColumnTypeEvent}s. */
+/** Visitor for {@link TruncateTableEvent}s. */
@Internal
@FunctionalInterface
-public interface AlterColumnTypeEventVisitor<T, E extends Throwable> {
- T visit(AlterColumnTypeEvent event) throws E;
+public interface VoidVisitorHandler<EVT extends SchemaChangeEvent, E extends
Throwable> {
+ void visit(EVT event) throws E;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index de03dbc53..0aa910629 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -112,40 +112,16 @@ public class DorisMetadataApplier implements
MetadataApplier {
@Override
public void applySchemaChange(SchemaChangeEvent event) {
- SchemaChangeEventVisitor.<Void, SchemaEvolveException>visit(
+ SchemaChangeEventVisitor.voidVisit(
event,
- addColumnEvent -> {
- applyAddColumnEvent(addColumnEvent);
- return null;
- },
- alterColumnTypeEvent -> {
- applyAlterColumnTypeEvent(alterColumnTypeEvent);
- return null;
- },
- createTableEvent -> {
- applyCreateTableEvent(createTableEvent);
- return null;
- },
- dropColumnEvent -> {
- applyDropColumnEvent(dropColumnEvent);
- return null;
- },
- dropTableEvent -> {
- applyDropTableEvent(dropTableEvent);
- return null;
- },
- renameColumnEvent -> {
- applyRenameColumnEvent(renameColumnEvent);
- return null;
- },
- truncateTableEvent -> {
- applyTruncateTableEvent(truncateTableEvent);
- return null;
- },
- alterTableCommentEvent -> {
- applyAlterTableCommentEvent(alterTableCommentEvent);
- return null;
- });
+ this::applyAddColumnEvent,
+ this::applyAlterColumnTypeEvent,
+ this::applyCreateTableEvent,
+ this::applyDropColumnEvent,
+ this::applyDropTableEvent,
+ this::applyRenameColumnEvent,
+ this::applyTruncateTableEvent,
+ this::applyAlterTableCommentEvent);
}
private void applyCreateTableEvent(CreateTableEvent event) throws
SchemaEvolveException {
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index b5de3e62a..99ae583f0 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -120,40 +120,16 @@ public class PaimonMetadataApplier implements
MetadataApplier {
if (catalog == null) {
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
}
- SchemaChangeEventVisitor.visit(
+ SchemaChangeEventVisitor.voidVisit(
schemaChangeEvent,
- addColumnEvent -> {
- applyAddColumn(addColumnEvent);
- return null;
- },
- alterColumnTypeEvent -> {
- applyAlterColumnType(alterColumnTypeEvent);
- return null;
- },
- createTableEvent -> {
- applyCreateTable(createTableEvent);
- return null;
- },
- dropColumnEvent -> {
- applyDropColumn(dropColumnEvent);
- return null;
- },
- dropTableEvent -> {
- applyDropTable(dropTableEvent);
- return null;
- },
- renameColumnEvent -> {
- applyRenameColumn(renameColumnEvent);
- return null;
- },
- truncateTableEvent -> {
- applyTruncateTable(truncateTableEvent);
- return null;
- },
- alterTableCommentEvent -> {
- applyAlterTableComment(alterTableCommentEvent);
- return null;
- });
+ this::applyAddColumn,
+ this::applyAlterColumnType,
+ this::applyCreateTable,
+ this::applyDropColumn,
+ this::applyDropTable,
+ this::applyRenameColumn,
+ this::applyTruncateTable,
+ this::applyAlterTableComment);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
index c63b7e363..00dcb67b6 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -104,36 +104,15 @@ public class StarRocksMetadataApplier implements
MetadataApplier {
catalog.open();
}
- SchemaChangeEventVisitor.visit(
+ SchemaChangeEventVisitor.voidVisit(
schemaChangeEvent,
- addColumnEvent -> {
- applyAddColumn(addColumnEvent);
- return null;
- },
- alterColumnTypeEvent -> {
- applyAlterColumnType(alterColumnTypeEvent);
- return null;
- },
- createTableEvent -> {
- applyCreateTable(createTableEvent);
- return null;
- },
- dropColumnEvent -> {
- applyDropColumn(dropColumnEvent);
- return null;
- },
- dropTableEvent -> {
- applyDropTable(dropTableEvent);
- return null;
- },
- renameColumnEvent -> {
- applyRenameColumn(renameColumnEvent);
- return null;
- },
- truncateTableEvent -> {
- applyTruncateTable(truncateTableEvent);
- return null;
- },
+ this::applyAddColumn,
+ this::applyAlterColumnType,
+ this::applyCreateTable,
+ this::applyDropColumn,
+ this::applyDropTable,
+ this::applyRenameColumn,
+ this::applyTruncateTable,
alterTableCommentEvent -> {
// TODO Currently, table comments cannot be modified.
// See
@@ -141,7 +120,6 @@ public class StarRocksMetadataApplier implements
MetadataApplier {
LOG.warn(
"AlterTableCommentEvent is not supported by
StarRocks connector yet. Event: {}",
alterTableCommentEvent);
- return null;
});
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
index f7a66b4d2..88cdbbed4 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
@@ -102,49 +102,40 @@ public final class SchemaChangeEventSerializer extends
TypeSerializerSingleton<S
@Override
public void serialize(SchemaChangeEvent record, DataOutputView target)
throws IOException {
-
- SchemaChangeEventVisitor.<Void, IOException>visit(
+ SchemaChangeEventVisitor.voidVisit(
record,
addColumnEvent -> {
enumSerializer.serialize(ADD_COLUMN, target);
AddColumnEventSerializer.INSTANCE.serialize(addColumnEvent, target);
- return null;
},
alterColumnTypeEvent -> {
enumSerializer.serialize(ALTER_COLUMN_TYPE, target);
AlterColumnTypeEventSerializer.INSTANCE.serialize(alterColumnTypeEvent, target);
- return null;
},
createTableEvent -> {
enumSerializer.serialize(CREATE_TABLE, target);
CreateTableEventSerializer.INSTANCE.serialize(createTableEvent, target);
- return null;
},
dropColumnEvent -> {
enumSerializer.serialize(DROP_COLUMN, target);
DropColumnEventSerializer.INSTANCE.serialize(dropColumnEvent, target);
- return null;
},
dropTableEvent -> {
enumSerializer.serialize(DROP_TABLE, target);
DropTableEventSerializer.INSTANCE.serialize(dropTableEvent, target);
- return null;
},
renameColumnEvent -> {
enumSerializer.serialize(RENAME_COLUMN, target);
RenameColumnEventSerializer.INSTANCE.serialize(renameColumnEvent, target);
- return null;
},
truncateTableEvent -> {
enumSerializer.serialize(TRUNCATE_TABLE, target);
TruncateTableEventSerializer.INSTANCE.serialize(truncateTableEvent, target);
- return null;
},
alterTableCommentEvent -> {
enumSerializer.serialize(ALTER_TABLE_COMMENT, target);
AlterTableCommentEventSerializer.INSTANCE.serialize(
alterTableCommentEvent, target);
- return null;
});
}