This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7354b448d93 Refactor pipeline Column. (#33670)
7354b448d93 is described below
commit 7354b448d93809734b7e2c3be715f9909237d50f
Author: Cong Hu <[email protected]>
AuthorDate: Fri Nov 15 15:56:49 2024 +0800
Refactor pipeline Column. (#33670)
---
.../ingest/dumper/inventory/InventoryDumper.java | 6 +--
.../data/pipeline/core/ingest/record/Column.java | 50 ++++++++++--------
.../record/{Column.java => NormalColumn.java} | 4 +-
.../ingest/record/group/DataRecordGroupEngine.java | 5 +-
.../core/ingest/record/DataRecordTest.java | 8 +--
.../core/ingest/record/RecordUtilsTest.java | 10 ++--
.../record/group/DataRecordGroupEngineTest.java | 61 +++++++++++-----------
.../sql/PipelineImportSQLBuilderTest.java | 11 ++--
.../incremental/dumper/MySQLIncrementalDumper.java | 10 ++--
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 12 ++---
.../OpenGaussPipelineSQLBuilderTest.java | 12 ++---
.../ingest/incremental/wal/WALEventConverter.java | 7 +--
.../PostgreSQLPipelineSQLBuilderTest.java | 8 +--
.../cdc/util/DataRecordResultConvertUtilsTest.java | 30 +++++------
.../sink/type/PipelineDataSourceSinkTest.java | 14 ++---
15 files changed, 130 insertions(+), 118 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 3df8c98344f..daab070fcce 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -28,17 +28,17 @@ import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLife
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.InventoryRangeQueryParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -241,7 +241,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
for (int i = 1; i <= columnCount; i++) {
String columnName = insertColumnNames.isEmpty() ?
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName),
() -> new PipelineInvalidParameterException(String.format("Column name is %s",
columnName)));
- result.addColumn(new Column(columnName,
columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true,
tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
+ result.addColumn(new NormalColumn(columnName,
columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true,
tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
}
result.setActualTableName(dumperContext.getActualTableName());
return result;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
index 221a2c5b478..ff712ff9779 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
@@ -17,35 +17,43 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.record;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
/**
* Column.
*/
-@RequiredArgsConstructor
-@Getter
-public final class Column {
-
- private final String name;
+public interface Column {
/**
- * Value are available only when the primary key column is updated.
+ * Get name.
+ *
+ * @return name
*/
- private final Object oldValue;
-
- private final Object value;
+ String getName();
- private final boolean updated;
+ /**
+ * Get old value.
+ *
+ * @return old value
+ */
+ Object getOldValue();
- private final boolean uniqueKey;
+ /**
+ * Get value.
+ *
+ * @return value
+ */
+ Object getValue();
- public Column(final String name, final Object value, final boolean
updated, final boolean uniqueKey) {
- this(name, null, value, updated, uniqueKey);
- }
+ /**
+ * Judge whether the column is updated.
+ *
+ * @return true if the column is updated, otherwise false
+ */
+ boolean isUpdated();
- @Override
- public String toString() {
- return String.format("%s: oldValue=%s, value=%s", name, oldValue,
value);
- }
+ /**
+ * Judge whether the column is unique key.
+ *
+ * @return true if the column is unique key, otherwise false
+ */
+ boolean isUniqueKey();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/NormalColumn.java
similarity index 89%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/NormalColumn.java
index 221a2c5b478..692798d66c4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/Column.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/NormalColumn.java
@@ -25,7 +25,7 @@ import lombok.RequiredArgsConstructor;
*/
@RequiredArgsConstructor
@Getter
-public final class Column {
+public final class NormalColumn implements Column {
private final String name;
@@ -40,7 +40,7 @@ public final class Column {
private final boolean uniqueKey;
- public Column(final String name, final Object value, final boolean
updated, final boolean uniqueKey) {
+ public NormalColumn(final String name, final Object value, final boolean
updated, final boolean uniqueKey) {
this(name, null, value, updated, uniqueKey);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index cb176a62fa0..aae1e972858 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
@@ -122,7 +123,7 @@ public final class DataRecordGroupEngine {
DataRecord mergedDataRecord = new
DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(),
dataRecord.getPosition(), dataRecord.getColumnCount());
mergeBaseFields(dataRecord, mergedDataRecord);
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
- mergedDataRecord.addColumn(new
Column(dataRecord.getColumn(i).getName(),
+ mergedDataRecord.addColumn(new
NormalColumn(dataRecord.getColumn(i).getName(),
dataRecord.getColumn(i).isUniqueKey() ?
beforeDataRecord.getColumn(i).getOldValue() :
beforeDataRecord.getColumn(i).getValue(),
null, true, dataRecord.getColumn(i).isUniqueKey()));
}
@@ -153,7 +154,7 @@ public final class DataRecordGroupEngine {
DataRecord result = new DataRecord(type, tableName,
curDataRecord.getPosition(), curDataRecord.getColumnCount());
mergeBaseFields(curDataRecord, result);
for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
- result.addColumn(new Column(
+ result.addColumn(new NormalColumn(
curDataRecord.getColumn(i).getName(),
preDataRecord.getColumn(i).getOldValue(),
curDataRecord.getColumn(i).getValue(),
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index c991138416c..346d4c6fb37 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -29,18 +29,18 @@ class DataRecordTest {
@Test
void assertGetKey() {
DataRecord beforeDataRecord = new
DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
- beforeDataRecord.addColumn(new Column("id", 1, true, true));
+ beforeDataRecord.addColumn(new NormalColumn("id", 1, true, true));
DataRecord afterDataRecord = new
DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
- afterDataRecord.addColumn(new Column("id", 2, 1, true, true));
+ afterDataRecord.addColumn(new NormalColumn("id", 2, 1, true, true));
assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
}
@Test
void assertGetOldKey() {
DataRecord beforeDataRecord = new
DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
- beforeDataRecord.addColumn(new Column("id", 1, 2, true, true));
+ beforeDataRecord.addColumn(new NormalColumn("id", 1, 2, true, true));
DataRecord afterDataRecord = new
DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
- afterDataRecord.addColumn(new Column("id", 1, 3, true, true));
+ afterDataRecord.addColumn(new NormalColumn("id", 1, 3, true, true));
assertThat(beforeDataRecord.getOldKey(),
is(afterDataRecord.getOldKey()));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java
index f8471e2bfde..d53f89bd72b 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/RecordUtilsTest.java
@@ -41,11 +41,11 @@ class RecordUtilsTest {
private DataRecord mockDataRecord() {
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
"t2", new IngestPlaceholderPosition(), 4);
- result.addColumn(new Column("id", "", false, true));
- result.addColumn(new Column("sc", "", false, true));
- result.addColumn(new Column("c1", "", true, false));
- result.addColumn(new Column("c2", "", true, false));
- result.addColumn(new Column("c3", "", true, false));
+ result.addColumn(new NormalColumn("id", "", false, true));
+ result.addColumn(new NormalColumn("sc", "", false, true));
+ result.addColumn(new NormalColumn("c1", "", true, false));
+ result.addColumn(new NormalColumn("c2", "", true, false));
+ result.addColumn(new NormalColumn("c3", "", true, false));
return result;
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index dca4f270d87..fc4bf4bc340 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexp
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.junit.jupiter.api.Test;
@@ -73,9 +74,9 @@ class DataRecordGroupEngineTest {
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getActualTableName(), is("order_0"));
assertThat(dataRecord.getCommitTime(), is(456L));
- assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null,
1, true, true));
- assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
null, 10, true, false));
- assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", null, 200, true, false));
+ assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id",
null, 1, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new
NormalColumn("user_id", null, 10, true, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
NormalColumn("total_price", null, 200, true, false));
}
private void assertColumnsMatched(final Column actual, final Column
expected) {
@@ -97,9 +98,9 @@ class DataRecordGroupEngineTest {
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getActualTableName(), is("order_0"));
assertThat(dataRecord.getCommitTime(), is(456L));
- assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null,
2, true, true));
- assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
null, 10, true, false));
- assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", null, 50, true, false));
+ assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id",
null, 2, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new
NormalColumn("user_id", null, 10, true, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
NormalColumn("total_price", null, 50, true, false));
}
@Test
@@ -113,9 +114,9 @@ class DataRecordGroupEngineTest {
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getActualTableName(), is("order_0"));
assertThat(dataRecord.getCommitTime(), is(456L));
- assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 1,
false, true));
- assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
- assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 200, true, false));
+ assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id",
1, 1, false, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new
NormalColumn("user_id", 10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
NormalColumn("total_price", 50, 200, true, false));
}
@Test
@@ -129,9 +130,9 @@ class DataRecordGroupEngineTest {
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getActualTableName(), is("order_0"));
assertThat(dataRecord.getCommitTime(), is(456L));
- assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2,
true, true));
- assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
- assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 200, true, false));
+ assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id",
1, 2, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new
NormalColumn("user_id", 10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
NormalColumn("total_price", 50, 200, true, false));
}
@Test
@@ -145,9 +146,9 @@ class DataRecordGroupEngineTest {
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getActualTableName(), is("order_0"));
assertThat(dataRecord.getCommitTime(), is(456L));
- assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2,
true, true));
- assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
- assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 200, true, false));
+ assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id",
1, 2, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new
NormalColumn("user_id", 10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
NormalColumn("total_price", 50, 200, true, false));
}
@Test
@@ -161,9 +162,9 @@ class DataRecordGroupEngineTest {
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getActualTableName(), is("order_0"));
assertThat(dataRecord.getCommitTime(), is(456L));
- assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 3,
true, true));
- assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
- assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 50, false, false));
+ assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id",
1, 3, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new
NormalColumn("user_id", 10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
NormalColumn("total_price", 50, 50, false, false));
}
@Test
@@ -202,9 +203,9 @@ class DataRecordGroupEngineTest {
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getActualTableName(), is("order_0"));
assertThat(dataRecord.getCommitTime(), is(789L));
- assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1,
null, true, true));
- assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, null, true, false));
- assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, null, true, false));
+ assertColumnsMatched(dataRecord.getColumn(0), new NormalColumn("id",
1, null, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new
NormalColumn("user_id", 10, null, true, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
NormalColumn("total_price", 50, null, true, false));
}
@Test
@@ -246,9 +247,9 @@ class DataRecordGroupEngineTest {
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
tableName, new IngestPlaceholderPosition(), 3);
result.setActualTableName("order_0");
result.setCommitTime(123L);
- result.addColumn(new Column("id", id, true, true));
- result.addColumn(new Column("user_id", userId, true, false));
- result.addColumn(new Column("total_price", totalPrice, true, false));
+ result.addColumn(new NormalColumn("id", id, true, true));
+ result.addColumn(new NormalColumn("user_id", userId, true, false));
+ result.addColumn(new NormalColumn("total_price", totalPrice, true,
false));
return result;
}
@@ -268,9 +269,9 @@ class DataRecordGroupEngineTest {
DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE,
tableName, new IngestPlaceholderPosition(), 3);
result.setActualTableName("order_0");
result.setCommitTime(456L);
- result.addColumn(new Column("id", oldId, id,
!Objects.deepEquals(oldId, id), true));
- result.addColumn(new Column("user_id", userId, userId, false, false));
- result.addColumn(new Column("total_price", 50, totalPrice, 50 !=
totalPrice, false));
+ result.addColumn(new NormalColumn("id", oldId, id,
!Objects.deepEquals(oldId, id), true));
+ result.addColumn(new NormalColumn("user_id", userId, userId, false,
false));
+ result.addColumn(new NormalColumn("total_price", 50, totalPrice, 50 !=
totalPrice, false));
return result;
}
@@ -282,9 +283,9 @@ class DataRecordGroupEngineTest {
DataRecord result = new DataRecord(PipelineSQLOperationType.DELETE,
tableName, new IngestPlaceholderPosition(), 3);
result.setActualTableName("order_0");
result.setCommitTime(789L);
- result.addColumn(new Column("id", id, null, true, true));
- result.addColumn(new Column("user_id", userId, null, true, false));
- result.addColumn(new Column("total_price", totalPrice, null, true,
false));
+ result.addColumn(new NormalColumn("id", id, null, true, true));
+ result.addColumn(new NormalColumn("user_id", userId, null, true,
false));
+ result.addColumn(new NormalColumn("total_price", totalPrice, null,
true, false));
return result;
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java
index c1eb17d99ff..4b862672642 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilderTest.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -87,18 +88,18 @@ class PipelineImportSQLBuilderTest {
private DataRecord createDataRecordWithUniqueKey() {
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
"foo_tbl", new IngestPlaceholderPosition(), 4);
- result.addColumn(new Column("id", "", false, true));
- result.addColumn(new Column("foo_col", "", false, false));
+ result.addColumn(new NormalColumn("id", "", false, true));
+ result.addColumn(new NormalColumn("foo_col", "", false, false));
for (int i = 1; i <= 3; i++) {
- result.addColumn(new Column("col" + i, "", true, false));
+ result.addColumn(new NormalColumn("col" + i, "", true, false));
}
return result;
}
private DataRecord createDataRecordWithoutUniqueKey() {
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
"foo_tbl", new IngestPlaceholderPosition(), 4);
- result.addColumn(new Column("id", "", false, false));
- result.addColumn(new Column("foo_col", "", true, false));
+ result.addColumn(new NormalColumn("id", "", false, false));
+ result.addColumn(new NormalColumn("foo_col", "", true, false));
return result;
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index d1a6b07a172..8d1dedecfe3 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -25,20 +25,20 @@ import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLife
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.data.MySQLBinlogDataHandler;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.ConnectInfo;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.MySQLBinlogClient;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
@@ -151,7 +151,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
DataRecord dataRecord =
createDataRecord(PipelineSQLOperationType.INSERT, event, each.length);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
- dataRecord.addColumn(new Column(columnMetaData.getName(),
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), true,
columnMetaData.isUniqueKey()));
+ dataRecord.addColumn(new
NormalColumn(columnMetaData.getName(),
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), true,
columnMetaData.isUniqueKey()));
}
result.add(dataRecord);
}
@@ -169,7 +169,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
Serializable oldValue =
MySQLBinlogDataHandler.handle(columnMetaData, beforeValues[j]);
Serializable newValue =
MySQLBinlogDataHandler.handle(columnMetaData, afterValues[j]);
boolean updated = !Objects.deepEquals(newValue, oldValue);
- dataRecord.addColumn(new Column(columnMetaData.getName(),
oldValue, newValue, updated, columnMetaData.isUniqueKey()));
+ dataRecord.addColumn(new
NormalColumn(columnMetaData.getName(), oldValue, newValue, updated,
columnMetaData.isUniqueKey()));
}
result.add(dataRecord);
}
@@ -182,7 +182,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
DataRecord dataRecord =
createDataRecord(PipelineSQLOperationType.DELETE, event, each.length);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
- dataRecord.addColumn(new Column(columnMetaData.getName(),
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), null, true,
columnMetaData.isUniqueKey()));
+ dataRecord.addColumn(new
NormalColumn(columnMetaData.getName(),
MySQLBinlogDataHandler.handle(columnMetaData, each[i]), null, true,
columnMetaData.isUniqueKey()));
}
result.add(dataRecord);
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 3faae57270b..6d45e8198a8 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -56,11 +56,11 @@ class MySQLPipelineSQLBuilderTest {
private DataRecord createDataRecord() {
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
"foo_tbl", new IngestPlaceholderPosition(), 4);
- result.addColumn(new Column("id", "", false, true));
- result.addColumn(new Column("sc", "", false, false));
- result.addColumn(new Column("c1", "", true, false));
- result.addColumn(new Column("c2", "", true, false));
- result.addColumn(new Column("c3", "", true, false));
+ result.addColumn(new NormalColumn("id", "", false, true));
+ result.addColumn(new NormalColumn("sc", "", false, false));
+ result.addColumn(new NormalColumn("c1", "", true, false));
+ result.addColumn(new NormalColumn("c2", "", true, false));
+ result.addColumn(new NormalColumn("c3", "", true, false));
return result;
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 39b637fc8cd..4c17d23691a 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -61,11 +61,11 @@ class OpenGaussPipelineSQLBuilderTest {
private DataRecord createDataRecord() {
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
"foo_tbl", new IngestPlaceholderPosition(), 4);
- result.addColumn(new Column("id", "", false, true));
- result.addColumn(new Column("c0", "", false, false));
- result.addColumn(new Column("c1", "", true, false));
- result.addColumn(new Column("c2", "", true, false));
- result.addColumn(new Column("c3", "", true, false));
+ result.addColumn(new NormalColumn("id", "", false, true));
+ result.addColumn(new NormalColumn("c0", "", false, false));
+ result.addColumn(new NormalColumn("c1", "", true, false));
+ result.addColumn(new NormalColumn("c2", "", true, false));
+ result.addColumn(new NormalColumn("c3", "", true, false));
return result;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
index 6f669e2307b..1857c9312e2 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
@@ -21,9 +21,9 @@ import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.UpdateRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.WriteRowEvent;
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import java.util.List;
@@ -112,7 +113,7 @@ public final class WALEventConverter {
// TODO Unique key may be a column within unique index
List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
- result.addColumn(new Column(primaryKeyColumns.get(i),
event.getPrimaryKeys().get(i), null, true, true));
+ result.addColumn(new NormalColumn(primaryKeyColumns.get(i),
event.getPrimaryKeys().get(i), null, true, true));
}
return result;
}
@@ -130,7 +131,7 @@ public final class WALEventConverter {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
boolean isUniqueKey = columnMetaData.isUniqueKey();
Object uniqueKeyOldValue = isUniqueKey &&
PipelineSQLOperationType.UPDATE == dataRecord.getType() ? values.get(i) : null;
- Column column = new Column(columnMetaData.getName(),
uniqueKeyOldValue, values.get(i), true, isUniqueKey);
+ Column column = new NormalColumn(columnMetaData.getName(),
uniqueKeyOldValue, values.get(i), true, isUniqueKey);
dataRecord.addColumn(column);
}
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 5ba437ff210..dd63fce0186 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.PostgreSQLLogSequenceNumber;
@@ -63,9 +63,9 @@ class PostgreSQLPipelineSQLBuilderTest {
private DataRecord createDataRecord() {
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
"foo_tbl", new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
- result.addColumn(new Column("order_id", 1, true, true));
- result.addColumn(new Column("user_id", 2, true, false));
- result.addColumn(new Column("status", "ok", true, false));
+ result.addColumn(new NormalColumn("order_id", 1, true, true));
+ result.addColumn(new NormalColumn("user_id", 2, true, false));
+ result.addColumn(new NormalColumn("status", "ok", true, false));
return result;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index dcf869ea781..1c820f28b36 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -27,8 +27,8 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.Builder;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
@@ -56,24 +56,24 @@ class DataRecordResultConvertUtilsTest {
@Test
void assertConvertDataRecordToRecord() throws
InvalidProtocolBufferException, SQLException {
DataRecord dataRecord = new
DataRecord(PipelineSQLOperationType.INSERT, "t_order", new
IntegerPrimaryKeyIngestPosition(0L, 1L), 2);
- dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false,
true));
- dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123L),
false, false));
- dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false,
false));
- dataRecord.addColumn(new Column("item_id", Integer.MAX_VALUE, false,
false));
- dataRecord.addColumn(new Column("create_date", LocalDate.now(), false,
false));
- dataRecord.addColumn(new Column("create_date2",
Date.valueOf(LocalDate.now()), false, false));
- dataRecord.addColumn(new Column("create_time", LocalTime.now(), false,
false));
- dataRecord.addColumn(new Column("create_time2", OffsetTime.now(),
false, false));
- dataRecord.addColumn(new Column("create_datetime",
LocalDateTime.now(), false, false));
- dataRecord.addColumn(new Column("create_datetime2",
OffsetDateTime.now(), false, false));
- dataRecord.addColumn(new Column("empty", null, false, false));
+ dataRecord.addColumn(new NormalColumn("order_id", BigInteger.ONE,
false, true));
+ dataRecord.addColumn(new NormalColumn("price",
BigDecimal.valueOf(123L), false, false));
+ dataRecord.addColumn(new NormalColumn("user_id", Long.MAX_VALUE,
false, false));
+ dataRecord.addColumn(new NormalColumn("item_id", Integer.MAX_VALUE,
false, false));
+ dataRecord.addColumn(new NormalColumn("create_date", LocalDate.now(),
false, false));
+ dataRecord.addColumn(new NormalColumn("create_date2",
Date.valueOf(LocalDate.now()), false, false));
+ dataRecord.addColumn(new NormalColumn("create_time", LocalTime.now(),
false, false));
+ dataRecord.addColumn(new NormalColumn("create_time2",
OffsetTime.now(), false, false));
+ dataRecord.addColumn(new NormalColumn("create_datetime",
LocalDateTime.now(), false, false));
+ dataRecord.addColumn(new NormalColumn("create_datetime2",
OffsetDateTime.now(), false, false));
+ dataRecord.addColumn(new NormalColumn("empty", null, false, false));
Blob mockedBlob = mock(Blob.class);
when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new
byte[]{-1, 0, 1});
- dataRecord.addColumn(new Column("data_blob", mockedBlob, false,
false));
+ dataRecord.addColumn(new NormalColumn("data_blob", mockedBlob, false,
false));
Clob mockedClob = mock(Clob.class);
when(mockedClob.getSubString(anyLong(),
anyInt())).thenReturn("clob\n");
- dataRecord.addColumn(new Column("text_clob", mockedClob, false,
false));
- dataRecord.addColumn(new Column("update_time", new
Timestamp(System.currentTimeMillis()), false, false));
+ dataRecord.addColumn(new NormalColumn("text_clob", mockedClob, false,
false));
+ dataRecord.addColumn(new NormalColumn("update_time", new
Timestamp(System.currentTimeMillis()), false, false));
TypeRegistry registry =
TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes())
.add(WrappersProto.getDescriptor().getMessageTypes()).build();
Record expectedRecord =
DataRecordResultConvertUtils.convertDataRecordToRecord("test", null,
dataRecord);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
index 763d0d186a4..53e13552ac9 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
@@ -28,9 +28,9 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.algorithm.FixtureTransmissionJobItemContext;
@@ -154,9 +154,9 @@ class PipelineDataSourceSinkTest {
private DataRecord getUpdatePrimaryKeyDataRecord() {
DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE,
TABLE_NAME, new IngestPlaceholderPosition(), 3);
- result.addColumn(new Column("id", 1, 2, true, true));
- result.addColumn(new Column("user", 0, 10, true, false));
- result.addColumn(new Column("status", null,
PipelineSQLOperationType.UPDATE, true, false));
+ result.addColumn(new NormalColumn("id", 1, 2, true, true));
+ result.addColumn(new NormalColumn("user", 0, 10, true, false));
+ result.addColumn(new NormalColumn("status", null,
PipelineSQLOperationType.UPDATE, true, false));
return result;
}
@@ -192,9 +192,9 @@ class PipelineDataSourceSinkTest {
statusOldValue = type;
}
DataRecord result = new DataRecord(type, TABLE_NAME, new
IngestPlaceholderPosition(), 3);
- result.addColumn(new Column("id", idOldValue, idValue, false, true));
- result.addColumn(new Column("user", userOldValue, userValue, true,
false));
- result.addColumn(new Column("status", statusOldValue, statusValue,
true, false));
+ result.addColumn(new NormalColumn("id", idOldValue, idValue, false,
true));
+ result.addColumn(new NormalColumn("user", userOldValue, userValue,
true, false));
+ result.addColumn(new NormalColumn("status", statusOldValue,
statusValue, true, false));
return result;
}
}