This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6e32f36b6456 fix: Follow-ups to JsonKinesisSource: numeric sequence comparison and call-site fixes (#18689)
6e32f36b6456 is described below
commit 6e32f36b64566738e747986a67543801be3da3d8
Author: Lin Liu <[email protected]>
AuthorDate: Fri May 15 14:23:19 2026 -0700
fix: Follow-ups to JsonKinesisSource: numeric sequence comparison and call-site fixes (#18689)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/utilities/sources/JsonKinesisSource.java | 4 ++--
.../hudi/utilities/sources/KinesisSource.java | 6 ++++--
.../sources/helpers/KinesisOffsetGen.java | 22 ++++++++++++++++++----
.../sources/helpers/KinesisReadConfig.java | 4 ++--
4 files changed, 26 insertions(+), 10 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
index 7ad947830097..3a297467c82e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
@@ -143,12 +143,12 @@ public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> {
KinesisSource.ShardRecordIterator recordIt =
KinesisSource.readShardRecords(
client, readConfig.getStreamName(), range,
readConfig.getStartingPosition(),
readConfig.getMaxRecordsPerRequest(),
readConfig.getIntervalMilliSeconds(),
- readConfig.getMaxRecordsPerShard(),
readConfig.isEnableDeaggregation(),
+ readConfig.getMaxRecordsPerShard(),
readConfig.isDeaggregationEnabled(),
readConfig.getRetryInitialIntervalMs(),
readConfig.getRetryMaxIntervalMs(),
readConfig.getThrottleTimeoutMs());
String shardId = range.getShardId();
- boolean addMetaFields = readConfig.isShouldAddMetaFields();
+ boolean addMetaFields = readConfig.isMetaFieldsEnabled();
List<String> jsonRecords = new ArrayList<>();
long numNull = 0;
java.time.Instant lastArrivalTimestamp = null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
index 919bac890581..a5ec35994d8e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
@@ -237,7 +237,7 @@ public abstract class KinesisSource<T> extends Source<T> {
response = client.getRecords(
GetRecordsRequest.builder()
.shardIterator(shardIteratorStr)
- .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords -
totalConsumed)))
+ .limit(Math.min(currentMaxRecords,
Math.toIntExact(maxTotalRecords - totalConsumed)))
.build());
lastSuccessTimeMs = System.currentTimeMillis();
break;
@@ -290,7 +290,9 @@ public abstract class KinesisSource<T> extends Source<T> {
// Process records first (done above), then decide whether to stop.
// millisBehindLatest can be 0 in LocalStack even when the response
contained records.
- if (response.millisBehindLatest() == 0) {
+ // It is documented as nullable on AWS SDK responses, so guard against
NPE on auto-unbox.
+ Long millisBehind = response.millisBehindLatest();
+ if (millisBehind != null && millisBehind == 0) {
fetchingDone = true;
}
if (shardIteratorStr == null) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
index b0f1779a50be..27d160c3de0f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
@@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceed
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
+import java.math.BigInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -162,6 +163,17 @@ public class KinesisOffsetGen {
Option.ofNullable(getEndSeqFromValue(value)));
}
+ /**
+ * Compares two Kinesis sequence numbers numerically.
+ * Kinesis sequence numbers are 128-bit integers represented as decimal
strings whose lengths
+ * can vary, so lexicographic {@link String#compareTo} is not correct for
numeric ordering.
+ *
+ * @return negative if a < b, zero if equal, positive if a > b
+ */
+ public static int compareSequenceNumbers(String a, String b) {
+ return new BigInteger(a).compareTo(new BigInteger(b));
+ }
+
/**
* Build checkpoint value without arrival time: "lastSeq" or
"lastSeq|endSeq".
*/
@@ -262,7 +274,7 @@ public class KinesisOffsetGen {
return !useLatestWhenNoCheckpoint;
}
// CASE 3: Closed shard: lastSeq >= endSeq means fully consumed
- if (lastSeq.compareTo(endSeq) >= 0) {
+ if (CheckpointUtils.compareSequenceNumbers(lastSeq, endSeq) >= 0) {
return false;
}
// CASE 4: lastSeq < endSeq: may have unread records
@@ -459,9 +471,11 @@ public class KinesisOffsetGen {
Option<String> lastSeqOpt = seqs.getLeft();
Option<String> endSeqOpt = seqs.getRight();
// endSeq absent = was open shard; conservatively assume not fully
consumed.
- // endSeq present: fully consumed iff lastSeq >= endSeq.
+ // endSeq present with non-empty lastSeq: fully consumed iff lastSeq >=
endSeq.
+ // Empty lastSeq (e.g. "|endSeq" checkpoint) is treated as not consumed.
boolean fullyConsumed = endSeqOpt.isPresent()
- && lastSeqOpt.map(last -> last.compareTo(endSeqOpt.get()) >=
0).orElse(false);
+ && lastSeqOpt.map(last -> !last.isEmpty()
+ && CheckpointUtils.compareSequenceNumbers(last, endSeqOpt.get())
>= 0).orElse(false);
if (fullyConsumed) {
log.info("Expired shard {} was fully consumed (lastSeq >= endSeq);
pruning from checkpoint", shardId);
} else {
@@ -495,7 +509,7 @@ public class KinesisOffsetGen {
continue;
}
// lastSeq < shardStartSeq: records between the checkpoint and the trim
horizon were dropped.
- if (lastSeq.compareTo(shardStartSeq) < 0) {
+ if (CheckpointUtils.compareSequenceNumbers(lastSeq, shardStartSeq) < 0) {
reportDataLoss("Shard " + shardId + " checkpoint lastSeq=" + lastSeq
+ " is before the shard's earliest available sequence number " +
shardStartSeq
+ ". Records may have been trimmed due to retention expiry (data
loss).");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java
index e6bf12c5f8b1..7fe53f24d0f9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java
@@ -41,8 +41,8 @@ public class KinesisReadConfig implements Serializable {
private final String accessKey; // null if not set
private final String secretKey; // null if not set
private final KinesisSourceConfig.KinesisStartingPositionStrategy
startingPosition;
- private final boolean shouldAddMetaFields;
- private final boolean enableDeaggregation;
+ private final boolean metaFieldsEnabled;
+ private final boolean deaggregationEnabled;
private final int maxRecordsPerRequest;
private final long intervalMilliSeconds;
private final long maxRecordsPerShard;