leonardBang commented on code in PR #2937: URL: https://github.com/apache/flink-cdc/pull/2937#discussion_r1512676160
########## flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/transform/TransformFilterProcessor.java: ########## @@ -0,0 +1,127 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.operators.transform; + +import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.data.binary.BinaryRecordData; +import com.ververica.cdc.common.schema.Column; +import com.ververica.cdc.runtime.parser.JaninoCompiler; +import com.ververica.cdc.runtime.parser.TransformParser; +import com.ververica.cdc.runtime.typeutils.DataTypeConverter; +import org.codehaus.janino.ExpressionEvaluator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +/** The processor of the transform filter. It processes the data change event of matched table. 
 */ +public class TransformFilterProcessor { + private static final Logger LOG = LoggerFactory.getLogger(TransformFilterProcessor.class); + + public static boolean process( + BinaryRecordData after, TableInfo tableInfo, TransformFilter transformFilter) { + TransformExpressionKey transformExpressionKey = + generateTransformExpressionKey(tableInfo, transformFilter); + ExpressionEvaluator expressionEvaluator = + TransformExpressionCompiler.compileExpression(transformExpressionKey); + try { + return (Boolean) + expressionEvaluator.evaluate(generateParams(after, tableInfo, transformFilter)); Review Comment: Could each `TransformFilterProcessor` instance hold an `expressionEvaluator` that is built at construction time, so that we can offer a method like `private boolean process(BinaryRecordData after)`? That way, we avoid compiling the expression for every record. ########## flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/transform/TransformProjectionProcessor.java: ########## @@ -0,0 +1,137 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.runtime.operators.transform; + +import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.data.binary.BinaryRecordData; +import com.ververica.cdc.common.event.CreateTableEvent; +import com.ververica.cdc.common.schema.Column; +import com.ververica.cdc.common.schema.Schema; +import com.ververica.cdc.runtime.parser.TransformParser; +import com.ververica.cdc.runtime.typeutils.DataTypeConverter; + +import java.util.ArrayList; +import java.util.List; + +/** + * The processor of transform projection applies to process a row of filtering tables. + * + * <p>A transform projection processor contains: + * + * <ul> + * <li>CreateTableEvent: add the user-defined computed columns into Schema. + * <li>SchemaChangeEvent: update the columns of TransformProjection. + * <li>DataChangeEvent: Fill data field to row in TransformSchemaOperator. Process the data column + * and the user-defined expression computed columns. + * </ul> + */ +public class TransformProjectionProcessor { + public static CreateTableEvent processCreateTableEvent( + CreateTableEvent createTableEvent, TransformProjection transformProjection) { + List<ProjectionColumn> projectionColumns = + TransformParser.generateProjectionColumns( + transformProjection.getProjection(), + createTableEvent.getSchema().getColumns()); + transformProjection.setProjectionColumns(projectionColumns); + List<Column> allColumnList = transformProjection.getAllColumnList(); + // add the column of projection into Schema + Schema schema = createTableEvent.getSchema().copy(allColumnList); + return new CreateTableEvent(createTableEvent.tableId(), schema); + } + + public static void processSchemaChangeEvent( + Schema schema, TransformProjection transformProjection) { + List<ProjectionColumn> projectionColumns = + TransformParser.generateProjectionColumns( + transformProjection.getProjection(), schema.getColumns()); + transformProjection.setProjectionColumns(projectionColumns); + } + + public 
static BinaryRecordData processFillDataField( + BinaryRecordData data, + TableChangeInfo tableChangeInfo, + TransformProjection transformProjection) { + List<Object> valueList = new ArrayList<>(); + for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) { + boolean isProjectionColumn = false; + for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) { + if (column.getName().equals(projectionColumn.getColumnName()) + && projectionColumn.isValidTransformedProjectionColumn()) { + valueList.add(null); + isProjectionColumn = true; + break; + } + } + if (!isProjectionColumn) { + valueList.add( + getValueFromBinaryRecordData( + column.getName(), + data, + tableChangeInfo.getOriginalSchema().getColumns(), + tableChangeInfo.getFieldGetters())); + } + } + return tableChangeInfo + .getRecordDataGenerator() + .generate(valueList.toArray(new Object[valueList.size()])); + } + + public static BinaryRecordData processData( + BinaryRecordData after, TableInfo tableInfo, TransformProjection transformProjection) { Review Comment: The `static` methods make the class look like a utility class. Could we instantiate a `TransformProjectionProcessor transformProjectionProcessor = new TransformProjectionProcessor(xx, xx)` and then call `transformProjectionProcessor.process()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org