lincoln-lil commented on code in PR #21676:
URL: https://github.com/apache/flink/pull/21676#discussion_r1071693761


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1492,6 +1500,41 @@ private Operation convertStopJob(SqlStopJob sqlStopJob) {
                 sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
     }
 
+    private Operation convertToDelete(SqlDelete sqlDelete) {
+        RelRoot updateRelational = flinkPlanner.rel(sqlDelete);
+        LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+        UnresolvedIdentifier unresolvedTableIdentifier =
+                UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+        ContextResolvedTable contextResolvedTable =
+                catalogManager.getTableOrError(
+                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+        // try push down delete
+        Optional<DynamicTableSink> optionalDynamicTableSink =
+                DeletePushDownUtils.getDynamicTableSink(
+                        contextResolvedTable, tableModify, catalogManager);
+        if (optionalDynamicTableSink.isPresent()) {
+            DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get();
+            // if the table sink supports delete push down
+            if (dynamicTableSink instanceof SupportsDeletePushDown) {
+                SupportsDeletePushDown supportsDeletePushDown =

Review Comment:
   nit: supportsDeletePushDown -> supportsDeletePushDownSink



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.Collections;
+import java.util.List;
+
+/** The operation for deleting data in a table according to filters directly. */
+@Internal
+public class DeleteFromFilterOperation implements Operation {
+    private final SupportsDeletePushDown supportsDeletePushDown;
+    private final List<ResolvedExpression> filters;
+
+    public DeleteFromFilterOperation(
+            SupportsDeletePushDown supportsDeletePushDown, List<ResolvedExpression> filters) {

Review Comment:
   add a `Nonnull` annotation and a null check, since only non-empty objects are accepted when doing the conversion
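   
   For example, a minimal sketch (assuming `javax.annotation.Nonnull` and Flink's `Preconditions.checkNotNull` are available here; the parameter is renamed per the other nit):
   
   ```java
   public DeleteFromFilterOperation(
           @Nonnull SupportsDeletePushDown supportsDeletePushDownSink,
           @Nonnull List<ResolvedExpression> filters) {
       this.supportsDeletePushDownSink = Preconditions.checkNotNull(supportsDeletePushDownSink);
       this.filters = Preconditions.checkNotNull(filters);
   }
   ```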



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The IT case for DELETE statement in batch mode. */
+public class DeleteTableITCase extends BatchTestBase {
+    private static final int ROW_NUM = 5;
+
+    @Test
+    public void testDeletePushDown() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) WITH"
+                                        + " ('connector' = 'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'only-accept-empty-filter' = 'true'"
+                                        + ")",
+                                dataId));
+        List<Row> rows =
+                CollectionUtil.iteratorToList(tEnv().executeSql("DELETE FROM t").collect());
+        assertThat(rows.toString()).isEqualTo(String.format("[+I[%d], +I[OK]]", ROW_NUM));
+        rows = CollectionUtil.iteratorToList(tEnv().executeSql("SELECT * FROM t").collect());
+        assertThat(rows).isEmpty();
+
+        // should throw exception for non-empty filter
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t where a = 1"))

Review Comment:
   Can we extend the current test utility to also support verifying the case where a sink can accept non-empty filters?
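   
   For instance, something like the following (a sketch: it assumes the factory's default behavior accepts non-empty filters, and the assertion details would depend on the registered test data):
   
   ```java
   @Test
   public void testDeletePushDownWithNonEmptyFilter() throws Exception {
       String dataId = registerData();
       tEnv().executeSql(
               String.format(
                       "CREATE TABLE t (a int, b string, c double) WITH"
                               + " ('connector' = 'test-update-delete',"
                               + " 'data-id' = '%s')",
                       dataId));
       // the sink accepts the pushed-down non-empty filter and deletes matching rows
       tEnv().executeSql("DELETE FROM t WHERE a = 1").await();
       List<Row> rows =
               CollectionUtil.iteratorToList(tEnv().executeSql("SELECT * FROM t").collect());
       assertThat(rows).noneMatch(r -> Integer.valueOf(1).equals(r.getField(0)));
   }
   ```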



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1407,6 +1408,23 @@ public TableResultInternal executeInternal(Operation operation) {
             } catch (Exception e) {
                 throw new TableException("Failed to execute ANALYZE TABLE command", e);
             }
+        } else if (operation instanceof DeleteFromFilterOperation) {
+            if (isStreamingMode) {
+                throw new TableException("DELETE TABLE is not supported for streaming mode now.");

Review Comment:
   'DELETE TABLE' -> 'DELETE from table' or 'DELETE operation' 



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1492,6 +1500,41 @@ private Operation convertStopJob(SqlStopJob sqlStopJob) {
                 sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
     }
 
+    private Operation convertToDelete(SqlDelete sqlDelete) {
+        RelRoot updateRelational = flinkPlanner.rel(sqlDelete);
+        LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+        UnresolvedIdentifier unresolvedTableIdentifier =
+                UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+        ContextResolvedTable contextResolvedTable =
+                catalogManager.getTableOrError(
+                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+        // try push down delete
+        Optional<DynamicTableSink> optionalDynamicTableSink =
+                DeletePushDownUtils.getDynamicTableSink(
+                        contextResolvedTable, tableModify, catalogManager);
+        if (optionalDynamicTableSink.isPresent()) {
+            DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get();
+            // if the table sink supports delete push down
+            if (dynamicTableSink instanceof SupportsDeletePushDown) {
+                SupportsDeletePushDown supportsDeletePushDown =
+                        (SupportsDeletePushDown) dynamicTableSink;
+                // get resolved filter expression
+                Optional<List<ResolvedExpression>> filters =
+                        DeletePushDownUtils.getResolveFilterExpressions(tableModify);
+                if (filters.isPresent()
+                        && supportsDeletePushDown.applyDeleteFilters(filters.get())) {
+                    return new DeleteFromFilterOperation(supportsDeletePushDown, filters.get());
+                }
+            }
+        }

Review Comment:
   Should we raise an error when the returned `dynamicTableSink` is empty? (A test case is also needed.)
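   
   A sketch of the suggested guard (the exception type and message are suggestions, not the PR's code):
   
   ```java
   if (!optionalDynamicTableSink.isPresent()) {
       throw new TableException(
               String.format(
                       "Can't get the DynamicTableSink for table %s when trying to push down the"
                               + " DELETE statement.",
                       contextResolvedTable.getIdentifier().asSummaryString()));
   }
   ```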



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+
+/** A utility class for delete push down. */
+public class DeletePushDownUtils {
+
+    /**
+     * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if it
+     * can't get the {@link DynamicTableSink}.
+     */
+    public static Optional<DynamicTableSink> getDynamicTableSink(
+            ContextResolvedTable contextResolvedTable,
+            LogicalTableModify tableModify,
+            CatalogManager catalogManager) {
+        // don't consider anonymous table
+        if (contextResolvedTable.isAnonymous()) {
+            return Optional.empty();
+        }
+        final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+
+        CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
+        // only consider DynamicTableSink
+        if (catalogBaseTable instanceof CatalogTable) {
+            ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable();
+            Optional<Catalog> optionalCatalog = contextResolvedTable.getCatalog();
+            ObjectIdentifier objectIdentifier = contextResolvedTable.getIdentifier();
+            boolean isTemporary = contextResolvedTable.isTemporary();
+            // only consider the CatalogTable that doesn't use legacy connector sink option
+            if (!TableFactoryUtil.isLegacyConnectorOptions(
+                    catalogManager.getCatalog(objectIdentifier.getCatalogName()).orElse(null),
+                    context.getTableConfig(),
+                    !context.isBatchMode(),
+                    objectIdentifier,
+                    resolvedTable,
+                    isTemporary)) {
+                DynamicTableSinkFactory dynamicTableSinkFactory = null;
+                if (optionalCatalog.isPresent()
+                        && optionalCatalog.get().getFactory().isPresent()
+                        && optionalCatalog.get().getFactory().get()
+                                instanceof DynamicTableSinkFactory) {
+                    // try get from catalog
+                    dynamicTableSinkFactory =
+                            (DynamicTableSinkFactory) optionalCatalog.get().getFactory().get();
+                }
+
+                if (dynamicTableSinkFactory == null) {
+                    Optional<DynamicTableSinkFactory> factoryFromModule =
+                            context.getModuleManager().getFactory((Module::getTableSinkFactory));
+                    // then try get from module
+                    dynamicTableSinkFactory = factoryFromModule.orElse(null);
+                }
+                // create table dynamic table sink
+                DynamicTableSink tableSink =
+                        FactoryUtil.createDynamicTableSink(
+                                dynamicTableSinkFactory,
+                                objectIdentifier,
+                                resolvedTable,
+                                Collections.emptyMap(),
+                                context.getTableConfig(),
+                                context.getClassLoader(),
+                                contextResolvedTable.isTemporary());
+                return Optional.of(tableSink);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /** Get the resolved filter expressions from the {@code WHERE} clause in the DELETE statement. */
+    public static Optional<List<ResolvedExpression>> getResolveFilterExpressions(
+            LogicalTableModify tableModify) {
+        FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+        RelNode input = tableModify.getInput().getInput(0);
+        // no WHERE clause, return an empty list
+        if (input instanceof LogicalTableScan) {
+            return Optional.of(Collections.emptyList());
+        }
+        if (!(input instanceof LogicalFilter)) {
+            return Optional.empty();
+        }
+
+        Filter filter = (Filter) input;
+        if (RexUtil.SubQueryFinder.containsSubQuery(filter)) {
+            return Optional.empty();
+        }
+
+        // optimize the filter
+        filter = prepareFilter(filter);
+
+        // resolve the filter to get resolved expression
+        List<ResolvedExpression> resolveExpression = resolveFilter(context, filter);
+        return Optional.ofNullable(resolveExpression);
+    }
+
+    /** Prepare the filter with reducing && simplifying. */
+    private static Filter prepareFilter(Filter filter) {
+        // we try to reduce and simplify the filter
+        ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE;
+        SimplifyFilterConditionRule simplifyFilterConditionRule =
+                SimplifyFilterConditionRule.INSTANCE();
+        // max iteration num for reducing and simplifying filter,
+        // we use 5 as the max iteration num, which is the same as the iteration num in Flink's
+        // plan optimizing.
+        int maxIteration = 5;
+
+        boolean changed = true;
+        int iteration = 1;
+        // iterate until it reaches the max iteration num or there are no changes in one iteration
+        while (changed && iteration <= maxIteration) {
+            changed = false;
+            // first apply the rule to reduce condition in filter
+            RexNode newCondition = filter.getCondition();
+            List<RexNode> expList = new ArrayList<>();
+            expList.add(newCondition);
+            if (reduceExpressionsRuleProxy.reduce(filter, expList)) {
+                // get the new condition
+                newCondition = expList.get(0);
+                changed = true;
+            }
+            // create a new filter
+            filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition);
+            // then apply the rule to simplify filter
+            Option<Filter> changedFilter =
+                    simplifyFilterConditionRule.simplify(filter, new boolean[] {false});
+            if (changedFilter.isDefined()) {
+                filter = changedFilter.get();
+                changed = true;
+            }
+            iteration += 1;
+        }
+        return filter;
+    }
+
+    /**
+     * A proxy for {@link ReduceExpressionsRule}, which enables us to call the method {@link
+     * ReduceExpressionsRule#reduceExpressions(RelNode, List, RelOptPredicateList)}.
+     */
+    private static class ReduceExpressionsRuleProxy
+            extends ReduceExpressionsRule<ReduceExpressionsRule.Config> {
+        private static final ReduceExpressionsRule.Config config =
+                FilterReduceExpressionsRule.FilterReduceExpressionsRuleConfig.DEFAULT;
+        private static final ReduceExpressionsRuleProxy INSTANCE = new ReduceExpressionsRuleProxy();
+
+        public ReduceExpressionsRuleProxy() {
+            super(config);
+        }
+
+        @Override
+        public void onMatch(RelOptRuleCall relOptRuleCall) {}

Review Comment:
   We can throw an exception here to guard against unexpected calls
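   
   For example (a sketch; the exception type and message are suggestions):
   
   ```java
   @Override
   public void onMatch(RelOptRuleCall relOptRuleCall) {
       // this proxy only exposes reduceExpressions() and is never registered with a
       // planner, so reaching onMatch() indicates a programming error
       throw new TableException("ReduceExpressionsRuleProxy is not supposed to be matched.");
   }
   ```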



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+
+/** A utility class for delete push down. */
+public class DeletePushDownUtils {

Review Comment:
   Add tests for this new utility
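   
   A possible starting point (a sketch only; `tableModify`, `catalogManager`, and `resolvedCatalogTable` would come from the planner test harness, and the helper names here are assumptions):
   
   ```java
   @Test
   public void testGetDynamicTableSinkReturnsEmptyForAnonymousTable() {
       // an anonymous table should never yield a sink for delete push down
       ContextResolvedTable anonymous = ContextResolvedTable.anonymous(resolvedCatalogTable);
       assertThat(DeletePushDownUtils.getDynamicTableSink(anonymous, tableModify, catalogManager))
               .isEmpty();
   }
   ```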



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+
+/** A utility class for delete push down. */
+public class DeletePushDownUtils {
+
+    /**
+     * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if it
+     * can't get the {@link DynamicTableSink}.
+     */
+    public static Optional<DynamicTableSink> getDynamicTableSink(
+            ContextResolvedTable contextResolvedTable,
+            LogicalTableModify tableModify,
+            CatalogManager catalogManager) {
+        // don't consider anonymous table
+        if (contextResolvedTable.isAnonymous()) {
+            return Optional.empty();
+        }
+        final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+
+        CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
+        // only consider DynamicTableSink
+        if (catalogBaseTable instanceof CatalogTable) {
+            ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable();
+            Optional<Catalog> optionalCatalog = contextResolvedTable.getCatalog();
+            ObjectIdentifier objectIdentifier = contextResolvedTable.getIdentifier();
+            boolean isTemporary = contextResolvedTable.isTemporary();
+            // only consider the CatalogTable that doesn't use legacy connector sink option
+            if (!TableFactoryUtil.isLegacyConnectorOptions(
+                    catalogManager.getCatalog(objectIdentifier.getCatalogName()).orElse(null),
+                    context.getTableConfig(),
+                    !context.isBatchMode(),
+                    objectIdentifier,
+                    resolvedTable,
+                    isTemporary)) {
+                DynamicTableSinkFactory dynamicTableSinkFactory = null;
+                if (optionalCatalog.isPresent()
+                        && optionalCatalog.get().getFactory().isPresent()
+                        && optionalCatalog.get().getFactory().get()
+                                instanceof DynamicTableSinkFactory) {
+                    // try get from catalog
+                    dynamicTableSinkFactory =
+                            (DynamicTableSinkFactory) optionalCatalog.get().getFactory().get();
+                }
+
+                if (dynamicTableSinkFactory == null) {
+                    Optional<DynamicTableSinkFactory> factoryFromModule =
+                            context.getModuleManager().getFactory((Module::getTableSinkFactory));
+                    // then try get from module
+                    dynamicTableSinkFactory = factoryFromModule.orElse(null);
+                }
+                // create table dynamic table sink
+                DynamicTableSink tableSink =
+                        FactoryUtil.createDynamicTableSink(
+                                dynamicTableSinkFactory,
+                                objectIdentifier,
+                                resolvedTable,
+                                Collections.emptyMap(),
+                                context.getTableConfig(),
+                                context.getClassLoader(),
+                                contextResolvedTable.isTemporary());
+                return Optional.of(tableSink);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /** Get the resolved filter expressions from the {@code WHERE} clause in the DELETE statement. */
+    public static Optional<List<ResolvedExpression>> getResolveFilterExpressions(
+            LogicalTableModify tableModify) {
+        FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+        RelNode input = tableModify.getInput().getInput(0);
+        // no WHERE clause, return an empty list
+        if (input instanceof LogicalTableScan) {
+            return Optional.of(Collections.emptyList());
+        }
+        if (!(input instanceof LogicalFilter)) {
+            return Optional.empty();
+        }
+
+        Filter filter = (Filter) input;
+        if (RexUtil.SubQueryFinder.containsSubQuery(filter)) {
+            return Optional.empty();
+        }
+
+        // optimize the filter
+        filter = prepareFilter(filter);
+
+        // resolve the filter to get resolved expression
+        List<ResolvedExpression> resolveExpression = resolveFilter(context, filter);
+        return Optional.ofNullable(resolveExpression);
+    }
+
+    /** Prepare the filter with reducing && simplifying. */
+    private static Filter prepareFilter(Filter filter) {
+        // we try to reduce and simplify the filter
+        ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE;
+        SimplifyFilterConditionRule simplifyFilterConditionRule =
+                SimplifyFilterConditionRule.INSTANCE();
+        // max iteration num for reducing and simplifying filter,
+        // we use 5 as the max iteration num, which is the same as the iteration num in Flink's
+        // plan optimizing.
+        int maxIteration = 5;
+
+        boolean changed = true;
+        int iteration = 1;
+        // iterate until it reaches the max iteration num or there are no changes in one iteration
+        while (changed && iteration <= maxIteration) {
+            changed = false;
+            // first apply the rule to reduce condition in filter
+            RexNode newCondition = filter.getCondition();
+            List<RexNode> expList = new ArrayList<>();
+            expList.add(newCondition);
+            if (reduceExpressionsRuleProxy.reduce(filter, expList)) {
+                // get the new condition
+                newCondition = expList.get(0);
+                changed = true;
+            }
+            // create a new filter
+            filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition);
+            // then apply the rule to simplify filter
+            Option<Filter> changedFilter =
+                    simplifyFilterConditionRule.simplify(filter, new boolean[] {false});
+            if (changedFilter.isDefined()) {
+                filter = changedFilter.get();
+                changed = true;
+            }
+            iteration += 1;
+        }
+        return filter;
+    }
+
+    /**
+     * A proxy for {@link ReduceExpressionsRule}, which enables us to call the method {@link
+     * ReduceExpressionsRule#reduceExpressions(RelNode, List, RelOptPredicateList)}.
+     */
+    private static class ReduceExpressionsRuleProxy
+            extends ReduceExpressionsRule<ReduceExpressionsRule.Config> {
+        private static final ReduceExpressionsRule.Config config =
+                FilterReduceExpressionsRule.FilterReduceExpressionsRuleConfig.DEFAULT;
+        private static final ReduceExpressionsRuleProxy INSTANCE = new ReduceExpressionsRuleProxy();
+
+        public ReduceExpressionsRuleProxy() {
+            super(config);
+        }
+
+        @Override
+        public void onMatch(RelOptRuleCall relOptRuleCall) {}
+
+        private boolean reduce(RelNode rel, List<RexNode> expList) {
+            return reduceExpressions(
+                    rel,
+                    expList,
+                    RelOptPredicateList.EMPTY,
+                    true,
+                    config.matchNullability(),
+                    config.treatDynamicCallsAsConstant());
+        }
+    }
+
+    /** Return the ResolvedExpression according to Filter. */
+    private static List<ResolvedExpression> resolveFilter(FlinkContext context, Filter filter) {
+        Tuple2<RexNode[], RexNode[]> extractedPredicates =
+                extractPredicates(
+                        filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
+                        filter.getCondition(),
+                        filter,
+                        filter.getCluster().getRexBuilder());
+        RexNode[] convertiblePredicates = extractedPredicates._1;
+        RexNode[] unconvertedPredicates = extractedPredicates._2;
+        if (unconvertedPredicates.length != 0) {
+            // if contain any unconverted condition, return null
+            return null;
+        }
+        RexNodeToExpressionConverter converter =
+                new RexNodeToExpressionConverter(
+                        filter.getCluster().getRexBuilder(),
+                        filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
+                        context.getFunctionCatalog(),
+                        context.getCatalogManager(),
+                        TimeZone.getTimeZone(
+                                TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
+        List<Expression> filters =
+                Arrays.stream(convertiblePredicates)
+                        .map(
+                                p -> {
+                                    Option<ResolvedExpression> expr = p.accept(converter);
+                                    if (expr.isDefined()) {
+                                        return expr.get();
+                                    } else {
+                                        throw new TableException(
+                                                String.format(
+                                                        "%s can not be converted to Expression",
+                                                        p));
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        ExpressionResolver resolver =
+                ExpressionResolver.resolverFor(
+                                context.getTableConfig(),
+                                context.getClassLoader(),
+                                name -> Optional.empty(),
+                                context.getFunctionCatalog()
+                                        .asLookup(
+                                                str -> {
+                                                    throw new TableException(
+                                                            "We should not need to lookup any expressions at this point");
+                                                }),
+                                context.getCatalogManager().getDataTypeFactory(),
+                                (sqlExpression, inputRowType, outputType) -> {
+                                    throw new TableException(
+                                            "SQL expression parsing is not supported at this location.");
+                                })
+                        .build();
+        return resolver.resolve(filters);
+    }
+
+    private static Tuple2<RexNode[], RexNode[]> extractPredicates(

Review Comment:
   We'd better extract the similar method in `PushFilterIntoSourceScanRuleBase` into a utility class, e.g., `FlinkRexUtil`.
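   
   A sketch of what the extracted utility could look like (the class placement follows the suggestion above; the exact signature is an assumption and the body is elided):
   
   ```java
   /** Hypothetical home for the shared predicate-extraction logic. */
   public final class FlinkRexUtil {
   
       private FlinkRexUtil() {}
   
       /** Splits a filter condition into convertible and unconverted predicates. */
       public static Tuple2<RexNode[], RexNode[]> extractPredicates(
               String[] inputNames, RexNode filterExpression, RelNode rel, RexBuilder rexBuilder) {
           // the body would be moved from PushFilterIntoSourceScanRuleBase#extractPredicates
           throw new UnsupportedOperationException("body elided in this sketch");
       }
   }
   ```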



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.Collections;
+import java.util.List;
+
+/** The operation for deleting data in a table according to filters directly. */
+@Internal
+public class DeleteFromFilterOperation implements Operation {
+    private final SupportsDeletePushDown supportsDeletePushDown;

Review Comment:
   -> 'supportsDeletePushDownSink' might be clearer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

