This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f351ceaf4 [Hotfix][connector-v2] fix SemanticXidGenerator#generateXid indexOutOfBounds #3701 (#3705)
f351ceaf4 is described below

commit f351ceaf4ba7276882ec4e9fb73f3cc1874ff1f0
Author: ic4y <[email protected]>
AuthorDate: Tue Dec 13 10:21:40 2022 +0800

    [Hotfix][connector-v2] fix SemanticXidGenerator#generateXid indexOutOfBounds #3701 (#3705)
---
 .../jdbc/internal/xa/SemanticXidGenerator.java       | 20 +++++++++++++-------
 .../jdbc/internal/xa/SemanticXidGeneratorTest.java   | 11 ++++++++---
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 4f51c31a5..df26d82b1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -33,9 +33,9 @@ import java.util.Arrays;
  * <ol>
  *   <li>To provide uniqueness over other jobs and apps, and other instances
  *   <li>of this job, gtrid consists of
- *   <li>job id (16 bytes)
+ *   <li>job id (32 bytes)
  *   <li>subtask index (4 bytes)
- *   <li>checkpoint id (4 bytes)
+ *   <li>checkpoint id (8 bytes)
  *   <li>bqual consists of 4 random bytes (generated using {@link 
SecureRandom})
  * </ol>
  *
@@ -65,8 +65,9 @@ class SemanticXidGenerator
     @Override
     public Xid generateXid(JobContext context, SinkWriter.Context sinkContext, 
long checkpointId) {
         byte[] jobIdBytes = context.getJobId().getBytes();
+        Arrays.fill(gtridBuffer, (byte) 0);
         checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
-        System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JOB_ID_BYTES);
+        System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, jobIdBytes.length);
 
         writeNumber(sinkContext.getIndexOfSubtask(), Integer.BYTES, 
gtridBuffer, JOB_ID_BYTES);
         writeNumber(checkpointId, Long.BYTES, gtridBuffer, JOB_ID_BYTES + 
Integer.BYTES);
@@ -79,13 +80,18 @@ class SemanticXidGenerator
         if (xid.getFormatId() != FORMAT_ID) {
             return false;
         }
-        int subtaskIndex = readNumber(xid.getGlobalTransactionId(), 
JOB_ID_BYTES, Integer.BYTES);
-        if (subtaskIndex != sinkContext.getIndexOfSubtask()) {
+        int xidSubtaskIndex = readNumber(xid.getGlobalTransactionId(), 
JOB_ID_BYTES, Integer.BYTES);
+        if (xidSubtaskIndex != sinkContext.getIndexOfSubtask()) {
             return false;
         }
+        byte[] xidJobIdBytes = new byte[JOB_ID_BYTES];
+        System.arraycopy(xid.getGlobalTransactionId(), 0, xidJobIdBytes, 0, 
JOB_ID_BYTES);
+
         byte[] jobIdBytes = new byte[JOB_ID_BYTES];
-        System.arraycopy(xid.getGlobalTransactionId(), 0, jobIdBytes, 0, 
JOB_ID_BYTES);
-        return Arrays.equals(jobIdBytes, context.getJobId().getBytes());
+        byte[] bytes = context.getJobId().getBytes();
+        System.arraycopy(bytes, 0, jobIdBytes, 0, bytes.length);
+
+        return Arrays.equals(jobIdBytes, xidJobIdBytes);
     }
 
     private static int readNumber(byte[] bytes, int offset, int numBytes) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
index 906c3bd33..5ff57eac1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
@@ -28,19 +28,24 @@ import org.junit.jupiter.api.Test;
 import javax.transaction.xa.Xid;
 
 class SemanticXidGeneratorTest {
-    private JobContext jobContext;
     private SemanticXidGenerator xidGenerator;
 
     @BeforeEach
     void before() {
-        jobContext = new JobContext();
         xidGenerator = new SemanticXidGenerator();
         xidGenerator.open();
     }
 
     @Test
     void testBelongsToSubtask() {
-        DefaultSinkWriterContext dc1 = new DefaultSinkWriterContext(1);
+        JobContext uuidJobContext = new JobContext();
+        check(uuidJobContext);
+        JobContext longJobContext = new JobContext(Long.MIN_VALUE);
+        check(longJobContext);
+    }
+
+    void check(JobContext jobContext){
+        DefaultSinkWriterContext dc1 = new 
DefaultSinkWriterContext(Integer.MAX_VALUE);
         Xid xid1 = xidGenerator.generateXid(jobContext, dc1, 
System.currentTimeMillis());
         Assertions.assertTrue(xidGenerator.belongsToSubtask(xid1, jobContext, 
dc1));
         Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, jobContext, 
new DefaultSinkWriterContext(2)));

Reply via email to