This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fc89ba4c4 [feature][transform-v2] Add FilterRowKindTransform to filter
rows based on row kind (#3659)
fc89ba4c4 is described below
commit fc89ba4c44ebaaad3d023a2f81bd58e1123df348
Author: hailin0 <[email protected]>
AuthorDate: Thu Dec 8 17:08:08 2022 +0800
[feature][transform-v2] Add FilterRowKindTransform to filter rows based on
row kind (#3659)
---
.../e2e/transform/TestFilterRowKindIT.java | 47 ++++++++++++
.../resources/filter_row_kind_exclude_delete.conf | 83 +++++++++++++++++++++
.../resources/filter_row_kind_exclude_insert.conf | 67 +++++++++++++++++
.../resources/filter_row_kind_include_insert.conf | 83 +++++++++++++++++++++
.../server/task/flow/TransformFlowLifeCycle.java | 20 ++++-
.../transform/FilterRowKindTransform.java | 86 ++++++++++++++++++++++
.../transform/FilterRowKindTransformFactory.java | 42 +++++++++++
.../transform/common/FilterRowTransform.java | 27 +++++++
8 files changed, 452 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
new file mode 100644
index 000000000..3b9aea584
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestFilterRowKindIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testFilterRowKindExcludeDelete(TestContainer container) throws
IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/filter_row_kind_exclude_delete.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testFilterRowKindExcludeInsert(TestContainer container) throws
IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/filter_row_kind_exclude_insert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testFilterRowKindIncludeInsert(TestContainer container) throws
IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/filter_row_kind_include_insert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_delete.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_delete.conf
new file mode 100644
index 000000000..44ea90a52
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_delete.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ FilterRowKind {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ exclude_kinds = ["DELETE"]
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_insert.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_insert.conf
new file mode 100644
index 000000000..0fe31c921
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_exclude_insert.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ FilterRowKind {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ exclude_kinds = ["INSERT"]
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 0
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_include_insert.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_include_insert.conf
new file mode 100644
index 000000000..2ad1fec2b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/filter_row_kind_include_insert.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ FilterRowKind {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ include_kinds = ["INSERT"]
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index bd7f9841a..aec7ae2e1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -26,10 +26,13 @@ import
org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+@Slf4j
public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle implements
OneInputFlowLifeCycle<Record<?>> {
private final TransformChainAction<T> action;
@@ -65,11 +68,22 @@ public class TransformFlowLifeCycle<T> extends
ActionFlowLifeCycle implements On
if (prepareClose) {
return;
}
- T r = (T) record.getData();
+ T inputData = (T) record.getData();
+ T outputData = inputData;
for (SeaTunnelTransform<T> t : transform) {
- r = t.map(r);
+ outputData = t.map(inputData);
+ log.debug("Transform[{}] input row {} and output row {}", t,
inputData, outputData);
+ if (outputData == null) {
+ log.trace("Transform[{}] filtered data row {}", t,
inputData);
+ break;
+ }
+
+ inputData = outputData;
+ }
+ if (outputData != null) {
+ // todo log metrics
+ collector.collect(new Record<>(outputData));
}
- collector.collect(new Record<>(r));
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
new file mode 100644
index 000000000..474cbc782
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.transform.common.FilterRowTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.ToString;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@ToString(of = {"includeKinds", "excludeKinds"})
+@AutoService(SeaTunnelTransform.class)
+public class FilterRowKindTransform extends FilterRowTransform {
+ public static final Option<List<RowKind>> INCLUDE_KINDS =
Options.key("include_kinds")
+ .listType(RowKind.class)
+ .noDefaultValue()
+ .withDescription("the row kinds to include");
+ public static final Option<List<RowKind>> EXCLUDE_KINDS =
Options.key("exclude_kinds")
+ .listType(RowKind.class)
+ .noDefaultValue()
+ .withDescription("the row kinds to exclude");
+
+ private Set<RowKind> includeKinds = Collections.emptySet();
+ private Set<RowKind> excludeKinds = Collections.emptySet();
+
+ @Override
+ public String getPluginName() {
+ return "FilterRowKind";
+ }
+
+ @Override
+ protected void setConfig(Config pluginConfig) {
+ if (pluginConfig.hasPath(INCLUDE_KINDS.key())) {
+ includeKinds = new
HashSet<>(pluginConfig.getEnumList(RowKind.class, INCLUDE_KINDS.key()));
+ }
+ if (pluginConfig.hasPath(EXCLUDE_KINDS.key())) {
+ excludeKinds = new
HashSet<>(pluginConfig.getEnumList(RowKind.class, EXCLUDE_KINDS.key()));
+ }
+ if ((includeKinds.isEmpty() && excludeKinds.isEmpty())
+ || (!includeKinds.isEmpty() && !excludeKinds.isEmpty())) {
+ throw new
SeaTunnelRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ String.format("These options(%s,%s) are mutually exclusive,
allowing only one set of options to be configured.",
+ INCLUDE_KINDS.key(), EXCLUDE_KINDS.key()));
+ }
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ if (!excludeKinds.isEmpty()) {
+ return excludeKinds.contains(inputRow.getRowKind()) ? null :
inputRow;
+ }
+ if (!includeKinds.isEmpty()) {
+ return includeKinds.contains(inputRow.getRowKind()) ? inputRow :
null;
+ }
+ throw new
SeaTunnelRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Transform config error! Either excludeKinds or includeKinds must
be configured");
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
new file mode 100644
index 000000000..f2ac8adb3
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import static
org.apache.seatunnel.transform.FilterRowKindTransform.EXCLUDE_KINDS;
+import static
org.apache.seatunnel.transform.FilterRowKindTransform.INCLUDE_KINDS;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class FilterRowKindTransformFactory implements TableTransformFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "FilterRowKind";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .exclusive(EXCLUDE_KINDS, INCLUDE_KINDS)
+ .build();
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
new file mode 100644
index 000000000..939710fb1
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.common;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+public abstract class FilterRowTransform extends AbstractSeaTunnelTransform {
+ @Override
+ protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType)
{
+ return inputRowType;
+ }
+}