This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7354b448d93 Refactor pipeline Column. (#33670)
7354b448d93 is described below

commit 7354b448d93809734b7e2c3be715f9909237d50f
Author: Cong Hu <[email protected]>
AuthorDate: Fri Nov 15 15:56:49 2024 +0800

    Refactor pipeline Column. (#33670)
---
 .../ingest/dumper/inventory/InventoryDumper.java   |  6 +--
 .../data/pipeline/core/ingest/record/Column.java   | 50 ++++++++++--------
 .../record/{Column.java => NormalColumn.java}      |  4 +-
 .../ingest/record/group/DataRecordGroupEngine.java |  5 +-
 .../core/ingest/record/DataRecordTest.java         |  8 +--
 .../core/ingest/record/RecordUtilsTest.java        | 10 ++--
 .../record/group/DataRecordGroupEngineTest.java    | 61 +++++++++++-----------
 .../sql/PipelineImportSQLBuilderTest.java          | 11 ++--
 .../incremental/dumper/MySQLIncrementalDumper.java | 10 ++--
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    | 12 ++---
 .../OpenGaussPipelineSQLBuilderTest.java           | 12 ++---
 .../ingest/incremental/wal/WALEventConverter.java  |  7 +--
 .../PostgreSQLPipelineSQLBuilderTest.java          |  8 +--
 .../cdc/util/DataRecordResultConvertUtilsTest.java | 30 +++++------
 .../sink/type/PipelineDataSourceSinkTest.java      | 14 ++---
 15 files changed, 130 insertions(+), 118 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 3df8c98344f..daab070fcce 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -28,17 +28,17 @@ import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLife
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.InventoryRangeQueryParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -241,7 +241,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         for (int i = 1; i <= columnCount; i++) {
             String columnName = insertColumnNames.isEmpty() ? 
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
             
ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName),
 () -> new PipelineInvalidParameterException(String.format("Column name is %s", 
columnName)));
-            result.addColumn(new Column(columnName, 
columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, 
tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
+            result.addColumn(new NormalColumn(columnName, 
columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, 
tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
         }
         result.setActualTableName(dumperContext.getActualTableName());
         return result;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
index 221a2c5b478..ff712ff9779 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
@@ -17,35 +17,43 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.record;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
 /**
  * Column.
  */
-@RequiredArgsConstructor
-@Getter
-public final class Column {
-    
-    private final String name;
+public interface Column {
     
     /**
-     * Value are available only when the primary key column is updated.
+     * Get name.
+     *
+     * @return name
      */
-    private final Object oldValue;
-    
-    private final Object value;
+    String getName();
     
-    private final boolean updated;
+    /**
+     * Get old value.
+     *
+     * @return old value
+     */
+    Object getOldValue();
     
-    private final boolean uniqueKey;
+    /**
+     * Get value.
+     *
+     * @return value
+     */
+    Object getValue();
     
-    public Column(final String name, final Object value, final boolean 
updated, final boolean uniqueKey) {
-        this(name, null, value, updated, uniqueKey);
-    }
+    /**
+     * Judge whether the column is updated.
+     *
+     * @return true if the column is updated, otherwise false
+     */
+    boolean isUpdated();
     
-    @Override
-    public String toString() {
-        return String.format("%s: oldValue=%s, value=%s", name, oldValue, 
value);
-    }
+    /**
+     * Judge whether the column is unique key.
+     *
+     * @return true if the column is unique key, otherwise false
+     */
+    boolean isUniqueKey();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/NormalColumn.java
similarity index 89%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/NormalColumn.java
index 221a2c5b478..692798d66c4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/NormalColumn.java
@@ -25,7 +25,7 @@ import lombok.RequiredArgsConstructor;
  */
 @RequiredArgsConstructor
 @Getter
-public final class Column {
+public final class NormalColumn implements Column {
     
     private final String name;
     
@@ -40,7 +40,7 @@ public final class Column {
     
     private final boolean uniqueKey;
     
-    public Column(final String name, final Object value, final boolean 
updated, final boolean uniqueKey) {
+    public NormalColumn(final String name, final Object value, final boolean 
updated, final boolean uniqueKey) {
         this(name, null, value, updated, uniqueKey);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index cb176a62fa0..aae1e972858 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
 
@@ -122,7 +123,7 @@ public final class DataRecordGroupEngine {
             DataRecord mergedDataRecord = new 
DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), 
dataRecord.getPosition(), dataRecord.getColumnCount());
             mergeBaseFields(dataRecord, mergedDataRecord);
             for (int i = 0; i < dataRecord.getColumnCount(); i++) {
-                mergedDataRecord.addColumn(new 
Column(dataRecord.getColumn(i).getName(),
+                mergedDataRecord.addColumn(new 
NormalColumn(dataRecord.getColumn(i).getName(),
                         dataRecord.getColumn(i).isUniqueKey() ? 
beforeDataRecord.getColumn(i).getOldValue() : 
beforeDataRecord.getColumn(i).getValue(),
                         null, true, dataRecord.getColumn(i).isUniqueKey()));
             }
@@ -153,7 +154,7 @@ public final class DataRecordGroupEngine {
         DataRecord result = new DataRecord(type, tableName, 
curDataRecord.getPosition(), curDataRecord.getColumnCount());
         mergeBaseFields(curDataRecord, result);
         for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
-            result.addColumn(new Column(
+            result.addColumn(new NormalColumn(
                     curDataRecord.getColumn(i).getName(),
                     preDataRecord.getColumn(i).getOldValue(),
                     curDataRecord.getColumn(i).getValue(),
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index c991138416c..346d4c6fb37 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -29,18 +29,18 @@ class DataRecordTest {
     @Test
     void assertGetKey() {
         DataRecord beforeDataRecord = new 
DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
-        beforeDataRecord.addColumn(new Column("id", 1, true, true));
+        beforeDataRecord.addColumn(new NormalColumn("id", 1, true, true));
         DataRecord afterDataRecord = new 
DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
-        afterDataRecord.addColumn(new Column("id", 2, 1, true, true));
+        afterDataRecord.addColumn(new NormalColumn("id", 2, 1, true, true));
         assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
     }
     
     @Test
     void assertGetOldKey() {
         DataRecord beforeDataRecord = new 
DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
-        beforeDataRecord.addColumn(new Column("id", 1, 2, true, true));
+        beforeDataRecord.addColumn(new NormalColumn("id", 1, 2, true, true));
         DataRecord afterDataRecord = new 
DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
-        afterDataRecord.addColumn(new Column("id", 1, 3, true, true));
+        afterDataRecord.addColumn(new NormalColumn("id", 1, 3, true, true));
         assertThat(beforeDataRecord.getOldKey(), 
is(afterDataRecord.getOldKey()));
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java
index f8471e2bfde..d53f89bd72b 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java
@@ -41,11 +41,11 @@ class RecordUtilsTest {
     
     private DataRecord mockDataRecord() {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
"t2", new IngestPlaceholderPosition(), 4);
-        result.addColumn(new Column("id", "", false, true));
-        result.addColumn(new Column("sc", "", false, true));
-        result.addColumn(new Column("c1", "", true, false));
-        result.addColumn(new Column("c2", "", true, false));
-        result.addColumn(new Column("c3", "", true, false));
+        result.addColumn(new NormalColumn("id", "", false, true));
+        result.addColumn(new NormalColumn("sc", "", false, true));
+        result.addColumn(new NormalColumn("c1", "", true, false));
+        result.addColumn(new NormalColumn("c2", "", true, false));
+        result.addColumn(new NormalColumn("c3", "", true, false));
         return result;
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index dca4f270d87..fc4bf4bc340 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexp
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
 import org.junit.jupiter.api.Test;
 
@@ -73,9 +74,9 @@ class DataRecordGroupEngineTest {
         assertThat(dataRecord.getTableName(), is("order"));
         assertThat(dataRecord.getActualTableName(), is("order_0"));
         assertThat(dataRecord.getCommitTime(), is(456L));
-        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 
1, true, true));
-        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
null, 10, true, false));
-        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", null, 200, true, false));
+        assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id", 
null, 1, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new 
NormalColumn("user_id", null, 10, true, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
NormalColumn("total_price", null, 200, true, false));
     }
     
     private void assertColumnsMatched(final Column actual, final Column 
expected) {
@@ -97,9 +98,9 @@ class DataRecordGroupEngineTest {
         assertThat(dataRecord.getTableName(), is("order"));
         assertThat(dataRecord.getActualTableName(), is("order_0"));
         assertThat(dataRecord.getCommitTime(), is(456L));
-        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 
2, true, true));
-        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
null, 10, true, false));
-        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", null, 50, true, false));
+        assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id", 
null, 2, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new 
NormalColumn("user_id", null, 10, true, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
NormalColumn("total_price", null, 50, true, false));
     }
     
     @Test
@@ -113,9 +114,9 @@ class DataRecordGroupEngineTest {
         assertThat(dataRecord.getTableName(), is("order"));
         assertThat(dataRecord.getActualTableName(), is("order_0"));
         assertThat(dataRecord.getCommitTime(), is(456L));
-        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 1, 
false, true));
-        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
-        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
+        assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id", 
1, 1, false, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new 
NormalColumn("user_id", 10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
NormalColumn("total_price", 50, 200, true, false));
     }
     
     @Test
@@ -129,9 +130,9 @@ class DataRecordGroupEngineTest {
         assertThat(dataRecord.getTableName(), is("order"));
         assertThat(dataRecord.getActualTableName(), is("order_0"));
         assertThat(dataRecord.getCommitTime(), is(456L));
-        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, 
true, true));
-        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
-        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
+        assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id", 
1, 2, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new 
NormalColumn("user_id", 10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
NormalColumn("total_price", 50, 200, true, false));
     }
     
     @Test
@@ -145,9 +146,9 @@ class DataRecordGroupEngineTest {
         assertThat(dataRecord.getTableName(), is("order"));
         assertThat(dataRecord.getActualTableName(), is("order_0"));
         assertThat(dataRecord.getCommitTime(), is(456L));
-        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, 
true, true));
-        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
-        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
+        assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id", 
1, 2, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new 
NormalColumn("user_id", 10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
NormalColumn("total_price", 50, 200, true, false));
     }
     
     @Test
@@ -161,9 +162,9 @@ class DataRecordGroupEngineTest {
         assertThat(dataRecord.getTableName(), is("order"));
         assertThat(dataRecord.getActualTableName(), is("order_0"));
         assertThat(dataRecord.getCommitTime(), is(456L));
-        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 3, 
true, true));
-        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
-        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 50, false, false));
+        assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id", 
1, 3, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new 
NormalColumn("user_id", 10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
NormalColumn("total_price", 50, 50, false, false));
     }
     
     @Test
@@ -202,9 +203,9 @@ class DataRecordGroupEngineTest {
         assertThat(dataRecord.getTableName(), is("order"));
         assertThat(dataRecord.getActualTableName(), is("order_0"));
         assertThat(dataRecord.getCommitTime(), is(789L));
-        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 
null, true, true));
-        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, null, true, false));
-        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, null, true, false));
+        assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id", 
1, null, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new 
NormalColumn("user_id", 10, null, true, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
NormalColumn("total_price", 50, null, true, false));
     }
     
     @Test
@@ -246,9 +247,9 @@ class DataRecordGroupEngineTest {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
tableName, new IngestPlaceholderPosition(), 3);
         result.setActualTableName("order_0");
         result.setCommitTime(123L);
-        result.addColumn(new Column("id", id, true, true));
-        result.addColumn(new Column("user_id", userId, true, false));
-        result.addColumn(new Column("total_price", totalPrice, true, false));
+        result.addColumn(new NormalColumn("id", id, true, true));
+        result.addColumn(new NormalColumn("user_id", userId, true, false));
+        result.addColumn(new NormalColumn("total_price", totalPrice, true, 
false));
         return result;
     }
     
@@ -268,9 +269,9 @@ class DataRecordGroupEngineTest {
         DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE, 
tableName, new IngestPlaceholderPosition(), 3);
         result.setActualTableName("order_0");
         result.setCommitTime(456L);
-        result.addColumn(new Column("id", oldId, id, 
!Objects.deepEquals(oldId, id), true));
-        result.addColumn(new Column("user_id", userId, userId, false, false));
-        result.addColumn(new Column("total_price", 50, totalPrice, 50 != 
totalPrice, false));
+        result.addColumn(new NormalColumn("id", oldId, id, 
!Objects.deepEquals(oldId, id), true));
+        result.addColumn(new NormalColumn("user_id", userId, userId, false, 
false));
+        result.addColumn(new NormalColumn("total_price", 50, totalPrice, 50 != 
totalPrice, false));
         return result;
     }
     
@@ -282,9 +283,9 @@ class DataRecordGroupEngineTest {
         DataRecord result = new DataRecord(PipelineSQLOperationType.DELETE, 
tableName, new IngestPlaceholderPosition(), 3);
         result.setActualTableName("order_0");
         result.setCommitTime(789L);
-        result.addColumn(new Column("id", id, null, true, true));
-        result.addColumn(new Column("user_id", userId, null, true, false));
-        result.addColumn(new Column("total_price", totalPrice, null, true, 
false));
+        result.addColumn(new NormalColumn("id", id, null, true, true));
+        result.addColumn(new NormalColumn("user_id", userId, null, true, 
false));
+        result.addColumn(new NormalColumn("total_price", totalPrice, null, 
true, false));
         return result;
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java
index c1eb17d99ff..4b862672642 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -87,18 +88,18 @@ class PipelineImportSQLBuilderTest {
     
     private DataRecord createDataRecordWithUniqueKey() {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
"foo_tbl", new IngestPlaceholderPosition(), 4);
-        result.addColumn(new Column("id", "", false, true));
-        result.addColumn(new Column("foo_col", "", false, false));
+        result.addColumn(new NormalColumn("id", "", false, true));
+        result.addColumn(new NormalColumn("foo_col", "", false, false));
         for (int i = 1; i <= 3; i++) {
-            result.addColumn(new Column("col" + i, "", true, false));
+            result.addColumn(new NormalColumn("col" + i, "", true, false));
         }
         return result;
     }
     
     private DataRecord createDataRecordWithoutUniqueKey() {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
"foo_tbl", new IngestPlaceholderPosition(), 4);
-        result.addColumn(new Column("id", "", false, false));
-        result.addColumn(new Column("foo_col", "", true, false));
+        result.addColumn(new NormalColumn("id", "", false, false));
+        result.addColumn(new NormalColumn("foo_col", "", true, false));
         return result;
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index d1a6b07a172..8d1dedecfe3 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -25,20 +25,20 @@ import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLife
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.data.MySQLBinlogDataHandler;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.ConnectInfo;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.MySQLBinlogClient;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
@@ -151,7 +151,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
             DataRecord dataRecord = 
createDataRecord(PipelineSQLOperationType.INSERT, event, each.length);
             for (int i = 0; i < each.length; i++) {
                 PipelineColumnMetaData columnMetaData = 
tableMetaData.getColumnMetaData(i + 1);
-                dataRecord.addColumn(new Column(columnMetaData.getName(), 
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), true, 
columnMetaData.isUniqueKey()));
+                dataRecord.addColumn(new 
NormalColumn(columnMetaData.getName(), 
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), true, 
columnMetaData.isUniqueKey()));
             }
             result.add(dataRecord);
         }
@@ -169,7 +169,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
                 Serializable oldValue = 
MySQLBinlogDataHandler.handle(columnMetaData, beforeValues[j]);
                 Serializable newValue = 
MySQLBinlogDataHandler.handle(columnMetaData, afterValues[j]);
                 boolean updated = !Objects.deepEquals(newValue, oldValue);
-                dataRecord.addColumn(new Column(columnMetaData.getName(), 
oldValue, newValue, updated, columnMetaData.isUniqueKey()));
+                dataRecord.addColumn(new 
NormalColumn(columnMetaData.getName(), oldValue, newValue, updated, 
columnMetaData.isUniqueKey()));
             }
             result.add(dataRecord);
         }
@@ -182,7 +182,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
             DataRecord dataRecord = 
createDataRecord(PipelineSQLOperationType.DELETE, event, each.length);
             for (int i = 0, length = each.length; i < length; i++) {
                 PipelineColumnMetaData columnMetaData = 
tableMetaData.getColumnMetaData(i + 1);
-                dataRecord.addColumn(new Column(columnMetaData.getName(), 
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), null, true, 
columnMetaData.isUniqueKey()));
+                dataRecord.addColumn(new 
NormalColumn(columnMetaData.getName(), 
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), null, true, 
columnMetaData.isUniqueKey()));
             }
             result.add(dataRecord);
         }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 3faae57270b..6d45e8198a8 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -20,8 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -56,11 +56,11 @@ class MySQLPipelineSQLBuilderTest {
     
     private DataRecord createDataRecord() {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
"foo_tbl", new IngestPlaceholderPosition(), 4);
-        result.addColumn(new Column("id", "", false, true));
-        result.addColumn(new Column("sc", "", false, false));
-        result.addColumn(new Column("c1", "", true, false));
-        result.addColumn(new Column("c2", "", true, false));
-        result.addColumn(new Column("c3", "", true, false));
+        result.addColumn(new NormalColumn("id", "", false, true));
+        result.addColumn(new NormalColumn("sc", "", false, false));
+        result.addColumn(new NormalColumn("c1", "", true, false));
+        result.addColumn(new NormalColumn("c2", "", true, false));
+        result.addColumn(new NormalColumn("c3", "", true, false));
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 39b637fc8cd..4c17d23691a 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -20,8 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -61,11 +61,11 @@ class OpenGaussPipelineSQLBuilderTest {
     
     private DataRecord createDataRecord() {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
"foo_tbl", new IngestPlaceholderPosition(), 4);
-        result.addColumn(new Column("id", "", false, true));
-        result.addColumn(new Column("c0", "", false, false));
-        result.addColumn(new Column("c1", "", true, false));
-        result.addColumn(new Column("c2", "", true, false));
-        result.addColumn(new Column("c3", "", true, false));
+        result.addColumn(new NormalColumn("id", "", false, true));
+        result.addColumn(new NormalColumn("c0", "", false, false));
+        result.addColumn(new NormalColumn("c1", "", true, false));
+        result.addColumn(new NormalColumn("c2", "", true, false));
+        result.addColumn(new NormalColumn("c3", "", true, false));
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
index 6f669e2307b..1857c9312e2 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
@@ -21,9 +21,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -33,6 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.UpdateRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.WriteRowEvent;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 
 import java.util.List;
 
@@ -112,7 +113,7 @@ public final class WALEventConverter {
         // TODO Unique key may be a column within unique index
         List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
         for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
-            result.addColumn(new Column(primaryKeyColumns.get(i), 
event.getPrimaryKeys().get(i), null, true, true));
+            result.addColumn(new NormalColumn(primaryKeyColumns.get(i), 
event.getPrimaryKeys().get(i), null, true, true));
         }
         return result;
     }
@@ -130,7 +131,7 @@ public final class WALEventConverter {
             PipelineColumnMetaData columnMetaData = 
tableMetaData.getColumnMetaData(i + 1);
             boolean isUniqueKey = columnMetaData.isUniqueKey();
             Object uniqueKeyOldValue = isUniqueKey && 
PipelineSQLOperationType.UPDATE == dataRecord.getType() ? values.get(i) : null;
-            Column column = new Column(columnMetaData.getName(), 
uniqueKeyOldValue, values.get(i), true, isUniqueKey);
+            Column column = new NormalColumn(columnMetaData.getName(), 
uniqueKeyOldValue, values.get(i), true, isUniqueKey);
             dataRecord.addColumn(column);
         }
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 5ba437ff210..dd63fce0186 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.PostgreSQLLogSequenceNumber;
@@ -63,9 +63,9 @@ class PostgreSQLPipelineSQLBuilderTest {
     
     private DataRecord createDataRecord() {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
"foo_tbl", new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
-        result.addColumn(new Column("order_id", 1, true, true));
-        result.addColumn(new Column("user_id", 2, true, false));
-        result.addColumn(new Column("status", "ok", true, false));
+        result.addColumn(new NormalColumn("order_id", 1, true, true));
+        result.addColumn(new NormalColumn("user_id", 2, true, false));
+        result.addColumn(new NormalColumn("status", "ok", true, false));
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index dcf869ea781..1c820f28b36 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -27,8 +27,8 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.Builder;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
@@ -56,24 +56,24 @@ class DataRecordResultConvertUtilsTest {
     @Test
     void assertConvertDataRecordToRecord() throws 
InvalidProtocolBufferException, SQLException {
         DataRecord dataRecord = new 
DataRecord(PipelineSQLOperationType.INSERT, "t_order", new 
IntegerPrimaryKeyIngestPosition(0L, 1L), 2);
-        dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false, 
true));
-        dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123L), 
false, false));
-        dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false, 
false));
-        dataRecord.addColumn(new Column("item_id", Integer.MAX_VALUE, false, 
false));
-        dataRecord.addColumn(new Column("create_date", LocalDate.now(), false, 
false));
-        dataRecord.addColumn(new Column("create_date2", 
Date.valueOf(LocalDate.now()), false, false));
-        dataRecord.addColumn(new Column("create_time", LocalTime.now(), false, 
false));
-        dataRecord.addColumn(new Column("create_time2", OffsetTime.now(), 
false, false));
-        dataRecord.addColumn(new Column("create_datetime", 
LocalDateTime.now(), false, false));
-        dataRecord.addColumn(new Column("create_datetime2", 
OffsetDateTime.now(), false, false));
-        dataRecord.addColumn(new Column("empty", null, false, false));
+        dataRecord.addColumn(new NormalColumn("order_id", BigInteger.ONE, 
false, true));
+        dataRecord.addColumn(new NormalColumn("price", 
BigDecimal.valueOf(123L), false, false));
+        dataRecord.addColumn(new NormalColumn("user_id", Long.MAX_VALUE, 
false, false));
+        dataRecord.addColumn(new NormalColumn("item_id", Integer.MAX_VALUE, 
false, false));
+        dataRecord.addColumn(new NormalColumn("create_date", LocalDate.now(), 
false, false));
+        dataRecord.addColumn(new NormalColumn("create_date2", 
Date.valueOf(LocalDate.now()), false, false));
+        dataRecord.addColumn(new NormalColumn("create_time", LocalTime.now(), 
false, false));
+        dataRecord.addColumn(new NormalColumn("create_time2", 
OffsetTime.now(), false, false));
+        dataRecord.addColumn(new NormalColumn("create_datetime", 
LocalDateTime.now(), false, false));
+        dataRecord.addColumn(new NormalColumn("create_datetime2", 
OffsetDateTime.now(), false, false));
+        dataRecord.addColumn(new NormalColumn("empty", null, false, false));
         Blob mockedBlob = mock(Blob.class);
         when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new 
byte[]{-1, 0, 1});
-        dataRecord.addColumn(new Column("data_blob", mockedBlob, false, 
false));
+        dataRecord.addColumn(new NormalColumn("data_blob", mockedBlob, false, 
false));
         Clob mockedClob = mock(Clob.class);
         when(mockedClob.getSubString(anyLong(), 
anyInt())).thenReturn("clob\n");
-        dataRecord.addColumn(new Column("text_clob", mockedClob, false, 
false));
-        dataRecord.addColumn(new Column("update_time", new 
Timestamp(System.currentTimeMillis()), false, false));
+        dataRecord.addColumn(new NormalColumn("text_clob", mockedClob, false, 
false));
+        dataRecord.addColumn(new NormalColumn("update_time", new 
Timestamp(System.currentTimeMillis()), false, false));
         TypeRegistry registry = 
TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes())
                 .add(WrappersProto.getDescriptor().getMessageTypes()).build();
         Record expectedRecord = 
DataRecordResultConvertUtils.convertDataRecordToRecord("test", null, 
dataRecord);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
index 763d0d186a4..53e13552ac9 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
@@ -28,9 +28,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.algorithm.FixtureTransmissionJobItemContext;
@@ -154,9 +154,9 @@ class PipelineDataSourceSinkTest {
     
     private DataRecord getUpdatePrimaryKeyDataRecord() {
         DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE, 
TABLE_NAME, new IngestPlaceholderPosition(), 3);
-        result.addColumn(new Column("id", 1, 2, true, true));
-        result.addColumn(new Column("user", 0, 10, true, false));
-        result.addColumn(new Column("status", null, 
PipelineSQLOperationType.UPDATE, true, false));
+        result.addColumn(new NormalColumn("id", 1, 2, true, true));
+        result.addColumn(new NormalColumn("user", 0, 10, true, false));
+        result.addColumn(new NormalColumn("status", null, 
PipelineSQLOperationType.UPDATE, true, false));
         return result;
     }
     
@@ -192,9 +192,9 @@ class PipelineDataSourceSinkTest {
             statusOldValue = type;
         }
         DataRecord result = new DataRecord(type, TABLE_NAME, new 
IngestPlaceholderPosition(), 3);
-        result.addColumn(new Column("id", idOldValue, idValue, false, true));
-        result.addColumn(new Column("user", userOldValue, userValue, true, 
false));
-        result.addColumn(new Column("status", statusOldValue, statusValue, 
true, false));
+        result.addColumn(new NormalColumn("id", idOldValue, idValue, false, 
true));
+        result.addColumn(new NormalColumn("user", userOldValue, userValue, 
true, false));
+        result.addColumn(new NormalColumn("status", statusOldValue, 
statusValue, true, false));
         return result;
     }
 }

Reply via email to