This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 73305fb81 [hotfix] Optimize SchemaChangeEventVisitor to reduce class 
inflations (#4350)
73305fb81 is described below

commit 73305fb8103beb4671c6c54216806c820c76d006
Author: yuxiqian <[email protected]>
AuthorDate: Mon Mar 30 13:59:27 2026 +0800

    [hotfix] Optimize SchemaChangeEventVisitor to reduce class inflations 
(#4350)
---
 .../event/visitor/AddColumnEventVisitor.java       | 28 --------
 .../visitor/AlterTableCommentEventVisitor.java     | 28 --------
 .../event/visitor/CreateTableEventVisitor.java     | 28 --------
 .../event/visitor/DropColumnEventVisitor.java      | 28 --------
 .../event/visitor/DropTableEventVisitor.java       | 28 --------
 .../event/visitor/RenameColumnEventVisitor.java    | 28 --------
 .../event/visitor/SchemaChangeEventVisitor.java    | 83 +++++++++++++---------
 ...eTableEventVisitor.java => VisitorHandler.java} |  5 +-
 ...peEventVisitor.java => VoidVisitorHandler.java} |  9 +--
 .../doris/sink/DorisMetadataApplier.java           | 42 +++--------
 .../paimon/sink/PaimonMetadataApplier.java         | 42 +++--------
 .../starrocks/sink/StarRocksMetadataApplier.java   | 38 +++-------
 .../event/SchemaChangeEventSerializer.java         | 11 +--
 13 files changed, 86 insertions(+), 312 deletions(-)

diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java
deleted file mode 100644
index e709f7bbe..000000000
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.AddColumnEvent;
-
-/** Visitor for {@link AddColumnEvent}s. */
-@Internal
-@FunctionalInterface
-public interface AddColumnEventVisitor<T, E extends Throwable> {
-    T visit(AddColumnEvent event) throws E;
-}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java
deleted file mode 100644
index 997e4e067..000000000
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.AlterTableCommentEvent;
-
-/** Visitor for {@link AlterTableCommentEvent}s. */
-@Internal
-@FunctionalInterface
-public interface AlterTableCommentEventVisitor<T, E extends Throwable> {
-    T visit(AlterTableCommentEvent event) throws E;
-}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java
deleted file mode 100644
index d20f118c9..000000000
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
-
-/** Visitor for {@link CreateTableEvent}s. */
-@Internal
-@FunctionalInterface
-public interface CreateTableEventVisitor<T, E extends Throwable> {
-    T visit(CreateTableEvent event) throws E;
-}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java
deleted file mode 100644
index 1a79baa3d..000000000
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.DropColumnEvent;
-
-/** Visitor for {@link DropColumnEvent}s. */
-@Internal
-@FunctionalInterface
-public interface DropColumnEventVisitor<T, E extends Throwable> {
-    T visit(DropColumnEvent event) throws E;
-}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java
deleted file mode 100644
index 074ae67ee..000000000
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.DropTableEvent;
-
-/** Visitor for {@link DropTableEvent}s. */
-@Internal
-@FunctionalInterface
-public interface DropTableEventVisitor<T, E extends Throwable> {
-    T visit(DropTableEvent event) throws E;
-}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java
deleted file mode 100644
index 304452528..000000000
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.common.event.visitor;
-
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.RenameColumnEvent;
-
-/** Visitor for {@link RenameColumnEvent}s. */
-@Internal
-@FunctionalInterface
-public interface RenameColumnEventVisitor<T, E extends Throwable> {
-    T visit(RenameColumnEvent event) throws E;
-}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
index 8564a6cfb..0be202715 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java
@@ -27,64 +27,83 @@ import org.apache.flink.cdc.common.event.DropTableEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.utils.Preconditions;
+
+import javax.annotation.Nonnull;
 
 /** Visitor clas for all {@link SchemaChangeEvent}s and returns a specific 
typed object. */
 @Internal
 public class SchemaChangeEventVisitor {
     public static <T, E extends Throwable> T visit(
             SchemaChangeEvent event,
-            AddColumnEventVisitor<T, E> addColumnVisitor,
-            AlterColumnTypeEventVisitor<T, E> alterColumnTypeEventVisitor,
-            CreateTableEventVisitor<T, E> createTableEventVisitor,
-            DropColumnEventVisitor<T, E> dropColumnEventVisitor,
-            DropTableEventVisitor<T, E> dropTableEventVisitor,
-            RenameColumnEventVisitor<T, E> renameColumnEventVisitor,
-            TruncateTableEventVisitor<T, E> truncateTableEventVisitor,
-            AlterTableCommentEventVisitor<T, E> alterTableCommentEventVisitor)
+            @Nonnull VisitorHandler<AddColumnEvent, T, E> addColumnVisitor,
+            @Nonnull VisitorHandler<AlterColumnTypeEvent, T, E> 
alterColumnTypeEventVisitor,
+            @Nonnull VisitorHandler<CreateTableEvent, T, E> 
createTableEventVisitor,
+            @Nonnull VisitorHandler<DropColumnEvent, T, E> 
dropColumnEventVisitor,
+            @Nonnull VisitorHandler<DropTableEvent, T, E> 
dropTableEventVisitor,
+            @Nonnull VisitorHandler<RenameColumnEvent, T, E> 
renameColumnEventVisitor,
+            @Nonnull VisitorHandler<TruncateTableEvent, T, E> 
truncateTableEventVisitor,
+            @Nonnull VisitorHandler<AlterTableCommentEvent, T, E> 
alterTableCommentEventVisitor)
             throws E {
         if (event instanceof AddColumnEvent) {
-            if (addColumnVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(addColumnVisitor);
             return addColumnVisitor.visit((AddColumnEvent) event);
         } else if (event instanceof AlterColumnTypeEvent) {
-            if (alterColumnTypeEventVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(alterColumnTypeEventVisitor);
             return alterColumnTypeEventVisitor.visit((AlterColumnTypeEvent) 
event);
         } else if (event instanceof CreateTableEvent) {
-            if (createTableEventVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(createTableEventVisitor);
             return createTableEventVisitor.visit((CreateTableEvent) event);
         } else if (event instanceof DropColumnEvent) {
-            if (dropColumnEventVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(dropColumnEventVisitor);
             return dropColumnEventVisitor.visit((DropColumnEvent) event);
         } else if (event instanceof DropTableEvent) {
-            if (dropTableEventVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(dropTableEventVisitor);
             return dropTableEventVisitor.visit((DropTableEvent) event);
         } else if (event instanceof RenameColumnEvent) {
-            if (renameColumnEventVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(renameColumnEventVisitor);
             return renameColumnEventVisitor.visit((RenameColumnEvent) event);
         } else if (event instanceof TruncateTableEvent) {
-            if (truncateTableEventVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(truncateTableEventVisitor);
             return truncateTableEventVisitor.visit((TruncateTableEvent) event);
         } else if (event instanceof AlterTableCommentEvent) {
-            if (alterTableCommentEventVisitor == null) {
-                return null;
-            }
+            Preconditions.checkNotNull(alterTableCommentEventVisitor);
             return 
alterTableCommentEventVisitor.visit((AlterTableCommentEvent) event);
         } else {
             throw new IllegalArgumentException(
                     "Unknown schema change event type " + event.getType());
         }
     }
+
+    public static <E extends Throwable> void voidVisit(
+            SchemaChangeEvent event,
+            @Nonnull VoidVisitorHandler<AddColumnEvent, E> addColumnVisitor,
+            @Nonnull VoidVisitorHandler<AlterColumnTypeEvent, E> 
alterColumnTypeEventVisitor,
+            @Nonnull VoidVisitorHandler<CreateTableEvent, E> 
createTableEventVisitor,
+            @Nonnull VoidVisitorHandler<DropColumnEvent, E> 
dropColumnEventVisitor,
+            @Nonnull VoidVisitorHandler<DropTableEvent, E> 
dropTableEventVisitor,
+            @Nonnull VoidVisitorHandler<RenameColumnEvent, E> 
renameColumnEventVisitor,
+            @Nonnull VoidVisitorHandler<TruncateTableEvent, E> 
truncateTableEventVisitor,
+            @Nonnull VoidVisitorHandler<AlterTableCommentEvent, E> 
alterTableCommentEventHandler)
+            throws E {
+        visit(
+                event,
+                wrapVoidVisitor(addColumnVisitor),
+                wrapVoidVisitor(alterColumnTypeEventVisitor),
+                wrapVoidVisitor(createTableEventVisitor),
+                wrapVoidVisitor(dropColumnEventVisitor),
+                wrapVoidVisitor(dropTableEventVisitor),
+                wrapVoidVisitor(renameColumnEventVisitor),
+                wrapVoidVisitor(truncateTableEventVisitor),
+                wrapVoidVisitor(alterTableCommentEventHandler));
+    }
+
+    private static <EVT extends SchemaChangeEvent, E extends Throwable>
+            VisitorHandler<EVT, Void, E> 
wrapVoidVisitor(VoidVisitorHandler<EVT, E> handler) {
+        Preconditions.checkNotNull(handler);
+        return event -> {
+            handler.visit(event);
+            return null;
+        };
+    }
 }
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VisitorHandler.java
similarity index 85%
rename from 
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java
rename to 
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VisitorHandler.java
index 020c5e8c9..e70aebc8a 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VisitorHandler.java
@@ -18,11 +18,12 @@
 package org.apache.flink.cdc.common.event.visitor;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
 
 /** Visitor for {@link TruncateTableEvent}s. */
 @Internal
 @FunctionalInterface
-public interface TruncateTableEventVisitor<T, E extends Throwable> {
-    T visit(TruncateTableEvent event) throws E;
+public interface VisitorHandler<EVT extends SchemaChangeEvent, T, E extends 
Throwable> {
+    T visit(EVT event) throws E;
 }
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VoidVisitorHandler.java
similarity index 76%
rename from 
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java
rename to 
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VoidVisitorHandler.java
index 027ddc88d..cb6818812 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VoidVisitorHandler.java
@@ -18,11 +18,12 @@
 package org.apache.flink.cdc.common.event.visitor;
 
 import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TruncateTableEvent;
 
-/** Visitor for {@link AlterColumnTypeEvent}s. */
+/** Visitor for {@link TruncateTableEvent}s. */
 @Internal
 @FunctionalInterface
-public interface AlterColumnTypeEventVisitor<T, E extends Throwable> {
-    T visit(AlterColumnTypeEvent event) throws E;
+public interface VoidVisitorHandler<EVT extends SchemaChangeEvent, E extends 
Throwable> {
+    void visit(EVT event) throws E;
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index de03dbc53..0aa910629 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -112,40 +112,16 @@ public class DorisMetadataApplier implements 
MetadataApplier {
 
     @Override
     public void applySchemaChange(SchemaChangeEvent event) {
-        SchemaChangeEventVisitor.<Void, SchemaEvolveException>visit(
+        SchemaChangeEventVisitor.voidVisit(
                 event,
-                addColumnEvent -> {
-                    applyAddColumnEvent(addColumnEvent);
-                    return null;
-                },
-                alterColumnTypeEvent -> {
-                    applyAlterColumnTypeEvent(alterColumnTypeEvent);
-                    return null;
-                },
-                createTableEvent -> {
-                    applyCreateTableEvent(createTableEvent);
-                    return null;
-                },
-                dropColumnEvent -> {
-                    applyDropColumnEvent(dropColumnEvent);
-                    return null;
-                },
-                dropTableEvent -> {
-                    applyDropTableEvent(dropTableEvent);
-                    return null;
-                },
-                renameColumnEvent -> {
-                    applyRenameColumnEvent(renameColumnEvent);
-                    return null;
-                },
-                truncateTableEvent -> {
-                    applyTruncateTableEvent(truncateTableEvent);
-                    return null;
-                },
-                alterTableCommentEvent -> {
-                    applyAlterTableCommentEvent(alterTableCommentEvent);
-                    return null;
-                });
+                this::applyAddColumnEvent,
+                this::applyAlterColumnTypeEvent,
+                this::applyCreateTableEvent,
+                this::applyDropColumnEvent,
+                this::applyDropTableEvent,
+                this::applyRenameColumnEvent,
+                this::applyTruncateTableEvent,
+                this::applyAlterTableCommentEvent);
     }
 
     private void applyCreateTableEvent(CreateTableEvent event) throws 
SchemaEvolveException {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index b5de3e62a..99ae583f0 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -120,40 +120,16 @@ public class PaimonMetadataApplier implements 
MetadataApplier {
         if (catalog == null) {
             catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         }
-        SchemaChangeEventVisitor.visit(
+        SchemaChangeEventVisitor.voidVisit(
                 schemaChangeEvent,
-                addColumnEvent -> {
-                    applyAddColumn(addColumnEvent);
-                    return null;
-                },
-                alterColumnTypeEvent -> {
-                    applyAlterColumnType(alterColumnTypeEvent);
-                    return null;
-                },
-                createTableEvent -> {
-                    applyCreateTable(createTableEvent);
-                    return null;
-                },
-                dropColumnEvent -> {
-                    applyDropColumn(dropColumnEvent);
-                    return null;
-                },
-                dropTableEvent -> {
-                    applyDropTable(dropTableEvent);
-                    return null;
-                },
-                renameColumnEvent -> {
-                    applyRenameColumn(renameColumnEvent);
-                    return null;
-                },
-                truncateTableEvent -> {
-                    applyTruncateTable(truncateTableEvent);
-                    return null;
-                },
-                alterTableCommentEvent -> {
-                    applyAlterTableComment(alterTableCommentEvent);
-                    return null;
-                });
+                this::applyAddColumn,
+                this::applyAlterColumnType,
+                this::applyCreateTable,
+                this::applyDropColumn,
+                this::applyDropTable,
+                this::applyRenameColumn,
+                this::applyTruncateTable,
+                this::applyAlterTableComment);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
index c63b7e363..00dcb67b6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -104,36 +104,15 @@ public class StarRocksMetadataApplier implements 
MetadataApplier {
             catalog.open();
         }
 
-        SchemaChangeEventVisitor.visit(
+        SchemaChangeEventVisitor.voidVisit(
                 schemaChangeEvent,
-                addColumnEvent -> {
-                    applyAddColumn(addColumnEvent);
-                    return null;
-                },
-                alterColumnTypeEvent -> {
-                    applyAlterColumnType(alterColumnTypeEvent);
-                    return null;
-                },
-                createTableEvent -> {
-                    applyCreateTable(createTableEvent);
-                    return null;
-                },
-                dropColumnEvent -> {
-                    applyDropColumn(dropColumnEvent);
-                    return null;
-                },
-                dropTableEvent -> {
-                    applyDropTable(dropTableEvent);
-                    return null;
-                },
-                renameColumnEvent -> {
-                    applyRenameColumn(renameColumnEvent);
-                    return null;
-                },
-                truncateTableEvent -> {
-                    applyTruncateTable(truncateTableEvent);
-                    return null;
-                },
+                this::applyAddColumn,
+                this::applyAlterColumnType,
+                this::applyCreateTable,
+                this::applyDropColumn,
+                this::applyDropTable,
+                this::applyRenameColumn,
+                this::applyTruncateTable,
                 alterTableCommentEvent -> {
                     // TODO Currently, table comments cannot be modified.
                     // See
@@ -141,7 +120,6 @@ public class StarRocksMetadataApplier implements 
MetadataApplier {
                     LOG.warn(
                             "AlterTableCommentEvent is not supported by 
StarRocks connector yet. Event: {}",
                             alterTableCommentEvent);
-                    return null;
                 });
     }
 
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
index f7a66b4d2..88cdbbed4 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java
@@ -102,49 +102,40 @@ public final class SchemaChangeEventSerializer extends 
TypeSerializerSingleton<S
 
     @Override
     public void serialize(SchemaChangeEvent record, DataOutputView target) 
throws IOException {
-
-        SchemaChangeEventVisitor.<Void, IOException>visit(
+        SchemaChangeEventVisitor.voidVisit(
                 record,
                 addColumnEvent -> {
                     enumSerializer.serialize(ADD_COLUMN, target);
                     
AddColumnEventSerializer.INSTANCE.serialize(addColumnEvent, target);
-                    return null;
                 },
                 alterColumnTypeEvent -> {
                     enumSerializer.serialize(ALTER_COLUMN_TYPE, target);
                     
AlterColumnTypeEventSerializer.INSTANCE.serialize(alterColumnTypeEvent, target);
-                    return null;
                 },
                 createTableEvent -> {
                     enumSerializer.serialize(CREATE_TABLE, target);
                     
CreateTableEventSerializer.INSTANCE.serialize(createTableEvent, target);
-                    return null;
                 },
                 dropColumnEvent -> {
                     enumSerializer.serialize(DROP_COLUMN, target);
                     
DropColumnEventSerializer.INSTANCE.serialize(dropColumnEvent, target);
-                    return null;
                 },
                 dropTableEvent -> {
                     enumSerializer.serialize(DROP_TABLE, target);
                     
DropTableEventSerializer.INSTANCE.serialize(dropTableEvent, target);
-                    return null;
                 },
                 renameColumnEvent -> {
                     enumSerializer.serialize(RENAME_COLUMN, target);
                     
RenameColumnEventSerializer.INSTANCE.serialize(renameColumnEvent, target);
-                    return null;
                 },
                 truncateTableEvent -> {
                     enumSerializer.serialize(TRUNCATE_TABLE, target);
                     
TruncateTableEventSerializer.INSTANCE.serialize(truncateTableEvent, target);
-                    return null;
                 },
                 alterTableCommentEvent -> {
                     enumSerializer.serialize(ALTER_TABLE_COMMENT, target);
                     AlterTableCommentEventSerializer.INSTANCE.serialize(
                             alterTableCommentEvent, target);
-                    return null;
                 });
     }
 

Reply via email to