Copilot commented on code in PR #7754:
URL: https://github.com/apache/incubator-seata/pull/7754#discussion_r2483734358
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
+ "PostgreSQL txn prepare returned %d, which
should be 0 instead", ret));
+ }
+ }
+ }
+
+ // Mark them all active
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+ // Make them visible in random order
+ List<Integer> asList = new ArrayList<>(newBatch);
+ Collections.shuffle(asList);
+ int firstKey = asList.remove(0); // take one for ourselves to
avoid loop again
+ for (int otherKey : asList) {
+ ((DataSourceProxyXA) resource)
+ .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+ new AbstractMap.SimpleEntry<>(otherKey,
helperTxnId));
+ }
+
+ keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey,
helperTxnId);
+ }
+ }
+
+ // associate xid with dummy key
+ Integer prevDummy = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+ if (prevDummy != null) {
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already associated
with dummy key (%d)", xid, prevDummy));
+ }
+
+ // associate xid with helper id
+ Integer prevHelper =
+ ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+ if (prevHelper != null) {
+ throw new RuntimeException(String.format(
Review Comment:
This RuntimeException should also use a more specific exception type like
IllegalStateException for consistency with similar state violation errors.
```suggestion
throw new IllegalStateException(String.format(
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
+ "PostgreSQL txn prepare returned %d, which
should be 0 instead", ret));
+ }
+ }
+ }
+
+ // Mark them all active
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+ // Make them visible in random order
+ List<Integer> asList = new ArrayList<>(newBatch);
+ Collections.shuffle(asList);
+ int firstKey = asList.remove(0); // take one for ourselves to
avoid loop again
+ for (int otherKey : asList) {
+ ((DataSourceProxyXA) resource)
+ .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+ new AbstractMap.SimpleEntry<>(otherKey,
helperTxnId));
+ }
+
+ keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey,
helperTxnId);
+ }
+ }
+
+ // associate xid with dummy key
+ Integer prevDummy = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+ if (prevDummy != null) {
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already associated
with dummy key (%d)", xid, prevDummy));
Review Comment:
Similar to getS2plDummyKey, this RuntimeException in getSsiDummyKey should
use a more specific exception type like IllegalStateException to better convey
that this represents an unexpected internal state violation.
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
+ "PostgreSQL txn prepare returned %d, which
should be 0 instead", ret));
+ }
+ }
+ }
+
+ // Mark them all active
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+ // Make them visible in random order
+ List<Integer> asList = new ArrayList<>(newBatch);
+ Collections.shuffle(asList);
+ int firstKey = asList.remove(0); // take one for ourselves to
avoid loop again
+ for (int otherKey : asList) {
+ ((DataSourceProxyXA) resource)
+ .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+ new AbstractMap.SimpleEntry<>(otherKey,
helperTxnId));
+ }
+
+ keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey,
helperTxnId);
+ }
+ }
+
+ // associate xid with dummy key
+ Integer prevDummy = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+ if (prevDummy != null) {
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already associated
with dummy key (%d)", xid, prevDummy));
+ }
+
+ // associate xid with helper id
+ Integer prevHelper =
+ ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+ if (prevHelper != null) {
+ throw new RuntimeException(String.format(
+ "Global txn branch (%s) already associated with helper txn
ID (%d)", xid, prevHelper));
+ }
+
+ return keyAndHelperId.getKey();
+ }
+
+ private void forgetDummyKey(XAXid xid) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.remove(xid);
+ if (prev == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return;
+ }
+
+ boolean recorded = ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+ if (!recorded) {
+ throw new RuntimeException(String.format(
+ "Dummy key (%d) mapped to global txn branch (%s) not found
in active dummy key set", prev, xid));
+ }
+ }
+
+ private boolean releaseHelperTxn(XAXid xid) throws XAException {
+ Integer helperTxnId = ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.remove(xid);
+
+ if (helperTxnId == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return false;
+ }
+
+ AtomicInteger counter = ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+ if (counter == null) {
+ throw new RuntimeException("Reference counter for helper txn (" +
helperTxnId + ") not found");
Review Comment:
This RuntimeException in releaseHelperTxn should use IllegalStateException
to better represent the unexpected internal state.
```suggestion
throw new IllegalStateException("Reference counter for helper
txn (" + helperTxnId + ") not found");
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
Review Comment:
This RuntimeException should use IllegalStateException since it represents
an unexpected database response that violates expected behavior.
```suggestion
throw new IllegalStateException(String.format(
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
+ "PostgreSQL txn prepare returned %d, which
should be 0 instead", ret));
+ }
+ }
+ }
+
+ // Mark them all active
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+ // Make them visible in random order
+ List<Integer> asList = new ArrayList<>(newBatch);
+ Collections.shuffle(asList);
+ int firstKey = asList.remove(0); // take one for ourselves to
avoid loop again
+ for (int otherKey : asList) {
+ ((DataSourceProxyXA) resource)
+ .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+ new AbstractMap.SimpleEntry<>(otherKey,
helperTxnId));
+ }
+
+ keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey,
helperTxnId);
+ }
+ }
+
+ // associate xid with dummy key
+ Integer prevDummy = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+ if (prevDummy != null) {
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already associated
with dummy key (%d)", xid, prevDummy));
+ }
+
+ // associate xid with helper id
+ Integer prevHelper =
+ ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+ if (prevHelper != null) {
+ throw new RuntimeException(String.format(
+ "Global txn branch (%s) already associated with helper txn
ID (%d)", xid, prevHelper));
+ }
+
+ return keyAndHelperId.getKey();
+ }
+
+ private void forgetDummyKey(XAXid xid) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.remove(xid);
+ if (prev == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return;
+ }
+
+ boolean recorded = ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+ if (!recorded) {
+ throw new RuntimeException(String.format(
+ "Dummy key (%d) mapped to global txn branch (%s) not found
in active dummy key set", prev, xid));
+ }
+ }
+
+ private boolean releaseHelperTxn(XAXid xid) throws XAException {
+ Integer helperTxnId = ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.remove(xid);
+
+ if (helperTxnId == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return false;
+ }
+
+ AtomicInteger counter = ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+ if (counter == null) {
+ throw new RuntimeException("Reference counter for helper txn (" +
helperTxnId + ") not found");
+ }
+
+ int currentUsage = counter.decrementAndGet();
+ if (currentUsage == 0) {
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.remove(helperTxnId);
+ try (Statement stmt = originalConnection.createStatement()) { //
do not use wrapped xa stmt
+ stmt.executeUpdate("rollback prepared '" + helperTxnId + "'");
+ } catch (SQLException e) {
+ if ("42704".equals(e.getSQLState())) {
+ LOGGER.error("Helper txn ({}) not prepared; should be
possible with helper sharing", helperTxnId);
+ // This is caused by TM actively rolling back the branch
while the branch is preparing. We return an
+ // XAException to notify TM to try again later. Extremely
rare, but still possible (e.g., if helper
+ // batch size=1).
+ XAException xe = new XAException("Helper txn (" +
helperTxnId + ") not yet prepared, try again");
+ xe.errorCode = XAException.XA_RETRY;
+ throw xe;
+ }
+
+ LOGGER.error("Unexpected SQLException while rolling back
helper txn: {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ return true;
+ }
+
+ private void sonataPrePrepare() throws SQLException, XAException {
+ // For an S2PL DB, we add a dummy write; for an SSI DB, we add a
helper txn + a dummy write. Then,
+ // original prepare logic resumes.
+ if (DBType.MYSQL.name().equalsIgnoreCase(resource.getDbType())) {
+ int key = getS2plDummyKey(xaBranchXid);
+ int value = ThreadLocalRandom.current().nextInt();
+
+ try (Statement stmt = createStatement()) {
+ int retry = 0;
+ int affected = 0;
+ // Note: affected=1 for insert; =2 for update; =0 for update
but value unchanged
+ while (affected == 0) {
+ affected =
+ stmt.executeUpdate("insert into `" + DUMMY_TABLE +
"` (`key`, value) values (" + key + ","
+ + value + ") on duplicate key update
value="
+ + value);
+ retry++;
+ if (retry > S2PL_UPDATE_RETRY_WARNING_THRESHOLD) {
+ LOGGER.warn(
+ "MySQL random dummy writes generated conflict
with existing rows in {} consecutive attempts! This is super unlikely to occur,
please investigate.",
+ retry);
+ }
+ }
+ }
+ } else if
(DBType.POSTGRESQL.name().equalsIgnoreCase(resource.getDbType())) {
+ int key = getSsiDummyKey(xaBranchXid);
+ int value = ThreadLocalRandom.current().nextInt();
+
+ try (Statement stmt = createStatement()) {
+ // Unlike MySQL, PostgreSQL does not distinguish matched but
value-unchanged rows.
+ int affected;
+ try {
+ affected = stmt.executeUpdate(
+ "insert into \"" + DUMMY_TABLE + "\" (key, value)
values (" + key + "," + value
+ + ") on conflict (key) do update set
value="
+ + value);
Review Comment:
Similar to the MySQL case, this PostgreSQL statement is built with string
concatenation, which poses a SQL injection risk. Use a PreparedStatement
instead for safer and more efficient execution.
```suggestion
String sql = "insert into \"" + DUMMY_TABLE + "\" (key, value)
values (?, ?) "
+ "on conflict (key) do update set value = ?";
try (java.sql.PreparedStatement pstmt =
getTargetConnection().prepareStatement(sql)) {
// Unlike MySQL, PostgreSQL does not distinguish matched but
value-unchanged rows.
int affected;
try {
pstmt.setInt(1, key);
pstmt.setInt(2, value);
pstmt.setInt(3, value);
affected = pstmt.executeUpdate();
```
##########
common/src/main/java/org/apache/seata/common/ConfigurationKeys.java:
##########
@@ -1282,4 +1282,11 @@ public interface ConfigurationKeys {
* The constant RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.
*/
String RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM = RATE_LIMIT_PREFIX +
".bucketTokenInitialNum";
+
+ String SONATA_PREFIX = "sonata.";
+ String SONATA_ENABLE_GLOBAL_SERIALIZABILITY = SONATA_PREFIX +
"enableGlobalSerializability";
+ String SONATA_DUMMY_TABLE = SONATA_PREFIX + "dummyTable";
+ String SONATA_DUMMY_TABLE_SIZE = SONATA_PREFIX + "dummyTableSize";
+ String SONATA_S2PL_DUMMY_WRITE_RETRY_WARNING_THRESHOLD = SONATA_PREFIX +
"s2plDummyWriteRetryWarningThreshold";
+ String SONATA_SSI_HELPER_BATCH_SIZE = SONATA_PREFIX + "ssiHelperBatchSize";
Review Comment:
The SONATA_PREFIX uses a different pattern than other configuration prefixes
in this file. Most other prefixes in ConfigurationKeys don't include a trailing
dot and use a hierarchical structure (e.g., SERVER_PREFIX, CLIENT_PREFIX).
However, in StarterConstants.java line 46, SONATA_PREFIX is defined as
SEATA_PREFIX + '.sonata', which would make it 'seata.sonata'. This
inconsistency could lead to configuration key mismatches. Consider either
removing the trailing dot here to match the pattern, or ensuring that the
prefix properly aligns with the StarterConstants definition.
```suggestion
String SONATA_PREFIX = "sonata";
String SONATA_ENABLE_GLOBAL_SERIALIZABILITY = SONATA_PREFIX +
".enableGlobalSerializability";
String SONATA_DUMMY_TABLE = SONATA_PREFIX + ".dummyTable";
String SONATA_DUMMY_TABLE_SIZE = SONATA_PREFIX + ".dummyTableSize";
String SONATA_S2PL_DUMMY_WRITE_RETRY_WARNING_THRESHOLD = SONATA_PREFIX +
".s2plDummyWriteRetryWarningThreshold";
String SONATA_SSI_HELPER_BATCH_SIZE = SONATA_PREFIX +
".ssiHelperBatchSize";
```
##########
seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/SonataProperties.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.seata.spring.boot.autoconfigure.properties;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.SONATA_PREFIX;
+
+@Component
+@ConfigurationProperties(prefix = SONATA_PREFIX)
+public class SonataProperties {
+ private boolean enableGlobalSerializability;
+ private String dummyTable;
+ private int dummyTableSize;
+ private int s2plDummyWriteRetryWarningThreshold;
Review Comment:
The SonataProperties class lacks documentation explaining the purpose of
each configuration property. Consider adding JavaDoc comments to describe what
'enableGlobalSerializability', 'dummyTable', 'dummyTableSize',
's2plDummyWriteRetryWarningThreshold', and 'ssiHelperBatchSize' represent and
how they affect Sonata's behavior.
```suggestion
public class SonataProperties {
/**
* Whether to enable global serializability for transactions.
* If true, Sonata will enforce serializability across all transactions,
which may impact performance.
*/
private boolean enableGlobalSerializability;
/**
* The name of the dummy table used for certain concurrency control
mechanisms.
* This table is used internally by Sonata for lock management or dummy
writes.
*/
private String dummyTable;
/**
* The number of rows in the dummy table.
* This determines the size of the dummy table used for lock striping or
contention reduction.
*/
private int dummyTableSize;
/**
* The threshold for logging a warning when dummy write retries exceed
this value in S2PL (Strict Two-Phase Locking) mode.
* Helps to detect and alert on excessive contention or retry attempts.
*/
private int s2plDummyWriteRetryWarningThreshold;
/**
* The batch size used by the SSI (Serializable Snapshot Isolation)
helper.
* Controls how many records are processed in a single batch for
SSI-related operations.
*/
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
Review Comment:
The throw site in getS2plDummyKey carries a code comment (line 577) stating
'We don't know why this would happen (though programmatically possible) so we
do not handle it.' This suggests the error is unexpected. Consider using a more
specific exception type (e.g., IllegalStateException) and providing more
actionable guidance in the error message for debugging purposes.
```suggestion
throw new IllegalStateException(
String.format("Invariant violation: Global txn
branch (%s) is already associated with dummy key (%d) in XID_TO_DUMMY_KEY. This
indicates a possible logic error or concurrency issue. Please check the
transaction lifecycle and report this stack trace to the Seata maintainers.
[ACTIVE_DUMMY_KEYS=%s, XID_TO_DUMMY_KEY.size=%d]",
xid, prev, ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS, ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.size()));
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
+ "PostgreSQL txn prepare returned %d, which
should be 0 instead", ret));
+ }
+ }
+ }
+
+ // Mark them all active
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+ // Make them visible in random order
+ List<Integer> asList = new ArrayList<>(newBatch);
+ Collections.shuffle(asList);
+ int firstKey = asList.remove(0); // take one for ourselves to
avoid loop again
+ for (int otherKey : asList) {
+ ((DataSourceProxyXA) resource)
+ .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+ new AbstractMap.SimpleEntry<>(otherKey,
helperTxnId));
+ }
+
+ keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey,
helperTxnId);
+ }
+ }
+
+ // associate xid with dummy key
+ Integer prevDummy = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+ if (prevDummy != null) {
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already associated
with dummy key (%d)", xid, prevDummy));
+ }
+
+ // associate xid with helper id
+ Integer prevHelper =
+ ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+ if (prevHelper != null) {
+ throw new RuntimeException(String.format(
+ "Global txn branch (%s) already associated with helper txn
ID (%d)", xid, prevHelper));
+ }
+
+ return keyAndHelperId.getKey();
+ }
+
+ private void forgetDummyKey(XAXid xid) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.remove(xid);
+ if (prev == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return;
+ }
+
+ boolean recorded = ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+ if (!recorded) {
+ throw new RuntimeException(String.format(
Review Comment:
This RuntimeException in forgetDummyKey should use IllegalStateException to
indicate an internal consistency error.
```suggestion
throw new IllegalStateException(String.format(
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java:
##########
@@ -135,4 +164,28 @@ private Connection getConnectionProxyXA(Connection
connection) throws SQLExcepti
connectionProxyXA.init();
return connectionProxyXA;
}
+
+ // Should be auto-executed when the datasource is closed. Now for
simplicity we let the application manually
+ // release the pending helpers.
+ public void releaseAllHelpers() throws SQLException {
+ if (!sonataSsiShimEnabled) {
+ throw new RuntimeException("Sonata SSI helper transactions are not
allowed for the datasource");
Review Comment:
The error message in releaseAllHelpers should be more specific about why
helper transactions are not allowed. Consider including information about the
database type requirement (PostgreSQL) and the configuration state
(sonataSsiShimEnabled).
```suggestion
throw new RuntimeException("Sonata SSI helper transactions are
only allowed when using a PostgreSQL database and when sonataSsiShimEnabled is
true.");
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
+ "PostgreSQL txn prepare returned %d, which
should be 0 instead", ret));
+ }
+ }
+ }
+
+ // Mark them all active
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+ // Make them visible in random order
+ List<Integer> asList = new ArrayList<>(newBatch);
+ Collections.shuffle(asList);
+ int firstKey = asList.remove(0); // take one for ourselves to
avoid loop again
+ for (int otherKey : asList) {
+ ((DataSourceProxyXA) resource)
+ .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+ new AbstractMap.SimpleEntry<>(otherKey,
helperTxnId));
+ }
+
+ keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey,
helperTxnId);
+ }
+ }
+
+ // associate xid with dummy key
+ Integer prevDummy = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+ if (prevDummy != null) {
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already associated
with dummy key (%d)", xid, prevDummy));
+ }
+
+ // associate xid with helper id
+ Integer prevHelper =
+ ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+ if (prevHelper != null) {
+ throw new RuntimeException(String.format(
+ "Global txn branch (%s) already associated with helper txn
ID (%d)", xid, prevHelper));
+ }
+
+ return keyAndHelperId.getKey();
+ }
+
+ private void forgetDummyKey(XAXid xid) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.remove(xid);
+ if (prev == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return;
+ }
+
+ boolean recorded = ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+ if (!recorded) {
+ throw new RuntimeException(String.format(
+ "Dummy key (%d) mapped to global txn branch (%s) not found
in active dummy key set", prev, xid));
+ }
+ }
+
+ private boolean releaseHelperTxn(XAXid xid) throws XAException {
+ Integer helperTxnId = ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.remove(xid);
+
+ if (helperTxnId == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return false;
+ }
+
+ AtomicInteger counter = ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+ if (counter == null) {
+ throw new RuntimeException("Reference counter for helper txn (" +
helperTxnId + ") not found");
+ }
+
+ int currentUsage = counter.decrementAndGet();
+ if (currentUsage == 0) {
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.remove(helperTxnId);
+ try (Statement stmt = originalConnection.createStatement()) { //
do not use wrapped xa stmt
+ stmt.executeUpdate("rollback prepared '" + helperTxnId + "'");
+ } catch (SQLException e) {
+ if ("42704".equals(e.getSQLState())) {
+ LOGGER.error("Helper txn ({}) not prepared; should be
possible with helper sharing", helperTxnId);
+ // This is caused by TM actively rolling back the branch
while the branch is preparing. We return an
+ // XAException to notify TM to try again later. Extremely
rare, but still possible (e.g., if helper
+ // batch size=1).
+ XAException xe = new XAException("Helper txn (" +
helperTxnId + ") not yet prepared, try again");
+ xe.errorCode = XAException.XA_RETRY;
+ throw xe;
+ }
+
+ LOGGER.error("Unexpected SQLException while rolling back
helper txn: {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ return true;
+ }
+
+ private void sonataPrePrepare() throws SQLException, XAException {
+ // For an S2PL DB, we add a dummy write; for an SSI DB, we add a
helper txn + a dummy write. Then,
+ // original prepare logic resumes.
+ if (DBType.MYSQL.name().equalsIgnoreCase(resource.getDbType())) {
+ int key = getS2plDummyKey(xaBranchXid);
+ int value = ThreadLocalRandom.current().nextInt();
+
+ try (Statement stmt = createStatement()) {
+ int retry = 0;
+ int affected = 0;
+ // Note: affected=1 for insert; =2 for update; =0 for update
but value unchanged
+ while (affected == 0) {
+ affected =
+ stmt.executeUpdate("insert into `" + DUMMY_TABLE +
"` (`key`, value) values (" + key + ","
+ + value + ") on duplicate key update
value="
+ + value);
+ retry++;
+ if (retry > S2PL_UPDATE_RETRY_WARNING_THRESHOLD) {
+ LOGGER.warn(
+ "MySQL random dummy writes generated conflict
with existing rows in {} consecutive attempts! This is super unlikely to occur,
please investigate.",
+ retry);
+ }
+ }
+ }
+ } else if
(DBType.POSTGRESQL.name().equalsIgnoreCase(resource.getDbType())) {
+ int key = getSsiDummyKey(xaBranchXid);
+ int value = ThreadLocalRandom.current().nextInt();
+
+ try (Statement stmt = createStatement()) {
+ // Unlike MySQL, PostgreSQL does not distinguish matched but
value-unchanged rows.
+ int affected;
+ try {
+ affected = stmt.executeUpdate(
+ "insert into \"" + DUMMY_TABLE + "\" (key, value)
values (" + key + "," + value
+ + ") on conflict (key) do update set
value="
+ + value);
+ } catch (SQLException e) {
+ if ("40001".equals(e.getSQLState())) {
+ throw new XAException("PostgreSQL dummy write of
global txn branch (" + xaBranchXid
+ + ") failed due to serialization failure");
+ }
+
+ LOGGER.error("Unexpected SQLException while PG dummy
write: {}", e.getMessage());
+ throw e;
+ }
+ if (affected != 1) {
+ // We don't know why this would happen (though
programmatically possible) so we do not
+ // handle it.
+ throw new RuntimeException(String.format(
Review Comment:
This RuntimeException in sonataPrePrepare should use IllegalStateException
for consistency with other unexpected state errors.
```suggestion
throw new IllegalStateException(String.format(
```
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java:
##########
@@ -135,4 +164,28 @@ private Connection getConnectionProxyXA(Connection
connection) throws SQLExcepti
connectionProxyXA.init();
return connectionProxyXA;
}
+
+ // Should be auto-executed when the datasource is closed. Now for
simplicity we let the application manually
+ // release the pending helpers.
+ public void releaseAllHelpers() throws SQLException {
+ if (!sonataSsiShimEnabled) {
+ throw new RuntimeException("Sonata SSI helper transactions are not
allowed for the datasource");
+ }
+
+ try (Connection conn = dataSource.getConnection()) {
+ for (int helperTxnId : HELPER_ID_REF_COUNT.keySet()) {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("rollback prepared '" + helperTxnId +
"'");
+ }
+ }
+ }
+ }
+
+ protected Connection getSsiHelperConnection() throws SQLException {
+ if (!sonataSsiShimEnabled) {
+ throw new RuntimeException("Sonata SSI helper transactions are not
allowed for the datasource");
Review Comment:
The error message in getSsiHelperConnection should be more specific about
the requirements, similar to the issue in releaseAllHelpers. Additionally, both
methods throw the same generic error message, which could be extracted as a
constant for consistency.
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
+ if (rs.next()) {
+ rs.getInt(1); // to make sure read is
executed in the DB
+ }
+ }
+ }
+
+ int ret = helperStmt.executeUpdate("prepare
transaction '" + helperTxnId + "'");
+ if (ret != 0) {
+ throw new RuntimeException(String.format(
+ "PostgreSQL txn prepare returned %d, which
should be 0 instead", ret));
+ }
+ }
+ }
+
+ // Mark them all active
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.addAll(newBatch);
+
+ // Make them visible in random order
+ List<Integer> asList = new ArrayList<>(newBatch);
+ Collections.shuffle(asList);
+ int firstKey = asList.remove(0); // take one for ourselves to
avoid loop again
+ for (int otherKey : asList) {
+ ((DataSourceProxyXA) resource)
+ .RESERVED_DUMMY_KEYS_AND_HELPER_IDS.add(
+ new AbstractMap.SimpleEntry<>(otherKey,
helperTxnId));
+ }
+
+ keyAndHelperId = new AbstractMap.SimpleEntry<>(firstKey,
helperTxnId);
+ }
+ }
+
+ // associate xid with dummy key
+ Integer prevDummy = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, keyAndHelperId.getKey());
+ if (prevDummy != null) {
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already associated
with dummy key (%d)", xid, prevDummy));
+ }
+
+ // associate xid with helper id
+ Integer prevHelper =
+ ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.putIfAbsent(xid, keyAndHelperId.getValue());
+ if (prevHelper != null) {
+ throw new RuntimeException(String.format(
+ "Global txn branch (%s) already associated with helper txn
ID (%d)", xid, prevHelper));
+ }
+
+ return keyAndHelperId.getKey();
+ }
+
+ private void forgetDummyKey(XAXid xid) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.remove(xid);
+ if (prev == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return;
+ }
+
+ boolean recorded = ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(prev);
+ if (!recorded) {
+ throw new RuntimeException(String.format(
+ "Dummy key (%d) mapped to global txn branch (%s) not found
in active dummy key set", prev, xid));
+ }
+ }
+
+ private boolean releaseHelperTxn(XAXid xid) throws XAException {
+ Integer helperTxnId = ((DataSourceProxyXA)
resource).XID_TO_HELPER_ID.remove(xid);
+
+ if (helperTxnId == null) {
+ // Possible if TM initiated a rollback, and RM is yet to prepare
the branch
+ return false;
+ }
+
+ AtomicInteger counter = ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.get(helperTxnId);
+ if (counter == null) {
+ throw new RuntimeException("Reference counter for helper txn (" +
helperTxnId + ") not found");
+ }
+
+ int currentUsage = counter.decrementAndGet();
+ if (currentUsage == 0) {
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.remove(helperTxnId);
+ try (Statement stmt = originalConnection.createStatement()) { //
do not use wrapped xa stmt
+ stmt.executeUpdate("rollback prepared '" + helperTxnId + "'");
+ } catch (SQLException e) {
+ if ("42704".equals(e.getSQLState())) {
+ LOGGER.error("Helper txn ({}) not prepared; should be
possible with helper sharing", helperTxnId);
+ // This is caused by TM actively rolling back the branch
while the branch is preparing. We return an
+ // XAException to notify TM to try again later. Extremely
rare, but still possible (e.g., if helper
+ // batch size=1).
+ XAException xe = new XAException("Helper txn (" +
helperTxnId + ") not yet prepared, try again");
+ xe.errorCode = XAException.XA_RETRY;
+ throw xe;
+ }
+
+ LOGGER.error("Unexpected SQLException while rolling back
helper txn: {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ return true;
+ }
+
+ private void sonataPrePrepare() throws SQLException, XAException {
+ // For an S2PL DB, we add a dummy write; for an SSI DB, we add a
helper txn + a dummy write. Then,
+ // original prepare logic resumes.
+ if (DBType.MYSQL.name().equalsIgnoreCase(resource.getDbType())) {
+ int key = getS2plDummyKey(xaBranchXid);
+ int value = ThreadLocalRandom.current().nextInt();
+
+ try (Statement stmt = createStatement()) {
+ int retry = 0;
+ int affected = 0;
+ // Note: affected=1 for insert; =2 for update; =0 for update
but value unchanged
+ while (affected == 0) {
+ affected =
+ stmt.executeUpdate("insert into `" + DUMMY_TABLE +
"` (`key`, value) values (" + key + ","
+ + value + ") on duplicate key update
value="
+ + value);
+ retry++;
+ if (retry > S2PL_UPDATE_RETRY_WARNING_THRESHOLD) {
+ LOGGER.warn(
+ "MySQL random dummy writes generated conflict
with existing rows in {} consecutive attempts! This is super unlikely to occur,
please investigate.",
+ retry);
+ }
+ }
Review Comment:
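The SQL in sonataPrePrepare is built by string concatenation. The 'key' and
'value' operands are ints generated locally (via getS2plDummyKey and
ThreadLocalRandom), so they are not user-controlled; the remaining injection
risk is the DUMMY_TABLE identifier, if that name is ever user-configurable.
Consider using PreparedStatement with bind parameters for the values, for
consistency and performance, and validating the table name separately, since
identifiers cannot be supplied as bind parameters.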
##########
rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java:
##########
@@ -452,4 +566,219 @@ public ResourceLock getResourceLock() {
public void setCombine(boolean combine) {
this.combine = combine;
}
+
+ private int getS2plDummyKey(XAXid xid) {
+ while (true) {
+ int key = ThreadLocalRandom.current().nextInt(DUMMY_TABLE_SIZE);
+ if (((DataSourceProxyXA) resource).ACTIVE_DUMMY_KEYS.add(key)) {
+ Integer prev = ((DataSourceProxyXA)
resource).XID_TO_DUMMY_KEY.putIfAbsent(xid, key);
+ if (prev != null) {
+ ((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.remove(key);
+ // We don't know why this would happen (though
programmatically possible) so we do not handle it.
+ throw new RuntimeException(
+ String.format("Global txn branch (%s) already
associated with dummy key (%d)", xid, prev));
+ }
+ return key;
+ }
+ }
+ }
+
+ private int getSsiDummyKey(XAXid xid) throws SQLException {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ AbstractMap.SimpleEntry<Integer, Integer> keyAndHelperId =
+ ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ while (keyAndHelperId == null) {
+ synchronized (((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS) {
+ if (!((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.isEmpty()) {
+ keyAndHelperId = ((DataSourceProxyXA)
resource).RESERVED_DUMMY_KEYS_AND_HELPER_IDS.poll();
+ continue;
+ }
+
+ // Inside this if block, reserved set is empty and only
current thread is making a new one, so new keys
+ // won't be added to active set, thus the following contains()
test is safe.
+
+ HashSet<Integer> newBatch = new HashSet<>();
+ while (newBatch.size() < SSI_HELPER_BATCH_SIZE) {
+ int newKey = random.nextInt(DUMMY_TABLE_SIZE);
+ if (!((DataSourceProxyXA)
resource).ACTIVE_DUMMY_KEYS.contains(newKey)) {
+ newBatch.add(newKey);
+ }
+ }
+
+ // We should initiate the new helper inside the synchronized
block, to guarantee that helper is prepared
+ // prior to all corresponding original transactions.
+
+ int helperTxnId;
+ AtomicInteger counter = new
AtomicInteger(SSI_HELPER_BATCH_SIZE);
+ while (true) {
+ helperTxnId = random.nextInt();
+ AtomicInteger prevCounter =
+ ((DataSourceProxyXA)
resource).HELPER_ID_REF_COUNT.put(helperTxnId, counter);
+ if (prevCounter == null) {
+ break;
+ }
+ }
+
+ try (Connection helperConn = ((DataSourceProxyXA)
resource).getSsiHelperConnection()) {
+ helperConn.setAutoCommit(false);
+
+ try (Statement helperStmt = helperConn.createStatement()) {
+ for (Integer dummyKey : newBatch) {
+ try (ResultSet rs = helperStmt.executeQuery(
+ "select count(*) from \"" + DUMMY_TABLE +
"\" where key=" + dummyKey)) {
Review Comment:
The query construction in getSsiDummyKey uses string concatenation for the
dummy key value. While 'dummyKey' is an integer (less risky), using
PreparedStatement would be more consistent with SQL best practices and provide
better protection against potential issues.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]