lincoln-lil commented on code in PR #21676: URL: https://github.com/apache/flink/pull/21676#discussion_r1071973609
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.abilities.sink; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level + * delete mode to/from JSON, but also can delete existing data for {@link + * 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeName("RowLevelDelete") +public class RowLevelDeleteSpec implements SinkAbilitySpec { + public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode"; + + @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) + private final SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode; + + @JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext; + + @JsonCreator + public RowLevelDeleteSpec( + @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) + SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode, + RowLevelModificationScanContext scanContext) { Review Comment: add '@Nullable' to param scanContext ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.types.RowKind; + +/** An operator that sets the row kind of the incoming records to a specific row kind. */ +@Internal +public class RowKindSetter extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + Review Comment: add serialVersionUID ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java: ########## @@ -618,4 +642,24 @@ private enum LengthEnforcerType { CHAR, BINARY } + + /** + * Get the row-kind that the row data should change to, assuming the current row kind is + * RowKind.INSERT. Return Optional.empty() if it doesn't need to change. Currently, it'll only + * consider row-level delete. + */ + private Optional<RowKind> getTargetRowKindChangeTo() { Review Comment: just name it 'getTargetRowKind'? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ########## @@ -780,6 +784,7 @@ public CompiledPlan compilePlanSql(String stmt) { if (operations.size() != 1 || !(operations.get(0) instanceof ModifyOperation) + || isRowLevelModification(operations.get(0)) Review Comment: add a test that contains the following statement set: ``` insert into ... delete from ... 
``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java: ########## @@ -282,6 +351,290 @@ public static RelNode validateSchemaAndApplyImplicitCast( return query; } + private static RelNode convertDelete( + LogicalTableModify tableModify, + DynamicTableSink sink, + ContextResolvedTable contextResolvedTable, + String tableDebugName, + DataTypeFactory dataTypeFactory, + FlinkTypeFactory typeFactory, + List<SinkAbilitySpec> sinkAbilitySpecs) { + if (!(sink instanceof SupportsRowLevelDelete)) { + throw new UnsupportedOperationException( + String.format( + "Can't perform deletion for the table %s for the corresponding dynamic table sink haven't implement %s.", Review Comment: 'Can't perform delete operation of the table %s because the corresponding dynamic table sink has not yet implemented %s.' ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java: ########## @@ -43,6 +43,23 @@ public class SinkModifyOperation implements ModifyOperation { private final QueryOperation child; private final boolean overwrite; private final Map<String, String> dynamicOptions; + private final boolean isDelete; Review Comment: Change these two flags into an enum, e.g., reuse `TableModify.Operation`? (Consider we may add new types of modification later) ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.types.RowKind; + +/** An operator that sets the row kind of the incoming records to a specific row kind. */ +@Internal +public class RowKindSetter extends TableStreamOperator<RowData> Review Comment: Add an operator test for this new one ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.batch.sql; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; + +/** Test for row-level delete. */ +@RunWith(Parameterized.class) +public class RowLevelDeleteTest extends TableTestBase { Review Comment: We should also verify the JSON plan to ensure the new internal 'RowKindSetter' operator has taken effect via `verifyJsonPlan` or `verifyExplain` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java: ########## @@ -43,6 +43,23 @@ public class SinkModifyOperation implements ModifyOperation { private final QueryOperation child; private final boolean overwrite; private final Map<String, String> dynamicOptions; + private final boolean isDelete; Review Comment: also the newly added attributes should be reflected via `asSummaryString` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.abilities.sink; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level + * delete mode to/from JSON, but also can delete existing data for {@link + * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeName("RowLevelDelete") +public class RowLevelDeleteSpec implements SinkAbilitySpec { + public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode"; + + @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) + private final SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode; + + @JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext; + + @JsonCreator + public RowLevelDeleteSpec( + @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) + SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode, + RowLevelModificationScanContext scanContext) { + this.rowLevelDeleteMode = rowLevelDeleteMode; Review Comment: add nullcheck -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
