This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 79ca3bc112e Notify the CDC Client if streaming stopped due to exception (#28998)
79ca3bc112e is described below

commit 79ca3bc112e988559513afaaa2f15478266bb8cc
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Nov 10 17:16:32 2023 +0800

     Notify the CDC Client if streaming stopped due to exception (#28998)
    
    * Add tinyint decode
    
    * Notify the client if streaming stopped due to exception
---
 .../opengauss/ingest/wal/decode/MppdbDecodingPlugin.java   |  2 +-
 .../ingest/wal/decode/MppdbDecodingPluginTest.java         | 14 ++++++++++++++
 .../pipeline/cdc/core/importer/sink/CDCSocketSink.java     |  2 ++
 .../shardingsphere/data/pipeline/cdc/core/job/CDCJob.java  |  8 +++++++-
 4 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index be187823a36..8858632858a 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -163,8 +163,8 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
             return decodeString(data.substring(1));
         }
         switch (columnType) {
+            case "tinyint":
             case "smallint":
-                return Short.parseShort(data);
             case "integer":
                 return Integer.parseInt(data);
             case "bigint":
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
index 5b446bcfc7e..23d4bf31512 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
@@ -304,4 +304,18 @@ class MppdbDecodingPluginTest {
         Object byteaObj = actual.getAfterRow().get(0);
         assertThat(byteaObj.toString(), is("'fff' | 'faa'"));
     }
+    
+    @Test
+    void assertDecodeWitTinyint() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"tinyint"});
+        tableData.setColumnsVal(new String[]{"255"});
+        ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        Object byteaObj = actual.getAfterRow().get(0);
+        assertThat(byteaObj, is(255));
+    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index d9f5ad83012..d45c14b4b8c 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
 
 import io.netty.channel.Channel;
+import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
@@ -55,6 +56,7 @@ public final class CDCSocketSink implements PipelineSink {
     
     private final ShardingSphereDatabase database;
     
+    @Getter
     private final Channel channel;
     
     private final Map<String, String> tableNameSchemaMap = new HashMap<>();
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 9858e2932e3..50e572c5a32 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -25,8 +25,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
@@ -184,7 +186,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
         
         private final String identifier;
         
-        private final PipelineJobItemContext jobItemContext;
+        private final CDCJobItemContext jobItemContext;
         
         @Override
         public void onSuccess() {
@@ -200,6 +202,10 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
             log.error("onFailure, {} task execute failed.", identifier, throwable);
             String jobId = jobItemContext.getJobId();
             jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+            if (jobItemContext.getSink() instanceof CDCSocketSink) {
+                CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
+                cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
+            }
             PipelineJobCenter.stop(jobId);
             jobAPI.updateJobConfigurationDisabled(jobId, true);
         }

Reply via email to