Hisoka-X commented on code in PR #4148:
URL: 
https://github.com/apache/incubator-seatunnel/pull/4148#discussion_r1115184218


##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSimpleSQLEngine.java:
##########
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.seatunnel.transform.sqlengine.SimpleSQLEngine;
+import org.apache.seatunnel.transform.sqlengine.zeta.function.FunctionUtil;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.expression.BinaryExpression;
+import net.sf.jsqlparser.expression.DoubleValue;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.ExtractExpression;
+import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.Parenthesis;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
+import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
+import net.sf.jsqlparser.expression.operators.arithmetic.Division;
+import net.sf.jsqlparser.expression.operators.arithmetic.Modulo;
+import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
+import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
+import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
+import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
+import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.InExpression;
+import net.sf.jsqlparser.expression.operators.relational.MinorThan;
+import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.select.FromItem;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+import net.sf.jsqlparser.statement.select.Select;
+import net.sf.jsqlparser.statement.select.SelectExpressionItem;
+import net.sf.jsqlparser.statement.select.SelectItem;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ZetaSimpleSQLEngine implements SimpleSQLEngine {
+    private String inputTableName;
+    private SeaTunnelRowType inputRowType;
+
+    private String sql;
+    private PlainSelect selectBody;
+
+    private FunctionUtil simpleSQLFunctions;
+
+    /**
+     * Creates an engine with no table name, row type or SQL attached; those are supplied later
+     * through {@code init(String, SeaTunnelRowType, String)}.
+     */
+    public ZetaSimpleSQLEngine() {
+        // Intentionally empty: all state is assigned in init(...).
+    }
+
+    /**
+     * Stores the input description and the SQL text on this engine, creates the function helper
+     * and parses the SQL.
+     *
+     * @param inputTableName table name kept for later use by the engine
+     * @param inputRowType row type kept for later use by the engine
+     * @param sql SQL text that is parsed immediately by {@code parseSQL()}
+     */
+    @Override
+    public void init(String inputTableName, SeaTunnelRowType inputRowType, String sql) {
+        // Assign all plain fields first; the function helper below receives this instance and
+        // parseSQL() reads this.sql.
+        this.sql = sql;
+        this.inputRowType = inputRowType;
+        this.inputTableName = inputTableName;
+        this.simpleSQLFunctions = new FunctionUtil(this);
+
+        parseSQL();
+    }
+
+    /**
+     * Parses the stored SQL text with JSqlParser and keeps its plain SELECT body in
+     * {@code selectBody}.
+     *
+     * <p>Only a statement that parses to a {@link Select} whose body is a {@link PlainSelect} is
+     * accepted. Anything else (for example a non-SELECT statement or a set operation) is
+     * rejected with a {@link TransformException} instead of surfacing as a
+     * {@link ClassCastException}.
+     *
+     * @throws TransformException with {@code UNSUPPORTED_OPERATION} if the text cannot be parsed
+     *     or is not a plain SELECT
+     */
+    private void parseSQL() {
+        Statement statement;
+        try {
+            statement = CCJSqlParserUtil.parse(sql);
+        } catch (JSQLParserException e) {
+            // NOTE(review): only the parser message is forwarded; attach `e` as the cause if
+            // TransformException offers a constructor that accepts one.
+            throw new TransformException(
+                    CommonErrorCode.UNSUPPORTED_OPERATION,
+                    String.format("SQL parse failed: %s, cause: %s", sql, e.getMessage()));
+        }
+
+        if (!(statement instanceof Select)) {
+            throw new TransformException(
+                    CommonErrorCode.UNSUPPORTED_OPERATION,
+                    String.format("Unsupported SQL, only SELECT statements are supported: %s", sql));
+        }
+        Select select = (Select) statement;
+        if (!(select.getSelectBody() instanceof PlainSelect)) {
+            throw new TransformException(
+                    CommonErrorCode.UNSUPPORTED_OPERATION,
+                    String.format("Unsupported SQL, only a plain SELECT is supported: %s", sql));
+        }
+        selectBody = (PlainSelect) select.getSelectBody();
+
+        // todo validate SQL
+    }
+
+    /**
+     * Derives the output row type from the SELECT list of the parsed statement.
+     *
+     * <p>Each select item contributes one output field. The field name is the item's alias when
+     * one is given, otherwise the column name for a plain column reference, otherwise the
+     * expression text. The field type comes from {@code getExpressionType(expression)}.
+     *
+     * @return the row type with one field per select item, in SELECT-list order
+     * @throws TransformException with {@code UNSUPPORTED_OPERATION} if a select item is not an
+     *     expression item (for example {@code *}), which previously failed with a
+     *     {@link ClassCastException}
+     */
+    @Override
+    public SeaTunnelRowType typeMapping() {
+        List<SelectItem> selectItems = selectBody.getSelectItems();
+
+        String[] fieldNames = new String[selectItems.size()];
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[selectItems.size()];
+
+        for (int i = 0; i < selectItems.size(); i++) {
+            SelectItem selectItem = selectItems.get(i);
+            if (!(selectItem instanceof SelectExpressionItem)) {
+                throw new TransformException(
+                        CommonErrorCode.UNSUPPORTED_OPERATION,
+                        String.format(
+                                "Unsupported select item: %s, only expression items are supported",
+                                selectItem));
+            }
+            SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
+            Expression expression = expressionItem.getExpression();
+
+            if (expressionItem.getAlias() != null) {
+                fieldNames[i] = expressionItem.getAlias().getName();
+            } else if (expression instanceof Column) {
+                fieldNames[i] = ((Column) expression).getColumnName();
+            } else {
+                // No alias and not a bare column: fall back to the expression text.
+                fieldNames[i] = expression.toString();
+            }
+
+            seaTunnelDataTypes[i] = getExpressionType(expression);
+        }
+
+        return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+    }
+
+    /**
+     * Runs the parsed query against a single input row: scan, then filter, then project.
+     *
+     * @param inputRow the row to transform
+     * @return a new row holding the projected fields, or {@code null} when the WHERE clause
+     *     rejects the row
+     */
+    @Override
+    public SeaTunnelRow transformBySQL(SeaTunnelRow inputRow) {
+        // Scan: obtain the field values the rest of the plan works on.
+        Object[] scannedFields = physicalScanTable(inputRow);
+
+        // Filter: a rejected row produces no output.
+        if (!physicalFilter(selectBody.getWhere(), scannedFields)) {
+            return null;
+        }
+
+        // Project: evaluate the SELECT list and wrap the result.
+        return new SeaTunnelRow(physicalProject(scannedFields));
+    }
+
+    /**
+     * Decides whether a scanned row satisfies the WHERE clause.
+     *
+     * <p>Supported predicates are {@code IN} / {@code NOT IN} over an expression list, the
+     * comparison operators {@code =, <>, >, >=, <, <=}, {@code AND}, {@code OR} and
+     * parentheses. Any other expression type evaluates to {@code false}, so the row is dropped.
+     * {@code AND} and {@code OR} short-circuit: the right operand is only evaluated when the
+     * left operand does not already decide the result.
+     *
+     * @param whereExpr the predicate to evaluate, or {@code null} when there is no WHERE clause
+     * @param inputFields field values of the current row
+     * @return {@code true} if the row should be kept
+     * @throws TransformException with {@code UNSUPPORTED_OPERATION} if an ordering comparison
+     *     meets operands whose types cannot be compared with each other
+     */
+    private boolean physicalFilter(Expression whereExpr, Object[] inputFields) {
+        if (whereExpr == null) {
+            // No predicate: every row is retained.
+            return true;
+        }
+        if (whereExpr instanceof InExpression) {
+            return filterByIn((InExpression) whereExpr, inputFields);
+        }
+        if (whereExpr instanceof ComparisonOperator) {
+            return filterByComparison((ComparisonOperator) whereExpr, inputFields);
+        }
+        if (whereExpr instanceof AndExpression) {
+            AndExpression andExpression = (AndExpression) whereExpr;
+            return physicalFilter(andExpression.getLeftExpression(), inputFields)
+                    && physicalFilter(andExpression.getRightExpression(), inputFields);
+        }
+        if (whereExpr instanceof OrExpression) {
+            OrExpression orExpression = (OrExpression) whereExpr;
+            return physicalFilter(orExpression.getLeftExpression(), inputFields)
+                    || physicalFilter(orExpression.getRightExpression(), inputFields);
+        }
+        if (whereExpr instanceof Parenthesis) {
+            return physicalFilter(((Parenthesis) whereExpr).getExpression(), inputFields);
+        }
+        // NOTE(review): unsupported predicate types silently reject every row; consider
+        // rejecting them when the SQL is validated.
+        return false;
+    }
+
+    /**
+     * Evaluates {@code left [NOT] IN (item, ...)}. A null left value matches a null item;
+     * numbers are matched by numeric value, everything else with {@code equals}.
+     */
+    private boolean filterByIn(InExpression inExpression, Object[] inputFields) {
+        ExpressionList itemsList = (ExpressionList) inExpression.getRightItemsList();
+        Object leftVal = physicalProject(inExpression.getLeftExpression(), inputFields);
+
+        boolean matched = false;
+        for (Expression exprItem : itemsList.getExpressions()) {
+            Object rightValue = physicalProject(exprItem, inputFields);
+            if (leftVal == null) {
+                matched = rightValue == null;
+            } else {
+                matched = rightValue != null && filterValuesEqual(leftVal, rightValue);
+            }
+            if (matched) {
+                break;
+            }
+        }
+        // isNot() distinguishes NOT IN from IN; without it NOT IN behaved exactly like IN.
+        return matched != inExpression.isNot();
+    }
+
+    /**
+     * Evaluates one of the six comparison operators; other comparison types yield
+     * {@code false}. Except for {@code <>}, a null operand makes the comparison false.
+     */
+    private boolean filterByComparison(ComparisonOperator comparison, Object[] inputFields) {
+        Object leftVal = physicalProject(comparison.getLeftExpression(), inputFields);
+        Object rightVal = physicalProject(comparison.getRightExpression(), inputFields);
+
+        if (comparison instanceof NotEqualsTo) {
+            if (leftVal == null) {
+                return rightVal != null;
+            }
+            return rightVal == null || !filterValuesEqual(leftVal, rightVal);
+        }
+
+        boolean equalsOp = comparison instanceof EqualsTo;
+        boolean orderingOp =
+                comparison instanceof GreaterThan
+                        || comparison instanceof GreaterThanEquals
+                        || comparison instanceof MinorThan
+                        || comparison instanceof MinorThanEquals;
+        if (!equalsOp && !orderingOp) {
+            return false;
+        }
+        if (leftVal == null || rightVal == null) {
+            return false;
+        }
+        if (equalsOp) {
+            return filterValuesEqual(leftVal, rightVal);
+        }
+
+        int order = filterCompareValues(leftVal, rightVal);
+        if (comparison instanceof GreaterThan) {
+            return order > 0;
+        }
+        if (comparison instanceof GreaterThanEquals) {
+            return order >= 0;
+        }
+        if (comparison instanceof MinorThan) {
+            return order < 0;
+        }
+        return order <= 0;
+    }
+
+    /**
+     * Equality of two non-null values: numeric value for two numbers (so an Integer 1 equals a
+     * Long 1), {@code equals} otherwise.
+     */
+    private static boolean filterValuesEqual(Object left, Object right) {
+        if (left instanceof Number && right instanceof Number) {
+            return filterCompareNumbers((Number) left, (Number) right) == 0;
+        }
+        return left.equals(right);
+    }
+
+    /**
+     * Orders two non-null values of the same comparable family (number, string, date-time, date
+     * or time) and returns a negative, zero or positive result.
+     */
+    private static int filterCompareValues(Object left, Object right) {
+        if (left instanceof Number && right instanceof Number) {
+            return filterCompareNumbers((Number) left, (Number) right);
+        }
+        if (left instanceof String && right instanceof String) {
+            return ((String) left).compareTo((String) right);
+        }
+        if (left instanceof LocalDateTime && right instanceof LocalDateTime) {
+            return ((LocalDateTime) left).compareTo((LocalDateTime) right);
+        }
+        if (left instanceof LocalDate && right instanceof LocalDate) {
+            return ((LocalDate) left).compareTo((LocalDate) right);
+        }
+        if (left instanceof LocalTime && right instanceof LocalTime) {
+            return ((LocalTime) left).compareTo((LocalTime) right);
+        }
+        throw new TransformException(
+                CommonErrorCode.UNSUPPORTED_OPERATION,
+                String.format(
+                        "Field types not matched, left: %s, right: %s",
+                        left.getClass().getSimpleName(),
+                        right.getClass().getSimpleName()));
+    }
+
+    /**
+     * Compares two numbers by value. Integer and decimal types are compared exactly; as soon as
+     * a floating-point (or unknown) type is involved both sides are compared as doubles.
+     */
+    private static int filterCompareNumbers(Number left, Number right) {
+        if (filterIsExactNumber(left) && filterIsExactNumber(right)) {
+            return filterToBigDecimal(left).compareTo(filterToBigDecimal(right));
+        }
+        double leftDouble = left.doubleValue();
+        double rightDouble = right.doubleValue();
+        if (leftDouble == rightDouble) {
+            // Keeps -0.0 and 0.0 equal, which Double.compare would order.
+            return 0;
+        }
+        return Double.compare(leftDouble, rightDouble);
+    }
+
+    /** Tells whether the number can be converted to a BigDecimal without losing precision. */
+    private static boolean filterIsExactNumber(Number value) {
+        return value instanceof Byte
+                || value instanceof Short
+                || value instanceof Integer
+                || value instanceof Long
+                || value instanceof BigDecimal;
+    }
+
+    /** Converts a number accepted by {@code filterIsExactNumber} to a BigDecimal. */
+    private static BigDecimal filterToBigDecimal(Number value) {
+        if (value instanceof BigDecimal) {
+            return (BigDecimal) value;
+        }
+        return BigDecimal.valueOf(value.longValue());
+    }
+
+    private Object[] physicalScanTable(SeaTunnelRow inputRow) {

Review Comment:
   Seems like this logic shouldn't run on each row.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to