This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7567cbc  KAFKA-13476: Increase resilience timestamp decoding Kafka 
Streams (#11535)
7567cbc is described below

commit 7567cbc857eef4f535410a8f6256308197c3b9c8
Author: Richard <[email protected]>
AuthorDate: Thu Jan 6 06:38:10 2022 +0100

    KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../streams/processor/internals/StreamTask.java    | 23 +++++++++++++---------
 .../processor/internals/StreamTaskTest.java        |  7 +++++++
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 32369c9..6823d2e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1097,15 +1097,20 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         if (encryptedString.isEmpty()) {
             return RecordQueue.UNKNOWN;
         }
-        final ByteBuffer buffer = 
ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
-        final byte version = buffer.get();
-        switch (version) {
-            case LATEST_MAGIC_BYTE:
-                return buffer.getLong();
-            default:
-                log.warn("Unsupported offset metadata version found. Supported 
version {}. Found version {}.",
-                         LATEST_MAGIC_BYTE, version);
-                return RecordQueue.UNKNOWN;
+        try {
+            final ByteBuffer buffer = 
ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+            final byte version = buffer.get();
+            switch (version) {
+                case LATEST_MAGIC_BYTE:
+                    return buffer.getLong();
+                default:
+                    log.warn("Unsupported offset metadata version found. 
Supported version {}. Found version {}.",
+                            LATEST_MAGIC_BYTE, version);
+                    return RecordQueue.UNKNOWN;
+            }
+        } catch (final Exception exception) {
+            log.warn("Unsupported offset metadata found");
+            return RecordQueue.UNKNOWN;
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1b3c500..3c1814e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1184,6 +1184,13 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldReturnUnknownTimestampIfInvalidMetadata() {
+        task = createStatelessTask(createConfig("100"));
+        final String invalidBase64String = "{}";
+        assertEquals(RecordQueue.UNKNOWN, 
task.decodeTimestamp(invalidBase64String));
+    }
+
+    @Test
     public void shouldBeProcessableIfAllPartitionsBuffered() {
         task = createStatelessTask(createConfig("100"));
         task.initializeIfNeeded();

Reply via email to