lhotari commented on code in PR #25821:
URL: https://github.com/apache/pulsar/pull/25821#discussion_r3272485197
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java:
##########
@@ -136,46 +157,82 @@ private void recover() {
}
subscription = handle;
- // Scan all /txn/op records for this segment, group by txnId.
- Map<String, List<Position>> opsByTxn = new ConcurrentHashMap<>();
- txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
- @Override
- public void onNext(GetResult r) {
- TxnOp op = TxnMetadataStore.fromJson(r.getValue(),
TxnOp.class);
- String txnIdKey =
TxnPaths.txnIdFromOpPath(r.getStat().getPath());
- if (txnIdKey == null) {
- return;
- }
- opsByTxn.computeIfAbsent(txnIdKey, k -> new ArrayList<>())
- .add(PositionFactory.create(op.getLedgerId(),
op.getEntryId()));
- }
+ // 1. Load durable watermark.
+ CompletableFuture<Void> watermarkLoad =
txnStore.getSegmentWatermark(segmentName)
+ .thenAccept(opt -> {
+ if (opt.isPresent()) {
+ synchronized (lock) {
Review Comment:
Just one comment about synchronization.
A further improvement could be to refactor to a model which avoids
synchronization. One approach would be to use virtual threads on Java 25 which
handles synchronization in a non-blocking way, however that might not be
feasible with Netty. Another approach would be to follow "single writer
principle" and ensure that mutations happen in a single thread sequentially. A
third option would be to use state objects that tie all individual state
changes together and use volatile or CAS to ensure that state transitions are
atomic without inconsistencies. The current Pulsar code base uses a lot of
synchronization and it would be great if we could reduce that since
synchronization causes issues with Netty IO threads and thread pools in general.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java:
##########
@@ -134,6 +149,85 @@ public static String ackIndexKey(String segment, String
subscription) {
return segmentKey(segment) + ":" + Codec.encode(subscription);
}
+ /** Parent path for per-segment watermark records. Records are direct
children of this. */
+ public static final String TXN_SEGMENT_WATERMARK_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+ /** Parent path for per-aborted-txn records, flat across all segments. */
+ public static final String TXN_SEGMENT_ABORTED_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+ /**
+ * @return {@code /txn/segment-state/watermark/<encoded-segment>} —
durable watermark record
+ * for {@code segment}. Direct child of {@link
#TXN_SEGMENT_WATERMARK_PREFIX} so the
+ * fallback {@code scanByIndex} path works on backends without a
native secondary index.
+ */
+ public static String segmentWatermarkPath(String segment) {
+ return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+ }
+
+ /**
+ * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} —
durable
+ * per-aborted-txn record. Direct child of {@link
#TXN_SEGMENT_ABORTED_PREFIX} (flat
+ * across all segments) so the fallback scan finds it; {@code
<encoded-segment>:<txnId>}
+ * keeps the per-segment grouping addressable.
+ */
+ public static String segmentAbortedTxnPath(String segment, String txnId) {
+ return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" +
txnId;
+ }
+
+ /**
+ * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a
per-segment aborted-txn
+ * record. Format: {@code
<encoded-segment>:padded(ledgerId):padded(entryId)} — the
+ * encoded prefix scopes scans to one segment; the padded position is
lexicographically
+ * ordered so range scans by trim point work naturally.
+ */
+ public static String abortedByPositionIndexKey(String segment, long
ledgerId, long entryId) {
+ return segmentKey(segment) + ":" + longKey(ledgerId) + ":" +
longKey(entryId);
+ }
+
+ /** @return the lower bound of the per-segment range in {@link
#IDX_TXN_ABORTED_BY_POSITION}. */
+ public static String abortedByPositionSegmentLowerBound(String segment) {
+ return segmentKey(segment) + ":" + longKey(0L) + ":" + longKey(0L);
+ }
+
+ /** @return the upper bound of the per-segment range in {@link
#IDX_TXN_ABORTED_BY_POSITION}. */
+ public static String abortedByPositionSegmentUpperBound(String segment) {
+ return segmentKey(segment) + ":" + MAX_LONG_KEY + ":" + MAX_LONG_KEY;
Review Comment:
the suffix could be a constant
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java:
##########
@@ -134,6 +149,85 @@ public static String ackIndexKey(String segment, String
subscription) {
return segmentKey(segment) + ":" + Codec.encode(subscription);
}
+ /** Parent path for per-segment watermark records. Records are direct
children of this. */
+ public static final String TXN_SEGMENT_WATERMARK_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+ /** Parent path for per-aborted-txn records, flat across all segments. */
+ public static final String TXN_SEGMENT_ABORTED_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+ /**
+ * @return {@code /txn/segment-state/watermark/<encoded-segment>} —
durable watermark record
+ * for {@code segment}. Direct child of {@link
#TXN_SEGMENT_WATERMARK_PREFIX} so the
+ * fallback {@code scanByIndex} path works on backends without a
native secondary index.
+ */
+ public static String segmentWatermarkPath(String segment) {
+ return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+ }
+
+ /**
+ * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} —
durable
+ * per-aborted-txn record. Direct child of {@link
#TXN_SEGMENT_ABORTED_PREFIX} (flat
+ * across all segments) so the fallback scan finds it; {@code
<encoded-segment>:<txnId>}
+ * keeps the per-segment grouping addressable.
+ */
+ public static String segmentAbortedTxnPath(String segment, String txnId) {
+ return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" +
txnId;
+ }
+
+ /**
+ * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a
per-segment aborted-txn
+ * record. Format: {@code
<encoded-segment>:padded(ledgerId):padded(entryId)} — the
+ * encoded prefix scopes scans to one segment; the padded position is
lexicographically
+ * ordered so range scans by trim point work naturally.
+ */
+ public static String abortedByPositionIndexKey(String segment, long
ledgerId, long entryId) {
+ return segmentKey(segment) + ":" + longKey(ledgerId) + ":" +
longKey(entryId);
+ }
+
+ /** @return the lower bound of the per-segment range in {@link
#IDX_TXN_ABORTED_BY_POSITION}. */
+ public static String abortedByPositionSegmentLowerBound(String segment) {
+ return segmentKey(segment) + ":" + longKey(0L) + ":" + longKey(0L);
Review Comment:
the suffix could be a constant
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java:
##########
@@ -134,6 +149,85 @@ public static String ackIndexKey(String segment, String
subscription) {
return segmentKey(segment) + ":" + Codec.encode(subscription);
}
+ /** Parent path for per-segment watermark records. Records are direct
children of this. */
+ public static final String TXN_SEGMENT_WATERMARK_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+ /** Parent path for per-aborted-txn records, flat across all segments. */
+ public static final String TXN_SEGMENT_ABORTED_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+ /**
+ * @return {@code /txn/segment-state/watermark/<encoded-segment>} —
durable watermark record
+ * for {@code segment}. Direct child of {@link
#TXN_SEGMENT_WATERMARK_PREFIX} so the
+ * fallback {@code scanByIndex} path works on backends without a
native secondary index.
+ */
+ public static String segmentWatermarkPath(String segment) {
+ return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+ }
+
+ /**
+ * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} —
durable
+ * per-aborted-txn record. Direct child of {@link
#TXN_SEGMENT_ABORTED_PREFIX} (flat
+ * across all segments) so the fallback scan finds it; {@code
<encoded-segment>:<txnId>}
+ * keeps the per-segment grouping addressable.
+ */
+ public static String segmentAbortedTxnPath(String segment, String txnId) {
+ return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" +
txnId;
+ }
+
+ /**
+ * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a
per-segment aborted-txn
+ * record. Format: {@code
<encoded-segment>:padded(ledgerId):padded(entryId)} — the
+ * encoded prefix scopes scans to one segment; the padded position is
lexicographically
+ * ordered so range scans by trim point work naturally.
+ */
+ public static String abortedByPositionIndexKey(String segment, long
ledgerId, long entryId) {
+ return segmentKey(segment) + ":" + longKey(ledgerId) + ":" +
longKey(entryId);
+ }
+
+ /** @return the lower bound of the per-segment range in {@link
#IDX_TXN_ABORTED_BY_POSITION}. */
+ public static String abortedByPositionSegmentLowerBound(String segment) {
+ return segmentKey(segment) + ":" + longKey(0L) + ":" + longKey(0L);
+ }
+
+ /** @return the upper bound of the per-segment range in {@link
#IDX_TXN_ABORTED_BY_POSITION}. */
+ public static String abortedByPositionSegmentUpperBound(String segment) {
+ return segmentKey(segment) + ":" + MAX_LONG_KEY + ":" + MAX_LONG_KEY;
+ }
+
+ /**
+ * Extract the {@code txnId} part from a path returned by {@link
#segmentAbortedTxnPath}. The
+ * path is {@code .../aborted/<encoded-segment>:<txnId>}; {@code
encoded-segment} is URL-
+ * encoded (no {@code :}) so the first {@code :} in the trailing name is
the segment / txn
+ * separator.
+ *
+ * @return the txnId key, or {@code null} if {@code abortedPath} doesn't
match
+ */
+ public static String txnIdFromAbortedPath(String abortedPath) {
+ int lastSlash = abortedPath.lastIndexOf('/');
+ if (lastSlash < 0) {
+ return null;
+ }
+ String name = abortedPath.substring(lastSlash + 1);
+ int colon = name.indexOf(':');
+ if (colon < 0 || colon == name.length() - 1) {
+ return null;
+ }
+ return name.substring(colon + 1);
+ }
+
+ /**
+ * @return the {@code segmentKey} part from a path returned by {@link
#segmentAbortedTxnPath}.
+ */
+ public static String segmentKeyFromAbortedPath(String abortedPath) {
+ int lastSlash = abortedPath.lastIndexOf('/');
+ if (lastSlash < 0) {
+ return null;
+ }
+ String name = abortedPath.substring(lastSlash + 1);
+ int colon = name.indexOf(':');
+ return colon < 0 ? null : name.substring(0, colon);
+ }
+
/** @return {@code value} formatted as a zero-padded fixed-width decimal
for use as a range-scan index key. */
public static String longKey(long value) {
return String.format("%0" + LONG_KEY_WIDTH + "d", value);
Review Comment:
unrelated to this PR. For performance reasons, String.format should be
avoided in hot paths.
Claude suggested this type of optimization (assumes non-negative value):
```java
public static String longKey(long value) {
char[] buf = new char[20];
long v = value; // assume non-negative for brevity
for (int i = 19; i >= 0; i--) {
buf[i] = (char) ('0' + (v % 10));
v /= 10;
}
return new String(buf);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]