leonardBang commented on code in PR #2937:
URL: https://github.com/apache/flink-cdc/pull/2937#discussion_r1512676160


##########
flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/transform/TransformFilterProcessor.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.runtime.operators.transform;
+
+import com.ververica.cdc.common.data.RecordData;
+import com.ververica.cdc.common.data.binary.BinaryRecordData;
+import com.ververica.cdc.common.schema.Column;
+import com.ververica.cdc.runtime.parser.JaninoCompiler;
+import com.ververica.cdc.runtime.parser.TransformParser;
+import com.ververica.cdc.runtime.typeutils.DataTypeConverter;
+import org.codehaus.janino.ExpressionEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** The processor of the transform filter. It processes the data change event 
of matched table. */
+public class TransformFilterProcessor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransformFilterProcessor.class);
+
+    public static boolean process(
+            BinaryRecordData after, TableInfo tableInfo, TransformFilter 
transformFilter) {
+        TransformExpressionKey transformExpressionKey =
+                generateTransformExpressionKey(tableInfo, transformFilter);
+        ExpressionEvaluator expressionEvaluator =
+                
TransformExpressionCompiler.compileExpression(transformExpressionKey);
+        try {
+            return (Boolean)
+                    expressionEvaluator.evaluate(generateParams(after, 
tableInfo, transformFilter));

Review Comment:
   Could one `TransformFilterProcessor` instance  holds a `expressionEvaluator` 
which built in construction? and  thus we can offer a method like `private 
boolean process( BinaryRecordData after)`. In this way, we  can avoid compile 
expression per record.



##########
flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/transform/TransformProjectionProcessor.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.runtime.operators.transform;
+
+import com.ververica.cdc.common.data.RecordData;
+import com.ververica.cdc.common.data.binary.BinaryRecordData;
+import com.ververica.cdc.common.event.CreateTableEvent;
+import com.ververica.cdc.common.schema.Column;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.runtime.parser.TransformParser;
+import com.ververica.cdc.runtime.typeutils.DataTypeConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The processor of transform projection applies to process a row of filtering 
tables.
+ *
+ * <p>A transform projection processor contains:
+ *
+ * <ul>
+ *   <li>CreateTableEvent: add the user-defined computed columns into Schema.
+ *   <li>SchemaChangeEvent: update the columns of TransformProjection.
+ *   <li>DataChangeEvent: Fill data field to row in TransformSchemaOperator. 
Process the data column
+ *       and the user-defined expression computed columns.
+ * </ul>
+ */
+public class TransformProjectionProcessor {
+    public static CreateTableEvent processCreateTableEvent(
+            CreateTableEvent createTableEvent, TransformProjection 
transformProjection) {
+        List<ProjectionColumn> projectionColumns =
+                TransformParser.generateProjectionColumns(
+                        transformProjection.getProjection(),
+                        createTableEvent.getSchema().getColumns());
+        transformProjection.setProjectionColumns(projectionColumns);
+        List<Column> allColumnList = transformProjection.getAllColumnList();
+        // add the column of projection into Schema
+        Schema schema = createTableEvent.getSchema().copy(allColumnList);
+        return new CreateTableEvent(createTableEvent.tableId(), schema);
+    }
+
+    public static void processSchemaChangeEvent(
+            Schema schema, TransformProjection transformProjection) {
+        List<ProjectionColumn> projectionColumns =
+                TransformParser.generateProjectionColumns(
+                        transformProjection.getProjection(), 
schema.getColumns());
+        transformProjection.setProjectionColumns(projectionColumns);
+    }
+
+    public static BinaryRecordData processFillDataField(
+            BinaryRecordData data,
+            TableChangeInfo tableChangeInfo,
+            TransformProjection transformProjection) {
+        List<Object> valueList = new ArrayList<>();
+        for (Column column : 
tableChangeInfo.getTransformedSchema().getColumns()) {
+            boolean isProjectionColumn = false;
+            for (ProjectionColumn projectionColumn : 
transformProjection.getProjectionColumns()) {
+                if (column.getName().equals(projectionColumn.getColumnName())
+                        && 
projectionColumn.isValidTransformedProjectionColumn()) {
+                    valueList.add(null);
+                    isProjectionColumn = true;
+                    break;
+                }
+            }
+            if (!isProjectionColumn) {
+                valueList.add(
+                        getValueFromBinaryRecordData(
+                                column.getName(),
+                                data,
+                                
tableChangeInfo.getOriginalSchema().getColumns(),
+                                tableChangeInfo.getFieldGetters()));
+            }
+        }
+        return tableChangeInfo
+                .getRecordDataGenerator()
+                .generate(valueList.toArray(new Object[valueList.size()]));
+    }
+
+    public static BinaryRecordData processData(
+            BinaryRecordData after, TableInfo tableInfo, TransformProjection 
transformProjection) {

Review Comment:
   The `static` method let the class looks like a utils, could we instantiate a 
`TransformProjectionProcessor  transformProjectionProcessor = new 
TransformProjectionProcessor(xx, xx)` and then call 
`transformProjectionProcessor.process()` ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to