This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fc89ba4c4 [feature][transform-v2] Add FilterRowKindTransform to filter 
rows based on row kind (#3659)
fc89ba4c4 is described below

commit fc89ba4c44ebaaad3d023a2f81bd58e1123df348
Author: hailin0 <[email protected]>
AuthorDate: Thu Dec 8 17:08:08 2022 +0800

    [feature][transform-v2] Add FilterRowKindTransform to filter rows based on 
row kind (#3659)
---
 .../e2e/transform/TestFilterRowKindIT.java         | 47 ++++++++++++
 .../resources/filter_row_kind_exclude_delete.conf  | 83 +++++++++++++++++++++
 .../resources/filter_row_kind_exclude_insert.conf  | 67 +++++++++++++++++
 .../resources/filter_row_kind_include_insert.conf  | 83 +++++++++++++++++++++
 .../server/task/flow/TransformFlowLifeCycle.java   | 20 ++++-
 .../transform/FilterRowKindTransform.java          | 86 ++++++++++++++++++++++
 .../transform/FilterRowKindTransformFactory.java   | 42 +++++++++++
 .../transform/common/FilterRowTransform.java       | 27 +++++++
 8 files changed, 452 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
new file mode 100644
index 000000000..3b9aea584
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * End-to-end checks for the FilterRowKind transform: each test submits one job config to the
+ * container and only requires that the job finishes with exit code 0 (row-level expectations live
+ * in the Assert sink of each config file).
+ */
+public class TestFilterRowKindIT extends TestSuiteBase {
+
+    @TestTemplate
+    public void testFilterRowKindExcludeDelete(TestContainer container) throws IOException, InterruptedException {
+        assertJobSucceeds(container, "/filter_row_kind_exclude_delete.conf");
+    }
+
+    @TestTemplate
+    public void testFilterRowKindExcludeInsert(TestContainer container) throws IOException, InterruptedException {
+        assertJobSucceeds(container, "/filter_row_kind_exclude_insert.conf");
+    }
+
+    @TestTemplate
+    public void testFilterRowKindIncludeInsert(TestContainer container) throws IOException, InterruptedException {
+        assertJobSucceeds(container, "/filter_row_kind_include_insert.conf");
+    }
+
+    /** Runs the job described by {@code configPath} and asserts that it exited with code 0. */
+    private static void assertJobSucceeds(TestContainer container, String configPath)
+        throws IOException, InterruptedException {
+        Container.ExecResult jobResult = container.executeJob(configPath);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_delete.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_delete.conf
new file mode 100644
index 000000000..44ea90a52
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_delete.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config: the FilterRowKind transform with exclude_kinds = ["DELETE"]
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  FilterRowKind {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    exclude_kinds = ["DELETE"]
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = age
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = name
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_insert.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_insert.conf
new file mode 100644
index 000000000..0fe31c921
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_insert.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config: the FilterRowKind transform with exclude_kinds = ["INSERT"]
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  FilterRowKind {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    exclude_kinds = ["INSERT"]
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 0
+          },
+          {
+            rule_type = MAX_ROW
+            rule_value = 0
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_include_insert.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_include_insert.conf
new file mode 100644
index 000000000..2ad1fec2b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_include_insert.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config: the FilterRowKind transform with include_kinds = ["INSERT"]
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  FilterRowKind {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    include_kinds = ["INSERT"]
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = age
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = name
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index bd7f9841a..aec7ae2e1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -26,10 +26,13 @@ import 
org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+@Slf4j
 public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle implements 
OneInputFlowLifeCycle<Record<?>> {
 
     private final TransformChainAction<T> action;
@@ -65,11 +68,22 @@ public class TransformFlowLifeCycle<T> extends 
ActionFlowLifeCycle implements On
             if (prepareClose) {
                 return;
             }
-            T r = (T) record.getData();
+            T inputData = (T) record.getData();
+            T outputData = inputData;
             for (SeaTunnelTransform<T> t : transform) {
-                r = t.map(r);
+                outputData = t.map(inputData);
+                log.debug("Transform[{}] input row {} and output row {}", t, 
inputData, outputData);
+                if (outputData == null) {
+                    log.trace("Transform[{}] filtered data row {}", t, 
inputData);
+                    break;
+                }
+
+                inputData = outputData;
+            }
+            if (outputData != null) {
+                // todo log metrics
+                collector.collect(new Record<>(outputData));
             }
-            collector.collect(new Record<>(r));
         }
     }
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
new file mode 100644
index 000000000..474cbc782
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.transform.common.FilterRowTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.ToString;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@ToString(of = {"includeKinds", "excludeKinds"})
+@AutoService(SeaTunnelTransform.class)
+public class FilterRowKindTransform extends FilterRowTransform {
+    public static final Option<List<RowKind>> INCLUDE_KINDS = 
Options.key("include_kinds")
+        .listType(RowKind.class)
+        .noDefaultValue()
+        .withDescription("the row kinds to include");
+    public static final Option<List<RowKind>> EXCLUDE_KINDS = 
Options.key("exclude_kinds")
+        .listType(RowKind.class)
+        .noDefaultValue()
+        .withDescription("the row kinds to exclude");
+
+    private Set<RowKind> includeKinds = Collections.emptySet();
+    private Set<RowKind> excludeKinds = Collections.emptySet();
+
+    @Override
+    public String getPluginName() {
+        return "FilterRowKind";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (pluginConfig.hasPath(INCLUDE_KINDS.key())) {
+            includeKinds = new 
HashSet<>(pluginConfig.getEnumList(RowKind.class, INCLUDE_KINDS.key()));
+        }
+        if (pluginConfig.hasPath(EXCLUDE_KINDS.key())) {
+            excludeKinds = new 
HashSet<>(pluginConfig.getEnumList(RowKind.class, EXCLUDE_KINDS.key()));
+        }
+        if ((includeKinds.isEmpty() && excludeKinds.isEmpty())
+            || (!includeKinds.isEmpty() && !excludeKinds.isEmpty())) {
+            throw new 
SeaTunnelRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                String.format("These options(%s,%s) are mutually exclusive, 
allowing only one set of options to be configured.",
+                    INCLUDE_KINDS.key(), EXCLUDE_KINDS.key()));
+        }
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        if (!excludeKinds.isEmpty()) {
+            return excludeKinds.contains(inputRow.getRowKind()) ? null : 
inputRow;
+        }
+        if (!includeKinds.isEmpty()) {
+            return includeKinds.contains(inputRow.getRowKind()) ? inputRow : 
null;
+        }
+        throw new 
SeaTunnelRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+            "Transform config error! Either excludeKinds or includeKinds must 
be configured");
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
new file mode 100644
index 000000000..f2ac8adb3
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import static 
org.apache.seatunnel.transform.FilterRowKindTransform.EXCLUDE_KINDS;
+import static 
org.apache.seatunnel.transform.FilterRowKindTransform.INCLUDE_KINDS;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the {@code FilterRowKind} transform: publishes its identifier and the rule for its
+ * {@code exclude_kinds} / {@code include_kinds} options.
+ */
+@AutoService(Factory.class)
+public class FilterRowKindTransformFactory implements TableTransformFactory {
+    /** Same name that FilterRowKindTransform reports from getPluginName(). */
+    private static final String IDENTIFIER = "FilterRowKind";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Declares {@code exclude_kinds} and {@code include_kinds} as an exclusive pair of options. */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().exclusive(EXCLUDE_KINDS, INCLUDE_KINDS).build();
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
new file mode 100644
index 000000000..939710fb1
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.common;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+/**
+ * Base class for row-filtering transforms: subclasses decide per row whether to keep it, while the
+ * output row type is always identical to the input row type.
+ */
+public abstract class FilterRowTransform extends AbstractSeaTunnelTransform {
+    /** Filtering never adds, removes or retypes fields, so the incoming row type is passed through. */
+    @Override
+    protected SeaTunnelRowType transformRowType(SeaTunnelRowType upstreamRowType) {
+        return upstreamRowType;
+    }
+}

Reply via email to