luoyuxia commented on code in PR #21676:
URL: https://github.com/apache/flink/pull/21676#discussion_r1071898126
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1492,6 +1500,41 @@ private Operation convertStopJob(SqlStopJob sqlStopJob) {
                 sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
     }
+    private Operation convertToDelete(SqlDelete sqlDelete) {
+        RelRoot updateRelational = flinkPlanner.rel(sqlDelete);
+        LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+        UnresolvedIdentifier unresolvedTableIdentifier =
+                UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+        ContextResolvedTable contextResolvedTable =
+                catalogManager.getTableOrError(
+                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+        // try push down delete
+        Optional<DynamicTableSink> optionalDynamicTableSink =
+                DeletePushDownUtils.getDynamicTableSink(
+                        contextResolvedTable, tableModify, catalogManager);
+        if (optionalDynamicTableSink.isPresent()) {
+            DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get();
+            // if the table sink supports delete push down
+            if (dynamicTableSink instanceof SupportsDeletePushDown) {
+                SupportsDeletePushDown supportsDeletePushDown =
+                        (SupportsDeletePushDown) dynamicTableSink;
+                // get resolved filter expression
+                Optional<List<ResolvedExpression>> filters =
+                        DeletePushDownUtils.getResolveFilterExpressions(tableModify);
+                if (filters.isPresent()
+                        && supportsDeletePushDown.applyDeleteFilters(filters.get())) {
+                    return new DeleteFromFilterOperation(supportsDeletePushDown, filters.get());
+                }
+            }
+        }
Review Comment:
If the table uses the deprecated `TableSink`, the lookup above returns an empty `Optional`.
I think we can raise the error in
`PlannerBase#translateToRel(modifyOperation: ModifyOperation)` instead: when the
sink is a deprecated `TableSink`, we throw an exception there. That is more
unified, because we need the same check for row-level delete/update anyway.
If that's acceptable, I will add the code when I rework the second
commit (support row-level delete).
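To make the proposal concrete, here is a rough, self-contained sketch of a single central check. It is not the planner code: `DynamicSink`, `LegacySink`, `ModifyKind`, `findDynamicSink` and `requireDynamicSink` are hypothetical stand-ins I made up for illustration, and the real check would live in `PlannerBase#translateToRel` and use Flink's own exception type and message.

```java
import java.util.Optional;

public class SinkCheckSketch {

    /** Hypothetical stand-in for the new-style sink interface. */
    interface DynamicSink {}

    /** Hypothetical stand-in for the deprecated sink interface. */
    interface LegacySink {}

    /** Modification kinds that would share the same check. */
    enum ModifyKind {
        DELETE,
        UPDATE
    }

    /** Mirrors the lookup in the diff: empty when the sink is not a DynamicSink. */
    static Optional<DynamicSink> findDynamicSink(Object sink) {
        return sink instanceof DynamicSink
                ? Optional.of((DynamicSink) sink)
                : Optional.empty();
    }

    /**
     * Central check (assumed shape): fail fast with a clear message instead of
     * letting each caller silently handle an empty Optional.
     */
    static DynamicSink requireDynamicSink(String tableName, Object sink, ModifyKind kind) {
        return findDynamicSink(sink)
                .orElseThrow(
                        () ->
                                new UnsupportedOperationException(
                                        String.format(
                                                "%s is not supported for table '%s': "
                                                        + "the sink uses the deprecated interface.",
                                                kind, tableName)));
    }

    public static void main(String[] args) {
        DynamicSink modern = new DynamicSink() {};
        LegacySink legacy = new LegacySink() {};

        // A new-style sink passes the check and is returned unchanged.
        System.out.println(requireDynamicSink("t1", modern, ModifyKind.DELETE) == modern);

        // A legacy sink is rejected in one place, for delete and update alike.
        try {
            requireDynamicSink("t2", legacy, ModifyKind.UPDATE);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check like this in one place, `convertToDelete` only has to decide whether push-down applies, and the legacy-sink error is reported the same way for push-down delete and row-level delete/update.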