Copilot commented on code in PR #7754:
URL: https://github.com/apache/incubator-seata/pull/7754#discussion_r2483734358


##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(
+                                    "PostgreSQL txn prepare returned %d, which 
should be 0 instead", ret));
+                        }
+                    }
+                }
+
+                // Mark them all active
+                ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+                // Make them visible in random order
+                List<Integer> asList = new ArrayList<>(newBatch);
+                Collections.shuffle(asList);
+                int firstKey = asList.remove(0); // take one for ourselves to 
avoid loop again
+                for (int otherKey : asList) {
+                    ((DataSourceProxyXA) resource)
+                            .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+                                    new AbstractMap.SimpleEntry<>(otherKey, 
helperTxnId));
+                }
+
+                keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey, 
helperTxnId);
+            }
+        }
+
+        // associate xid with dummy key
+        Integer prevDummy = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+        if (prevDummy != null) {
+            throw new RuntimeException(
+                    String.format("Global txn branch (%s) already associated 
with dummy key (%d)", xid, prevDummy));
+        }
+
+        // associate xid with helper id
+        Integer prevHelper =
+                ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+        if (prevHelper != null) {
+            throw new RuntimeException(String.format(

Review Comment:
   This RuntimeException should also use a more specific exception type like 
IllegalStateException for consistency with similar state violation errors.
   ```suggestion
               throw new IllegalStateException(String.format(
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(
+                                    "PostgreSQL txn prepare returned %d, which 
should be 0 instead", ret));
+                        }
+                    }
+                }
+
+                // Mark them all active
+                ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+                // Make them visible in random order
+                List<Integer> asList = new ArrayList<>(newBatch);
+                Collections.shuffle(asList);
+                int firstKey = asList.remove(0); // take one for ourselves to 
avoid loop again
+                for (int otherKey : asList) {
+                    ((DataSourceProxyXA) resource)
+                            .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+                                    new AbstractMap.SimpleEntry<>(otherKey, 
helperTxnId));
+                }
+
+                keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey, 
helperTxnId);
+            }
+        }
+
+        // associate xid with dummy key
+        Integer prevDummy = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+        if (prevDummy != null) {
+            throw new RuntimeException(
+                    String.format("Global txn branch (%s) already associated 
with dummy key (%d)", xid, prevDummy));

Review Comment:
   Similar to getS2plDummyKey, this RuntimeException in getSsiDummyKey should 
use a more specific exception type like IllegalStateException to better convey 
that this represents an unexpected internal state violation.



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(
+                                    "PostgreSQL txn prepare returned %d, which 
should be 0 instead", ret));
+                        }
+                    }
+                }
+
+                // Mark them all active
+                ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+                // Make them visible in random order
+                List<Integer> asList = new ArrayList<>(newBatch);
+                Collections.shuffle(asList);
+                int firstKey = asList.remove(0); // take one for ourselves to 
avoid loop again
+                for (int otherKey : asList) {
+                    ((DataSourceProxyXA) resource)
+                            .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+                                    new AbstractMap.SimpleEntry<>(otherKey, 
helperTxnId));
+                }
+
+                keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey, 
helperTxnId);
+            }
+        }
+
+        // associate xid with dummy key
+        Integer prevDummy = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+        if (prevDummy != null) {
+            throw new RuntimeException(
+                    String.format("Global txn branch (%s) already associated 
with dummy key (%d)", xid, prevDummy));
+        }
+
+        // associate xid with helper id
+        Integer prevHelper =
+                ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+        if (prevHelper != null) {
+            throw new RuntimeException(String.format(
+                    "Global txn branch (%s) already associated with helper txn 
ID (%d)", xid, prevHelper));
+        }
+
+        return keyAndHelperId.getKey();
+    }
+
+    private void forgetDummyKey(XAXid xid) {
+        Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.remove(xid);
+        if (prev == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return;
+        }
+
+        boolean recorded = ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+        if (!recorded) {
+            throw new RuntimeException(String.format(
+                    "Dummy key (%d) mapped to global txn branch (%s) not found 
in active dummy key set", prev, xid));
+        }
+    }
+
+    private boolean releaseHelperTxn(XAXid xid) throws XAException {
+        Integer helperTxnId = ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.remove(xid);
+
+        if (helperTxnId == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return false;
+        }
+
+        AtomicInteger counter = ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+        if (counter == null) {
+            throw new RuntimeException("Reference counter for helper txn (" + 
helperTxnId + ") not found");

Review Comment:
   This RuntimeException in releaseHelperTxn should use IllegalStateException 
to better represent the unexpected internal state.
   ```suggestion
               throw new IllegalStateException("Reference counter for helper txn (" + helperTxnId + ") not found");
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(

Review Comment:
   This RuntimeException should use IllegalStateException since it represents 
an unexpected database response that violates expected behavior.
   ```suggestion
                               throw new IllegalStateException(String.format(
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(
+                                    "PostgreSQL txn prepare returned %d, which 
should be 0 instead", ret));
+                        }
+                    }
+                }
+
+                // Mark them all active
+                ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+                // Make them visible in random order
+                List<Integer> asList = new ArrayList<>(newBatch);
+                Collections.shuffle(asList);
+                int firstKey = asList.remove(0); // take one for ourselves to 
avoid loop again
+                for (int otherKey : asList) {
+                    ((DataSourceProxyXA) resource)
+                            .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+                                    new AbstractMap.SimpleEntry<>(otherKey, 
helperTxnId));
+                }
+
+                keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey, 
helperTxnId);
+            }
+        }
+
+        // associate xid with dummy key
+        Integer prevDummy = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+        if (prevDummy != null) {
+            throw new RuntimeException(
+                    String.format("Global txn branch (%s) already associated 
with dummy key (%d)", xid, prevDummy));
+        }
+
+        // associate xid with helper id
+        Integer prevHelper =
+                ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+        if (prevHelper != null) {
+            throw new RuntimeException(String.format(
+                    "Global txn branch (%s) already associated with helper txn 
ID (%d)", xid, prevHelper));
+        }
+
+        return keyAndHelperId.getKey();
+    }
+
+    private void forgetDummyKey(XAXid xid) {
+        Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.remove(xid);
+        if (prev == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return;
+        }
+
+        boolean recorded = ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+        if (!recorded) {
+            throw new RuntimeException(String.format(
+                    "Dummy key (%d) mapped to global txn branch (%s) not found 
in active dummy key set", prev, xid));
+        }
+    }
+
+    private boolean releaseHelperTxn(XAXid xid) throws XAException {
+        Integer helperTxnId = ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.remove(xid);
+
+        if (helperTxnId == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return false;
+        }
+
+        AtomicInteger counter = ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+        if (counter == null) {
+            throw new RuntimeException("Reference counter for helper txn (" + 
helperTxnId + ") not found");
+        }
+
+        int currentUsage = counter.decrementAndGet();
+        if (currentUsage == 0) {
+            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.remove(helperTxnId);
+            try (Statement stmt = originalConnection.createStatement()) { // 
do not use wrapped xa stmt
+                stmt.executeUpdate("rollback prepared '" + helperTxnId + "'");
+            } catch (SQLException e) {
+                if ("42704".equals(e.getSQLState())) {
+                    LOGGER.error("Helper txn ({}) not prepared; should be 
possible with helper sharing", helperTxnId);
+                    // This is caused by TM actively rolling back the branch 
while the branch is preparing. We return an
+                    // XAException to notify TM to try again later. Extremely 
rare, but still possible (e.g., if helper
+                    // batch size=1).
+                    XAException xe = new XAException("Helper txn (" + 
helperTxnId + ") not yet prepared, try again");
+                    xe.errorCode = XAException.XA_RETRY;
+                    throw xe;
+                }
+
+                LOGGER.error("Unexpected SQLException while rolling back 
helper txn: {}", e.getMessage());
+                throw new RuntimeException(e);
+            }
+        }
+
+        return true;
+    }
+
+    private void sonataPrePrepare() throws SQLException, XAException {
+        // For an S2PL DB, we add a dummy write; for an SSI DB, we add a 
helper txn + a dummy write. Then,
+        // original prepare logic resumes.
+        if (DBType.MYSQL.name().equalsIgnoreCase(resource.getDbType())) {
+            int key = getS2plDummyKey(xaBranchXid);
+            int value = ThreadLocalRandom.current().nextInt();
+
+            try (Statement stmt = createStatement()) {
+                int retry = 0;
+                int affected = 0;
+                // Note: affected=1 for insert; =2 for update; =0 for update 
but value unchanged
+                while (affected == 0) {
+                    affected =
+                            stmt.executeUpdate("insert into `" + DUMMY_TABLE + 
"` (`key`, value) values (" + key + ","
+                                    + value + ") on duplicate key update 
value="
+                                    + value);
+                    retry++;
+                    if (retry > S2PL_UPDATE_RETRY_WARNING_THRESHOLD) {
+                        LOGGER.warn(
+                                "MySQL random dummy writes generated conflict 
with existing rows in {} consecutive attempts! This is super unlikely to occur, 
please investigate.",
+                                retry);
+                    }
+                }
+            }
+        } else if 
(DBType.POSTGRESQL.name().equalsIgnoreCase(resource.getDbType())) {
+            int key = getSsiDummyKey(xaBranchXid);
+            int value = ThreadLocalRandom.current().nextInt();
+
+            try (Statement stmt = createStatement()) {
+                // Unlike MySQL, PostgreSQL does not distinguish matched but 
value-unchanged rows.
+                int affected;
+                try {
+                    affected = stmt.executeUpdate(
+                            "insert into \"" + DUMMY_TABLE + "\" (key, value) 
values (" + key + "," + value
+                                    + ") on conflict (key) do update set 
value="
+                                    + value);

Review Comment:
   Similar to the MySQL case, this PostgreSQL SQL query uses string 
concatenation, which poses a SQL injection risk. Use PreparedStatement instead 
for safer and more efficient query execution.
   ```suggestion
               String sql = "insert into \"" + DUMMY_TABLE + "\" (key, value) values (?, ?) "
                       + "on conflict (key) do update set value = ?";
               try (java.sql.PreparedStatement pstmt = getTargetConnection().prepareStatement(sql)) {
                   // Unlike MySQL, PostgreSQL does not distinguish matched but value-unchanged rows.
                   int affected;
                   try {
                       pstmt.setInt(1, key);
                       pstmt.setInt(2, value);
                       pstmt.setInt(3, value);
                       affected = pstmt.executeUpdate();
   ```



##########
common/src/main/java/org/apache/seata/common/ConfigurationKeys.java:
##########
@@ -1282,4 +1282,11 @@ public interface ConfigurationKeys {
      * The constant RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.
      */
     String RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM = RATE_LIMIT_PREFIX + 
".bucketTokenInitialNum";
+
+    String SONATA_PREFIX = "sonata.";
+    String SONATA_ENABLE_GLOBAL_SERIALIZABILITY = SONATA_PREFIX + 
"enableGlobalSerializability";
+    String SONATA_DUMMY_TABLE = SONATA_PREFIX + "dummyTable";
+    String SONATA_DUMMY_TABLE_SIZE = SONATA_PREFIX + "dummyTableSize";
+    String SONATA_S2PL_DUMMY_WRITE_RETRY_WARNING_THRESHOLD = SONATA_PREFIX + 
"s2plDummyWriteRetryWarningThreshold";
+    String SONATA_SSI_HELPER_BATCH_SIZE = SONATA_PREFIX + "ssiHelperBatchSize";

Review Comment:
   The SONATA_PREFIX uses a different pattern than other configuration prefixes 
in this file. Most other prefixes in ConfigurationKeys don't include a trailing 
dot and follow a hierarchical structure (e.g., SERVER_PREFIX, CLIENT_PREFIX). 
In addition, StarterConstants.java (line 46) defines SONATA_PREFIX as 
SEATA_PREFIX + '.sonata', i.e. 'seata.sonata'. This inconsistency could lead 
to configuration key mismatches. Consider either removing the trailing dot 
here to match the pattern, or ensuring that the prefix properly aligns with 
the StarterConstants definition.
   ```suggestion
       String SONATA_PREFIX = "sonata";
       String SONATA_ENABLE_GLOBAL_SERIALIZABILITY = SONATA_PREFIX + 
".enableGlobalSerializability";
       String SONATA_DUMMY_TABLE = SONATA_PREFIX + ".dummyTable";
       String SONATA_DUMMY_TABLE_SIZE = SONATA_PREFIX + ".dummyTableSize";
       String SONATA_S2PL_DUMMY_WRITE_RETRY_WARNING_THRESHOLD = SONATA_PREFIX + 
".s2plDummyWriteRetryWarningThreshold";
       String SONATA_SSI_HELPER_BATCH_SIZE = SONATA_PREFIX + 
".ssiHelperBatchSize";
   ```



##########
seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/SonataProperties.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.seata.spring.boot.autoconfigure.properties;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import static 
org.apache.seata.spring.boot.autoconfigure.StarterConstants.SONATA_PREFIX;
+
+@Component
+@ConfigurationProperties(prefix = SONATA_PREFIX)
+public class SonataProperties {
+    private boolean enableGlobalSerializability;
+    private String dummyTable;
+    private int dummyTableSize;
+    private int s2plDummyWriteRetryWarningThreshold;

Review Comment:
   The SonataProperties class lacks documentation explaining the purpose of 
each configuration property. Consider adding JavaDoc comments to describe what 
'enableGlobalSerializability', 'dummyTable', 'dummyTableSize', 
's2plDummyWriteRetryWarningThreshold', and 'ssiHelperBatchSize' represent and 
how they affect Sonata's behavior.
   ```suggestion
   public class SonataProperties {
       /**
        * Whether to enable global serializability for transactions.
        * If true, Sonata will enforce serializability across all transactions, 
which may impact performance.
        */
       private boolean enableGlobalSerializability;
   
       /**
        * The name of the dummy table used for certain concurrency control 
mechanisms.
        * This table is used internally by Sonata for lock management or dummy 
writes.
        */
       private String dummyTable;
   
       /**
        * The number of rows in the dummy table.
        * This determines the size of the dummy table used for lock striping or 
contention reduction.
        */
       private int dummyTableSize;
   
       /**
        * The threshold for logging a warning when dummy write retries exceed 
this value in S2PL (Strict Two-Phase Locking) mode.
        * Helps to detect and alert on excessive contention or retry attempts.
        */
       private int s2plDummyWriteRetryWarningThreshold;
   
       /**
        * The number of dummy keys reserved per SSI (Serializable Snapshot 
Isolation) helper transaction.
        * Each helper transaction reads this many dummy-table keys, which are 
then handed out to branches one at a time.
        */
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));

Review Comment:
   The code in getS2plDummyKey carries a comment (line 577) stating 
'We don't know why this would happen (though programmatically possible) so we 
do not handle it.' This suggests the condition is unexpected. Consider using a 
more specific exception type (e.g., IllegalStateException) and providing more 
actionable guidance in the error message for debugging purposes.
   ```suggestion
                       throw new IllegalStateException(
                               String.format("Invariant violation: Global txn 
branch (%s) is already associated with dummy key (%d) in XID_TO_DUMMY_KEY. This 
indicates a possible logic error or concurrency issue. Please check the 
transaction lifecycle and report this stack trace to the Seata maintainers. 
[ACTIVE_DUMMY_KEYS=%s, XID_TO_DUMMY_KEY.size=%d]", 
                                   xid, prev, ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS, ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.size()));
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(
+                                    "PostgreSQL txn prepare returned %d, which 
should be 0 instead", ret));
+                        }
+                    }
+                }
+
+                // Mark them all active
+                ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+                // Make them visible in random order
+                List<Integer> asList = new ArrayList<>(newBatch);
+                Collections.shuffle(asList);
+                int firstKey = asList.remove(0); // take one for ourselves to 
avoid loop again
+                for (int otherKey : asList) {
+                    ((DataSourceProxyXA) resource)
+                            .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+                                    new AbstractMap.SimpleEntry<>(otherKey, 
helperTxnId));
+                }
+
+                keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey, 
helperTxnId);
+            }
+        }
+
+        // associate xid with dummy key
+        Integer prevDummy = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+        if (prevDummy != null) {
+            throw new RuntimeException(
+                    String.format("Global txn branch (%s) already associated 
with dummy key (%d)", xid, prevDummy));
+        }
+
+        // associate xid with helper id
+        Integer prevHelper =
+                ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+        if (prevHelper != null) {
+            throw new RuntimeException(String.format(
+                    "Global txn branch (%s) already associated with helper txn 
ID (%d)", xid, prevHelper));
+        }
+
+        return keyAndHelperId.getKey();
+    }
+
+    private void forgetDummyKey(XAXid xid) {
+        Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.remove(xid);
+        if (prev == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return;
+        }
+
+        boolean recorded = ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+        if (!recorded) {
+            throw new RuntimeException(String.format(

Review Comment:
   forgetDummyKey should throw IllegalStateException instead of 
RuntimeException here to indicate an internal consistency error.
   ```suggestion
               throw new IllegalStateException(String.format(
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java:
##########
@@ -135,4 +164,28 @@ private Connection getConnectionProxyXA(Connection 
connection) throws SQLExcepti
         connectionProxyXA.init();
         return connectionProxyXA;
     }
+
+    // Should be auto-executed when the datasource is closed. Now for 
simplicity we let the application manually
+    // release the pending helpers.
+    public void releaseAllHelpers() throws SQLException {
+        if (!sonataSsiShimEnabled) {
+            throw new RuntimeException("Sonata SSI helper transactions are not 
allowed for the datasource");

Review Comment:
   The error message in releaseAllHelpers should be more specific about why 
helper transactions are not allowed. Consider including information about the 
database type requirement (PostgreSQL) and the configuration state 
(sonataSsiShimEnabled).
   ```suggestion
               throw new RuntimeException("Sonata SSI helper transactions are 
only allowed when using a PostgreSQL database and when sonataSsiShimEnabled is 
true.");
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(
+                                    "PostgreSQL txn prepare returned %d, which 
should be 0 instead", ret));
+                        }
+                    }
+                }
+
+                // Mark them all active
+                ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+                // Make them visible in random order
+                List<Integer> asList = new ArrayList<>(newBatch);
+                Collections.shuffle(asList);
+                int firstKey = asList.remove(0); // take one for ourselves to 
avoid loop again
+                for (int otherKey : asList) {
+                    ((DataSourceProxyXA) resource)
+                            .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+                                    new AbstractMap.SimpleEntry<>(otherKey, 
helperTxnId));
+                }
+
+                keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey, 
helperTxnId);
+            }
+        }
+
+        // associate xid with dummy key
+        Integer prevDummy = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+        if (prevDummy != null) {
+            throw new RuntimeException(
+                    String.format("Global txn branch (%s) already associated 
with dummy key (%d)", xid, prevDummy));
+        }
+
+        // associate xid with helper id
+        Integer prevHelper =
+                ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+        if (prevHelper != null) {
+            throw new RuntimeException(String.format(
+                    "Global txn branch (%s) already associated with helper txn 
ID (%d)", xid, prevHelper));
+        }
+
+        return keyAndHelperId.getKey();
+    }
+
+    private void forgetDummyKey(XAXid xid) {
+        Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.remove(xid);
+        if (prev == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return;
+        }
+
+        boolean recorded = ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+        if (!recorded) {
+            throw new RuntimeException(String.format(
+                    "Dummy key (%d) mapped to global txn branch (%s) not found 
in active dummy key set", prev, xid));
+        }
+    }
+
+    private boolean releaseHelperTxn(XAXid xid) throws XAException {
+        Integer helperTxnId = ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.remove(xid);
+
+        if (helperTxnId == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return false;
+        }
+
+        AtomicInteger counter = ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+        if (counter == null) {
+            throw new RuntimeException("Reference counter for helper txn (" + 
helperTxnId + ") not found");
+        }
+
+        int currentUsage = counter.decrementAndGet();
+        if (currentUsage == 0) {
+            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.remove(helperTxnId);
+            try (Statement stmt = originalConnection.createStatement()) { // 
do not use wrapped xa stmt
+                stmt.executeUpdate("rollback prepared '" + helperTxnId + "'");
+            } catch (SQLException e) {
+                if ("42704".equals(e.getSQLState())) {
+                    LOGGER.error("Helper txn ({}) not prepared; should be 
possible with helper sharing", helperTxnId);
+                    // This is caused by TM actively rolling back the branch 
while the branch is preparing. We return an
+                    // XAException to notify TM to try again later. Extremely 
rare, but still possible (e.g., if helper
+                    // batch size=1).
+                    XAException xe = new XAException("Helper txn (" + 
helperTxnId + ") not yet prepared, try again");
+                    xe.errorCode = XAException.XA_RETRY;
+                    throw xe;
+                }
+
+                LOGGER.error("Unexpected SQLException while rolling back 
helper txn: {}", e.getMessage());
+                throw new RuntimeException(e);
+            }
+        }
+
+        return true;
+    }
+
+    private void sonataPrePrepare() throws SQLException, XAException {
+        // For an S2PL DB, we add a dummy write; for an SSI DB, we add a 
helper txn + a dummy write. Then,
+        // original prepare logic resumes.
+        if (DBType.MYSQL.name().equalsIgnoreCase(resource.getDbType())) {
+            int key = getS2plDummyKey(xaBranchXid);
+            int value = ThreadLocalRandom.current().nextInt();
+
+            try (Statement stmt = createStatement()) {
+                int retry = 0;
+                int affected = 0;
+                // Note: affected=1 for insert; =2 for update; =0 for update 
but value unchanged
+                while (affected == 0) {
+                    affected =
+                            stmt.executeUpdate("insert into `" + DUMMY_TABLE + 
"` (`key`, value) values (" + key + ","
+                                    + value + ") on duplicate key update 
value="
+                                    + value);
+                    retry++;
+                    if (retry > S2PL_UPDATE_RETRY_WARNING_THRESHOLD) {
+                        LOGGER.warn(
+                                "MySQL random dummy writes generated conflict 
with existing rows in {} consecutive attempts! This is super unlikely to occur, 
please investigate.",
+                                retry);
+                    }
+                }
+            }
+        } else if 
(DBType.POSTGRESQL.name().equalsIgnoreCase(resource.getDbType())) {
+            int key = getSsiDummyKey(xaBranchXid);
+            int value = ThreadLocalRandom.current().nextInt();
+
+            try (Statement stmt = createStatement()) {
+                // Unlike MySQL, PostgreSQL does not distinguish matched but 
value-unchanged rows.
+                int affected;
+                try {
+                    affected = stmt.executeUpdate(
+                            "insert into \"" + DUMMY_TABLE + "\" (key, value) 
values (" + key + "," + value
+                                    + ") on conflict (key) do update set 
value="
+                                    + value);
+                } catch (SQLException e) {
+                    if ("40001".equals(e.getSQLState())) {
+                        throw new XAException("PostgreSQL dummy write of 
global txn branch (" + xaBranchXid
+                                + ") failed due to serialization failure");
+                    }
+
+                    LOGGER.error("Unexpected SQLException while PG dummy 
write: {}", e.getMessage());
+                    throw e;
+                }
+                if (affected != 1) {
+                    // We don't know why this would happen (though 
programmatically possible) so we do not
+                    // handle it.
+                    throw new RuntimeException(String.format(

Review Comment:
   sonataPrePrepare should throw IllegalStateException instead of 
RuntimeException here, for consistency with other unexpected-state errors.
   ```suggestion
                       throw new IllegalStateException(String.format(
   ```



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java:
##########
@@ -135,4 +164,28 @@ private Connection getConnectionProxyXA(Connection 
connection) throws SQLExcepti
         connectionProxyXA.init();
         return connectionProxyXA;
     }
+
+    // Should be auto-executed when the datasource is closed. Now for 
simplicity we let the application manually
+    // release the pending helpers.
+    public void releaseAllHelpers() throws SQLException {
+        if (!sonataSsiShimEnabled) {
+            throw new RuntimeException("Sonata SSI helper transactions are not 
allowed for the datasource");
+        }
+
+        try (Connection conn = dataSource.getConnection()) {
+            for (int helperTxnId : HELPER_ID_REF_COUNT.keySet()) {
+                try (Statement stmt = conn.createStatement()) {
+                    stmt.executeUpdate("rollback prepared '" + helperTxnId + 
"'");
+                }
+            }
+        }
+    }
+
+    protected Connection getSsiHelperConnection() throws SQLException {
+        if (!sonataSsiShimEnabled) {
+            throw new RuntimeException("Sonata SSI helper transactions are not 
allowed for the datasource");

Review Comment:
   The error message in getSsiHelperConnection should be more specific about 
the requirements, similar to the issue in releaseAllHelpers. Additionally, both 
methods throw an exception with the same generic message, which could be 
extracted into a constant for consistency.



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {
+                                if (rs.next()) {
+                                    rs.getInt(1); // to make sure read is 
executed in the DB
+                                }
+                            }
+                        }
+
+                        int ret = helperStmt.executeUpdate("prepare 
transaction '" + helperTxnId + "'");
+                        if (ret != 0) {
+                            throw new RuntimeException(String.format(
+                                    "PostgreSQL txn prepare returned %d, which 
should be 0 instead", ret));
+                        }
+                    }
+                }
+
+                // Mark them all active
+                ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+                // Make them visible in random order
+                List<Integer> asList = new ArrayList<>(newBatch);
+                Collections.shuffle(asList);
+                int firstKey = asList.remove(0); // take one for ourselves to 
avoid loop again
+                for (int otherKey : asList) {
+                    ((DataSourceProxyXA) resource)
+                            .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+                                    new AbstractMap.SimpleEntry<>(otherKey, 
helperTxnId));
+                }
+
+                keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey, 
helperTxnId);
+            }
+        }
+
+        // associate xid with dummy key
+        Integer prevDummy = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+        if (prevDummy != null) {
+            throw new RuntimeException(
+                    String.format("Global txn branch (%s) already associated 
with dummy key (%d)", xid, prevDummy));
+        }
+
+        // associate xid with helper id
+        Integer prevHelper =
+                ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+        if (prevHelper != null) {
+            throw new RuntimeException(String.format(
+                    "Global txn branch (%s) already associated with helper txn 
ID (%d)", xid, prevHelper));
+        }
+
+        return keyAndHelperId.getKey();
+    }
+
+    private void forgetDummyKey(XAXid xid) {
+        Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.remove(xid);
+        if (prev == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return;
+        }
+
+        boolean recorded = ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+        if (!recorded) {
+            throw new RuntimeException(String.format(
+                    "Dummy key (%d) mapped to global txn branch (%s) not found 
in active dummy key set", prev, xid));
+        }
+    }
+
+    private boolean releaseHelperTxn(XAXid xid) throws XAException {
+        Integer helperTxnId = ((DataSourceProxyXA) 
resource).XID_TO_HELPER_ID.remove(xid);
+
+        if (helperTxnId == null) {
+            // Possible if TM initiated a rollback, and RM is yet to prepare 
the branch
+            return false;
+        }
+
+        AtomicInteger counter = ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+        if (counter == null) {
+            throw new RuntimeException("Reference counter for helper txn (" + 
helperTxnId + ") not found");
+        }
+
+        int currentUsage = counter.decrementAndGet();
+        if (currentUsage == 0) {
+            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.remove(helperTxnId);
+            try (Statement stmt = originalConnection.createStatement()) { // 
do not use wrapped xa stmt
+                stmt.executeUpdate("rollback prepared '" + helperTxnId + "'");
+            } catch (SQLException e) {
+                if ("42704".equals(e.getSQLState())) {
+                    LOGGER.error("Helper txn ({}) not prepared; should be 
possible with helper sharing", helperTxnId);
+                    // This is caused by TM actively rolling back the branch 
while the branch is preparing. We return an
+                    // XAException to notify TM to try again later. Extremely 
rare, but still possible (e.g., if helper
+                    // batch size=1).
+                    XAException xe = new XAException("Helper txn (" + 
helperTxnId + ") not yet prepared, try again");
+                    xe.errorCode = XAException.XA_RETRY;
+                    throw xe;
+                }
+
+                LOGGER.error("Unexpected SQLException while rolling back 
helper txn: {}", e.getMessage());
+                throw new RuntimeException(e);
+            }
+        }
+
+        return true;
+    }
+
+    private void sonataPrePrepare() throws SQLException, XAException {
+        // For an S2PL DB, we add a dummy write; for an SSI DB, we add a 
helper txn + a dummy write. Then,
+        // original prepare logic resumes.
+        if (DBType.MYSQL.name().equalsIgnoreCase(resource.getDbType())) {
+            int key = getS2plDummyKey(xaBranchXid);
+            int value = ThreadLocalRandom.current().nextInt();
+
+            try (Statement stmt = createStatement()) {
+                int retry = 0;
+                int affected = 0;
+                // Note: affected=1 for insert; =2 for update; =0 for update 
but value unchanged
+                while (affected == 0) {
+                    affected =
+                            stmt.executeUpdate("insert into `" + DUMMY_TABLE + 
"` (`key`, value) values (" + key + ","
+                                    + value + ") on duplicate key update 
value="
+                                    + value);
+                    retry++;
+                    if (retry > S2PL_UPDATE_RETRY_WARNING_THRESHOLD) {
+                        LOGGER.warn(
+                                "MySQL random dummy writes generated conflict 
with existing rows in {} consecutive attempts! This is super unlikely to occur, 
please investigate.",
+                                retry);
+                    }
+                }

Review Comment:
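   The SQL query construction in sonataPrePrepare uses string concatenation 
for the table name (DUMMY_TABLE) and for the 'key' and 'value' operands. In 
the code shown, 'key' and 'value' are locally generated ints (from 
getS2plDummyKey and ThreadLocalRandom), so they are not themselves an 
injection vector; the risk is SQL injection if the DUMMY_TABLE name is 
user-configurable. Consider using PreparedStatement instead of Statement with 
string concatenation for the values, for better security and performance. 
Note that bind parameters cannot stand in for an identifier, so a 
configurable DUMMY_TABLE name would still need to be validated or quoted 
separately.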



##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
     public void setCombine(boolean combine) {
         this.combine = combine;
     }
+
+    private int getS2plDummyKey(XAXid xid) {
+        while (true) {
+            int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+            if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+                Integer prev = ((DataSourceProxyXA) 
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+                if (prev != null) {
+                    ((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.remove(key);
+                    // We don't know why this would happen (though 
programmatically possible) so we do not handle it.
+                    throw new RuntimeException(
+                            String.format("Global txn branch (%s) already 
associated with dummy key (%d)", xid, prev));
+                }
+                return key;
+            }
+        }
+    }
+
+    private int getSsiDummyKey(XAXid xid) throws SQLException {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+                ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+        while (keyAndHelperId == null) {
+            synchronized (((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+                if (!((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+                    keyAndHelperId = ((DataSourceProxyXA) 
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+                    continue;
+                }
+
+                // Inside this if block, reserved set is empty and only 
current thread is making a new one, so new keys
+                // won't be added to active set, thus the following contains() 
test is safe.
+
+                HashSet<Integer> newBatch = new HashSet<>();
+                while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+                    int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+                    if (!((DataSourceProxyXA) 
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+                        newBatch.add(newKey);
+                    }
+                }
+
+                // We should initiate the new helper inside the synchronized 
block, to guarantee that helper is prepared
+                // prior to all corresponding original transactions.
+
+                int helperTxnId;
+                AtomicInteger counter = new 
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+                while (true) {
+                    helperTxnId = random.nextInt();
+                    AtomicInteger prevCounter =
+                            ((DataSourceProxyXA) 
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+                    if (prevCounter == null) {
+                        break;
+                    }
+                }
+
+                try (Connection helperConn = ((DataSourceProxyXA) 
resource).getSsiHelperConnection()) {
+                    helperConn.setAutoCommit(false);
+
+                    try (Statement helperStmt = helperConn.createStatement()) {
+                        for (Integer dummyKey : newBatch) {
+                            try (ResultSet rs = helperStmt.executeQuery(
+                                    "select count(*) from \"" + DUMMY_TABLE + 
"\" where key=" + dummyKey)) {

Review Comment:
   The query construction in getSsiDummyKey uses string concatenation for the 
dummy key value. While 'dummyKey' is an integer (less risky), using 
PreparedStatement would be more consistent with SQL best practices and provide 
better protection against potential issues.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to