This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 60cbeb08b [Feature][Transform] Add CatalogTable support for
ReplaceTransform (#4411)
60cbeb08b is described below
commit 60cbeb08b65944219942c8984a19844947ec7f4a
Author: Eric <[email protected]>
AuthorDate: Tue Mar 28 14:39:46 2023 +0800
[Feature][Transform] Add CatalogTable support for ReplaceTransform (#4411)
* Make Transform Support CatalogTable And CatalogTable Evolution
* revert example code
* Update
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
Co-authored-by: hailin0 <[email protected]>
* Fix CI problems
* Fix CI problems
* Make ReplaceTransform Support CatalogTable
* Update
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
Co-authored-by: hailin0 <[email protected]>
* Update
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
Co-authored-by: hailin0 <[email protected]>
* fix review problems
---------
Co-authored-by: hailin0 <[email protected]>
---
.../test/resources/iotdb/iotdb_source_to_sink.conf | 1 +
.../src/test/resources/copy_transform.conf | 3 -
.../resources/filter_row_kind_exclude_delete.conf | 3 -
.../resources/filter_row_kind_exclude_insert.conf | 3 -
.../resources/filter_row_kind_include_insert.conf | 3 -
.../src/test/resources/filter_transform.conf | 3 -
.../src/test/resources/split_transform.conf | 3 -
.../src/test/resources/replace_transform.conf | 1 +
.../seatunnel/transform/ReplaceTransform.java | 137 -------------------
.../common/MultipleFieldOutputTransform.java | 24 ++--
.../common/SingleFieldOutputTransform.java | 98 +++++++++++++-
.../transform/replace/ReplaceTransform.java | 145 +++++++++++++++++++++
.../transform/replace/ReplaceTransformConfig.java | 56 ++++++++
.../{ => replace}/ReplaceTransformFactory.java | 29 +++--
.../transform/ReplaceTransformFactoryTest.java | 2 +
15 files changed, 336 insertions(+), 175 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
index 012ece4ea..10e4eef92 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
@@ -62,6 +62,7 @@ transform {
pattern = "root.source_group"
replacement = "root.sink_group"
is_regex = false
+ replace_first = true
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
index 68c9e4e52..25ca4ce5f 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
@@ -54,9 +54,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake2"
- }
Assert {
source_table_name = "fake2"
rules =
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
index 44ea90a52..f7fc0f6e0 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
index 0fe31c921..cc3641778 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
index 2ad1fec2b..d1fbf79be 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
index 889ebe76f..56439b441 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
index 4627e25f0..61e10f694 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
@@ -47,9 +47,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
index 4f5942c20..95150fb0d 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
@@ -43,6 +43,7 @@ transform {
pattern = ".+"
replacement = "b"
is_regex = true
+ replace_first = true
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java
deleted file mode 100644
index b95d2b4fc..000000000
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
-import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelTransform.class)
-public class ReplaceTransform extends SingleFieldOutputTransform {
-
- public static final Option<String> KEY_REPLACE_FIELD =
- Options.key("replace_field")
- .stringType()
- .noDefaultValue()
- .withDescription("The field you want to replace");
-
- public static final Option<String> KEY_PATTERN =
- Options.key("pattern")
- .stringType()
- .noDefaultValue()
- .withDescription("The old string that will be replaced");
-
- public static final Option<String> KEY_REPLACEMENT =
- Options.key("replacement")
- .stringType()
- .noDefaultValue()
- .withDescription("The new string for replace");
-
- public static final Option<Boolean> KEY_IS_REGEX =
- Options.key("is_regex")
- .booleanType()
- .defaultValue(false)
- .withDescription("Use regex for string match");
-
- public static final Option<Boolean> KEY_REPLACE_FIRST =
- Options.key("replace_first")
- .booleanType()
- .noDefaultValue()
- .withDescription("Replace the first match string");
-
- private int inputFieldIndex;
- private String replaceField;
- private String pattern;
- private String replacement;
- private boolean isRegex;
- private boolean replaceFirst;
-
- @Override
- public String getPluginName() {
- return "Replace";
- }
-
- @Override
- protected void setConfig(Config pluginConfig) {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- KEY_REPLACE_FIELD.key(),
- KEY_PATTERN.key(),
- KEY_REPLACEMENT.key());
- if (!checkResult.isSuccess()) {
- throw new IllegalArgumentException("Failed to check config! " +
checkResult.getMsg());
- }
-
- replaceField = pluginConfig.getString(KEY_REPLACE_FIELD.key());
- pattern = pluginConfig.getString(KEY_PATTERN.key());
- replacement = pluginConfig.getString(KEY_REPLACEMENT.key());
- if (pluginConfig.hasPath(KEY_IS_REGEX.key())) {
- isRegex = pluginConfig.getBoolean(KEY_IS_REGEX.key());
- }
- if (pluginConfig.hasPath(KEY_REPLACE_FIRST.key())) {
- replaceFirst = pluginConfig.getBoolean(KEY_REPLACE_FIRST.key());
- }
- }
-
- @Override
- protected void setInputRowType(SeaTunnelRowType rowType) {
- inputFieldIndex = rowType.indexOf(replaceField);
- if (inputFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + replaceField + "] field in input row
type");
- }
- }
-
- @Override
- protected String getOutputFieldName() {
- return replaceField;
- }
-
- @Override
- protected SeaTunnelDataType getOutputFieldDataType() {
- return BasicType.STRING_TYPE;
- }
-
- @Override
- protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
- Object inputFieldValue = inputRow.getField(inputFieldIndex);
- if (inputFieldValue == null) {
- return null;
- }
-
- if (isRegex) {
- if (replaceFirst) {
- return inputFieldValue.toString().replaceFirst(pattern,
replacement);
- }
- return inputFieldValue.toString().replaceAll(pattern, replacement);
- }
- return inputFieldValue.toString().replace(pattern, replacement);
- }
-}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 2609ced1b..1287fff83 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -19,12 +19,14 @@ package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -36,6 +38,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
+@NoArgsConstructor
public abstract class MultipleFieldOutputTransform extends
AbstractCatalogSupportTransform {
private static final String[] TYPE_ARRAY_STRING = new String[0];
@@ -46,10 +49,6 @@ public abstract class MultipleFieldOutputTransform extends
AbstractCatalogSuppor
private int[] fieldsIndex;
private SeaTunnelRowContainerGenerator rowContainerGenerator;
- public MultipleFieldOutputTransform() {
- super();
- }
-
public MultipleFieldOutputTransform(@NonNull CatalogTable
inputCatalogTable) {
super(inputCatalogTable);
}
@@ -181,10 +180,17 @@ public abstract class MultipleFieldOutputTransform
extends AbstractCatalogSuppor
.map(Column::getName)
.collect(Collectors.toList())
.toArray(TYPE_ARRAY_STRING);
- TableSchema.Builder builder =
- TableSchema.builder()
-
.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey())
-
.constraintKey(inputCatalogTable.getTableSchema().getConstraintKeys());
+
+ List<ConstraintKey> copiedConstraintKeys =
+ inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .map(ConstraintKey::copy)
+ .collect(Collectors.toList());
+
+ TableSchema.Builder builder = TableSchema.builder();
+ if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
+ builder =
builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
+ }
+ builder = builder.constraintKey(copiedConstraintKeys);
List<Column> columns =
inputCatalogTable.getTableSchema().getColumns().stream()
.map(Column::copy)
@@ -252,7 +258,7 @@ public abstract class MultipleFieldOutputTransform extends
AbstractCatalogSuppor
@Override
protected TableIdentifier transformTableIdentifier() {
- return inputCatalogTable.getTableId();
+ return inputCatalogTable.getTableId().copy();
}
protected abstract Column[] getOutputColumns();
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index dfc217546..76554e176 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -17,17 +17,28 @@
package org.apache.seatunnel.transform.common;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
@Slf4j
-public abstract class SingleFieldOutputTransform extends
AbstractSeaTunnelTransform {
+@NoArgsConstructor
+public abstract class SingleFieldOutputTransform extends
AbstractCatalogSupportTransform {
private static final String[] TYPE_ARRAY_STRING = new String[0];
private static final SeaTunnelDataType[] TYPE_ARRAY_SEATUNNEL_DATA_TYPE =
@@ -36,6 +47,10 @@ public abstract class SingleFieldOutputTransform extends
AbstractSeaTunnelTransf
private int fieldIndex;
private SeaTunnelRowContainerGenerator rowContainerGenerator;
+ public SingleFieldOutputTransform(@NonNull CatalogTable inputCatalogTable)
{
+ super(inputCatalogTable);
+ }
+
@Override
protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType)
{
setInputRowType(inputRowType);
@@ -131,4 +146,85 @@ public abstract class SingleFieldOutputTransform extends
AbstractSeaTunnelTransf
* @return
*/
protected abstract Object getOutputFieldValue(SeaTunnelRowAccessor
inputRow);
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ Column outputColumn = getOutputColumn();
+ List<ConstraintKey> copiedConstraintKeys =
+ inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .map(ConstraintKey::copy)
+ .collect(Collectors.toList());
+
+ TableSchema.Builder builder = TableSchema.builder();
+ if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
+ builder =
builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
+ }
+ builder = builder.constraintKey(copiedConstraintKeys);
+ List<Column> columns =
+ inputCatalogTable.getTableSchema().getColumns().stream()
+ .map(Column::copy)
+ .collect(Collectors.toList());
+
+ int addFieldCount = 0;
+ Optional<Column> optional =
+ columns.stream()
+ .filter(c ->
c.getName().equals(outputColumn.getName()))
+ .findFirst();
+ if (optional.isPresent()) {
+ Column originalColumn = optional.get();
+ int originalColumnIndex = columns.indexOf(originalColumn);
+ if
(!originalColumn.getDataType().equals(outputColumn.getDataType())) {
+ columns.set(originalColumnIndex,
originalColumn.copy(outputColumn.getDataType()));
+ }
+ this.fieldIndex = originalColumnIndex;
+ } else {
+ addFieldCount++;
+ columns.add(outputColumn);
+ this.fieldIndex = columns.indexOf(outputColumn);
+ }
+
+ TableSchema outputTableSchema = builder.columns(columns).build();
+ if (addFieldCount > 0) {
+ this.fieldIndex = outputTableSchema.getColumns().size() - 1;
+ int inputFieldLength =
+
inputCatalogTable.getTableSchema().toPhysicalRowDataType().getTotalFields();
+ int outputFieldLength = outputTableSchema.getColumns().size();
+
+ rowContainerGenerator =
+ new SeaTunnelRowContainerGenerator() {
+ @Override
+ public SeaTunnelRow apply(SeaTunnelRow inputRow) {
+ // todo reuse array container
+ Object[] outputFieldValues = new
Object[outputFieldLength];
+ System.arraycopy(
+ inputRow.getFields(),
+ 0,
+ outputFieldValues,
+ 0,
+ inputFieldLength);
+
+ SeaTunnelRow outputRow = new
SeaTunnelRow(outputFieldValues);
+ outputRow.setTableId(inputRow.getTableId());
+ outputRow.setRowKind(inputRow.getRowKind());
+ return outputRow;
+ }
+ };
+ } else {
+ rowContainerGenerator = SeaTunnelRowContainerGenerator.REUSE_ROW;
+ }
+
+ log.info(
+ "Changed input table schema: {} to output table schema: {}",
+ inputCatalogTable.getTableSchema(),
+ outputTableSchema);
+
+ return outputTableSchema;
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputCatalogTable.getTableId().copy();
+ }
+
+ protected abstract Column getOutputColumn();
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
new file mode 100644
index 000000000..891b1bb20
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.replace;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Replaces a pattern inside the value of {@code replace_field} and writes the result back to that
+ * field as a string.
+ *
+ * <p>Matching is literal by default. With {@code is_regex = true} the pattern is treated as a Java
+ * regular expression and {@code replace_first} chooses between replacing only the first match and
+ * replacing every match.
+ */
+@AutoService(SeaTunnelTransform.class)
+@NoArgsConstructor
+public class ReplaceTransform extends SingleFieldOutputTransform {
+    private ReadonlyConfig config;
+    private int inputFieldIndex;
+
+    /**
+     * Creates a transform bound to an input catalog table; used by {@link
+     * ReplaceTransformFactory}.
+     *
+     * @param config transform options
+     * @param inputCatalogTable upstream table, which must contain {@code replace_field}
+     * @throws IllegalArgumentException if {@code replace_field} is not a column of the table
+     */
+    public ReplaceTransform(
+            @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
+        super(inputCatalogTable);
+        this.config = config;
+        initOutputFields(
+                inputCatalogTable.getTableSchema().toPhysicalRowDataType(),
+                this.config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Replace";
+    }
+
+    /** Validates {@code pluginConfig} against the factory's option rule, then stores it. */
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
+        ConfigValidator.of(readonlyConfig).validate(new ReplaceTransformFactory().optionRule());
+        this.config = readonlyConfig;
+    }
+
+    @Override
+    protected void setInputRowType(SeaTunnelRowType rowType) {
+        initOutputFields(rowType, config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
+    }
+
+    /** Resolves the position of {@code replaceField} in {@code inputRowType}. */
+    private void initOutputFields(SeaTunnelRowType inputRowType, String replaceField) {
+        inputFieldIndex = inputRowType.indexOf(replaceField);
+        if (inputFieldIndex == -1) {
+            throw new IllegalArgumentException(
+                    "Cannot find [" + replaceField + "] field in input row type");
+        }
+    }
+
+    @Override
+    protected String getOutputFieldName() {
+        return config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD);
+    }
+
+    @Override
+    protected SeaTunnelDataType getOutputFieldDataType() {
+        return BasicType.STRING_TYPE;
+    }
+
+    /**
+     * Applies the configured replacement to the value of {@code replace_field}.
+     *
+     * @param inputRow the row being transformed
+     * @return the replaced string, or {@code null} when the field value is {@code null}
+     */
+    @Override
+    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+        Object inputFieldValue = inputRow.getField(inputFieldIndex);
+        if (inputFieldValue == null) {
+            return null;
+        }
+
+        String value = inputFieldValue.toString();
+        String pattern = config.get(ReplaceTransformConfig.KEY_PATTERN);
+        String replacement = config.get(ReplaceTransformConfig.KEY_REPLACEMENT);
+        // Read the flags null-safely: replace_first has no default value, so auto-unboxing an
+        // unset option would throw NullPointerException instead of falling back to replace-all.
+        boolean isRegex = Boolean.TRUE.equals(config.get(ReplaceTransformConfig.KEY_IS_REGEX));
+        if (!isRegex) {
+            return value.replace(pattern, replacement);
+        }
+        boolean replaceFirst =
+                Boolean.TRUE.equals(config.get(ReplaceTransformConfig.KEY_REPLACE_FIRST));
+        return replaceFirst
+                ? value.replaceFirst(pattern, replacement)
+                : value.replaceAll(pattern, replacement);
+    }
+
+    /**
+     * Returns the catalog column for {@code replace_field}, retyped to the string type produced by
+     * {@link #getOutputFieldValue} so the output schema matches the emitted values.
+     *
+     * @throws IllegalArgumentException if the column does not exist in the input catalog table
+     */
+    @Override
+    protected Column getOutputColumn() {
+        String replaceField = config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD);
+        List<Column> matched =
+                inputCatalogTable.getTableSchema().getColumns().stream()
+                        .filter(column -> column.getName().equals(replaceField))
+                        .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(matched)) {
+            throw new IllegalArgumentException(
+                    "Cannot find [" + replaceField + "] field in input catalog table");
+        }
+        return matched.get(0).copy(getOutputFieldDataType());
+    }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java
new file mode 100644
index 000000000..97630080e
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.replace;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+/**
+ * Option definitions for the {@code Replace} transform, shared by {@link ReplaceTransformFactory}
+ * (validation rule) and {@link ReplaceTransform} (runtime lookup).
+ */
+public class ReplaceTransformConfig implements Serializable {
+
+    /** Name of the field whose value is rewritten; required by the factory option rule. */
+    public static final Option<String> KEY_REPLACE_FIELD =
+            Options.key("replace_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field you want to replace");
+
+    /** Text (or regular expression when {@code is_regex} is true) to search for; required. */
+    public static final Option<String> KEY_PATTERN =
+            Options.key("pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The old string that will be replaced");
+
+    /** Text substituted for the matched pattern; required. */
+    public static final Option<String> KEY_REPLACEMENT =
+            Options.key("replacement")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The new string for replace");
+
+    /** Whether {@code pattern} is a regular expression; defaults to {@code false}. */
+    public static final Option<Boolean> KEY_IS_REGEX =
+            Options.key("is_regex")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Use regex for string match");
+
+    /**
+     * In regex mode, replace only the first match instead of every match. The factory option rule
+     * requires it when {@code is_regex} is true; it has no default value.
+     */
+    public static final Option<Boolean> KEY_REPLACE_FIRST =
+            Options.key("replace_first")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("Replace the first match string");
+
+    private ReplaceTransformConfig() {
+        // Constants holder; not meant to be instantiated.
+    }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
similarity index 58%
rename from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java
rename to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
index fa765156c..25696ba6e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.replace;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_IS_REGEX;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_PATTERN;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACEMENT;
-import static
org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACE_FIELD;
-import static
org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACE_FIRST;
-
@AutoService(Factory.class)
public class ReplaceTransformFactory implements TableTransformFactory {
@Override
@@ -39,9 +36,21 @@ public class ReplaceTransformFactory implements
TableTransformFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(KEY_REPLACE_FIELD, KEY_PATTERN, KEY_REPLACEMENT)
- .optional(KEY_IS_REGEX)
- .conditional(KEY_IS_REGEX, true, KEY_REPLACE_FIRST)
+ .required(
+ ReplaceTransformConfig.KEY_REPLACE_FIELD,
+ ReplaceTransformConfig.KEY_PATTERN,
+ ReplaceTransformConfig.KEY_REPLACEMENT)
+ .optional(ReplaceTransformConfig.KEY_IS_REGEX)
+ .conditional(
+ ReplaceTransformConfig.KEY_IS_REGEX,
+ true,
+ ReplaceTransformConfig.KEY_REPLACE_FIRST)
.build();
}
+
+ @Override
+ public TableTransform createTransform(TableFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new ReplaceTransform(context.getOptions(), catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
index 5bc9267c4..6bacfec8f 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.transform;
+import org.apache.seatunnel.transform.replace.ReplaceTransformFactory;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;