This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e32f36b6456 fix: Follow-ups to JsonKinesisSource: numeric sequence comparison and call-site fixes (#18689)
6e32f36b6456 is described below

commit 6e32f36b64566738e747986a67543801be3da3d8
Author: Lin Liu <[email protected]>
AuthorDate: Fri May 15 14:23:19 2026 -0700

    fix: Follow-ups to JsonKinesisSource: numeric sequence comparison and call-site fixes (#18689)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/utilities/sources/JsonKinesisSource.java  |  4 ++--
 .../hudi/utilities/sources/KinesisSource.java      |  6 ++++--
 .../sources/helpers/KinesisOffsetGen.java          | 22 ++++++++++++++++++----
 .../sources/helpers/KinesisReadConfig.java         |  4 ++--
 4 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
index 7ad947830097..3a297467c82e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
@@ -143,12 +143,12 @@ public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> {
               KinesisSource.ShardRecordIterator recordIt = KinesisSource.readShardRecords(
                   client, readConfig.getStreamName(), range, readConfig.getStartingPosition(),
                   readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMilliSeconds(),
-                  readConfig.getMaxRecordsPerShard(), readConfig.isEnableDeaggregation(),
+                  readConfig.getMaxRecordsPerShard(), readConfig.isDeaggregationEnabled(),
                   readConfig.getRetryInitialIntervalMs(), readConfig.getRetryMaxIntervalMs(),
                   readConfig.getThrottleTimeoutMs());
 
               String shardId = range.getShardId();
-              boolean addMetaFields = readConfig.isShouldAddMetaFields();
+              boolean addMetaFields = readConfig.isMetaFieldsEnabled();
               List<String> jsonRecords = new ArrayList<>();
               long numNull = 0;
               java.time.Instant lastArrivalTimestamp = null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
index 919bac890581..a5ec35994d8e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
@@ -237,7 +237,7 @@ public abstract class KinesisSource<T> extends Source<T> {
           response = client.getRecords(
               GetRecordsRequest.builder()
                   .shardIterator(shardIteratorStr)
-                  .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords - totalConsumed)))
+                  .limit(Math.min(currentMaxRecords, Math.toIntExact(maxTotalRecords - totalConsumed)))
                   .build());
           lastSuccessTimeMs = System.currentTimeMillis();
           break;
@@ -290,7 +290,9 @@ public abstract class KinesisSource<T> extends Source<T> {
 
       // Process records first (done above), then decide whether to stop.
       // millisBehindLatest can be 0 in LocalStack even when the response contained records.
-      if (response.millisBehindLatest() == 0) {
+      // It is documented as nullable on AWS SDK responses, so guard against NPE on auto-unbox.
+      Long millisBehind = response.millisBehindLatest();
+      if (millisBehind != null && millisBehind == 0) {
         fetchingDone = true;
       }
       if (shardIteratorStr == null) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
index b0f1779a50be..27d160c3de0f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
@@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceed
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
+import java.math.BigInteger;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -162,6 +163,17 @@ public class KinesisOffsetGen {
           Option.ofNullable(getEndSeqFromValue(value)));
     }
 
+    /**
+     * Compares two Kinesis sequence numbers numerically.
+     * Kinesis sequence numbers are 128-bit integers represented as decimal strings whose lengths
+     * can vary, so lexicographic {@link String#compareTo} is not correct for numeric ordering.
+     *
+     * @return negative if a &lt; b, zero if equal, positive if a &gt; b
+     */
+    public static int compareSequenceNumbers(String a, String b) {
+      return new BigInteger(a).compareTo(new BigInteger(b));
+    }
+
     /**
      * Build checkpoint value without arrival time: "lastSeq" or "lastSeq|endSeq".
      */
@@ -262,7 +274,7 @@ public class KinesisOffsetGen {
         return !useLatestWhenNoCheckpoint;
       }
       // CASE 3: Closed shard: lastSeq >= endSeq means fully consumed
-      if (lastSeq.compareTo(endSeq) >= 0) {
+      if (CheckpointUtils.compareSequenceNumbers(lastSeq, endSeq) >= 0) {
         return false;
       }
       // CASE 4: lastSeq < endSeq: may have unread records
@@ -459,9 +471,11 @@ public class KinesisOffsetGen {
       Option<String> lastSeqOpt = seqs.getLeft();
       Option<String> endSeqOpt = seqs.getRight();
       // endSeq absent = was open shard; conservatively assume not fully consumed.
-      // endSeq present: fully consumed iff lastSeq >= endSeq.
+      // endSeq present with non-empty lastSeq: fully consumed iff lastSeq >= endSeq.
+      // Empty lastSeq (e.g. "|endSeq" checkpoint) is treated as not consumed.
       boolean fullyConsumed = endSeqOpt.isPresent()
-          && lastSeqOpt.map(last -> last.compareTo(endSeqOpt.get()) >= 0).orElse(false);
+          && lastSeqOpt.map(last -> !last.isEmpty()
+              && CheckpointUtils.compareSequenceNumbers(last, endSeqOpt.get()) >= 0).orElse(false);
       if (fullyConsumed) {
         log.info("Expired shard {} was fully consumed (lastSeq >= endSeq); pruning from checkpoint", shardId);
       } else {
@@ -495,7 +509,7 @@ public class KinesisOffsetGen {
         continue;
       }
       // lastSeq < shardStartSeq: records between the checkpoint and the trim horizon were dropped.
-      if (lastSeq.compareTo(shardStartSeq) < 0) {
+      if (CheckpointUtils.compareSequenceNumbers(lastSeq, shardStartSeq) < 0) {
         reportDataLoss("Shard " + shardId + " checkpoint lastSeq=" + lastSeq
             + " is before the shard's earliest available sequence number " + shardStartSeq
             + ". Records may have been trimmed due to retention expiry (data loss).");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java
index e6bf12c5f8b1..7fe53f24d0f9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java
@@ -41,8 +41,8 @@ public class KinesisReadConfig implements Serializable {
   private final String accessKey; // null if not set
   private final String secretKey; // null if not set
   private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPosition;
-  private final boolean shouldAddMetaFields;
-  private final boolean enableDeaggregation;
+  private final boolean metaFieldsEnabled;
+  private final boolean deaggregationEnabled;
   private final int maxRecordsPerRequest;
   private final long intervalMilliSeconds;
   private final long maxRecordsPerShard;

Reply via email to