This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new fc71888d7 [hotfix][cdc-common] Remove duplicated code to improve performance
fc71888d7 is described below

commit fc71888d7a9a84b73f1f6a16f7c755e2b1b40c02
Author: moses <72908278+chaomingzhan...@users.noreply.github.com>
AuthorDate: Wed Jan 8 14:53:39 2025 +0800

    [hotfix][cdc-common] Remove duplicated code to improve performance

    This closes #3840.

    Co-authored-by: zhangchaoming.zcm <zhangchaoming....@antgroup.com>
---
 .../main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java | 6 ++++++
 .../flink/cdc/runtime/operators/schema/common/SchemaDerivator.java | 1 -
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 51fdb46e4..966f0b6d5 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -37,6 +37,7 @@ import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
 
+import javax.annotation.CheckReturnValue;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -56,6 +57,7 @@ public class SchemaUtils {
      * create a list of {@link RecordData.FieldGetter} from given {@link Schema} to get Object from
      * RecordData.
      */
+    @CheckReturnValue
     public static List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
         return createFieldGetters(schema.getColumns());
     }
@@ -64,6 +66,7 @@ public class SchemaUtils {
      * create a list of {@link RecordData.FieldGetter} from given {@link Column} to get Object from
      * RecordData.
      */
+    @CheckReturnValue
     public static List<RecordData.FieldGetter> createFieldGetters(List<Column> columns) {
         List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(columns.size());
         for (int i = 0; i < columns.size(); i++) {
@@ -73,6 +76,7 @@ public class SchemaUtils {
     }
 
     /** Restore original data fields from RecordData structure. */
+    @CheckReturnValue
     public static List<Object> restoreOriginalData(
             @Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) {
         if (recordData == null) {
@@ -86,6 +90,7 @@ public class SchemaUtils {
     }
 
     /** apply SchemaChangeEvent to the old schema and return the schema after changing. */
+    @CheckReturnValue
     public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
         return SchemaChangeEventVisitor.visit(
                 event,
@@ -210,6 +215,7 @@ public class SchemaUtils {
      * position indicators. This is necessary since extra calculated columns might be added, and
      * `FIRST` / `LAST` position might differ.
      */
+    @CheckReturnValue
     public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
             boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
         Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
index 445387b02..de0ef182d 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
@@ -311,7 +311,6 @@ public class SchemaDerivator {
         List<RecordData.FieldGetter> upstreamSchemaReader =
                 upstreamRecordGetterCache.getUnchecked(upstreamSchema);
-        SchemaUtils.createFieldGetters(upstreamSchema);
         BinaryRecordDataGenerator evolvedSchemaWriter =
                 evolvedRecordWriterCache.getUnchecked(evolvedSchema);