mjsax commented on a change in pull request #9255:
URL: https://github.com/apache/kafka/pull/9255#discussion_r487321249



##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+            fail("Call to getDateTime should fail");
+        } catch (final Exception e) {
+            e.printStackTrace();

Review comment:
       We should not print the stacktrace imho, but verify the exception 
message using `assertThat`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value

Review comment:
       nit: I don't know from the top of my head what the standard dictates. 
Might be worth adding to the comment?

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment:
       We should update this test and use `assertThrows` instead of 
`try-fail-catch`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value
+     * @param timestamp to be converted
+     * @return epoch value of a given timestamp
+     * @throws ParseException for timestamp that doesn't follow ISO8601 format
+     */
+    public static long getDateTime(String timestamp) throws ParseException {
+        final String[] timestampParts = timestamp.split("T");
+        if (timestampParts.length < 2) {
+            throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+        }
+
+        final String secondPart = timestampParts[1];
+        if (secondPart == null || secondPart.isEmpty()) {
+            throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+        }
+
+        if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
       I know that this is exiting logic that was just moved, but I am not sure 
if I understand how it works?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
           case (topicPartition, newOffset) => (topicPartition, new 
OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-        val timestamp = 
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+        val timestamp = 
Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
       I am must wondering if we can have a dependency from `core` to `clients` 
module? \cc @ijuma 

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+            fail("Call to getDateTime should fail");
+        } catch (final Exception e) {
+            e.printStackTrace();

Review comment:
       We should not print the stacktrace imho, but verify the exception 
message using `assertThat`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value

Review comment:
       nit: I don't know from the top of my head what the standard dictates. 
Might be worth adding to the comment?

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment:
       We should update this test and use `assertThrows` instead of 
`try-fail-catch`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value
+     * @param timestamp to be converted
+     * @return epoch value of a given timestamp
+     * @throws ParseException for timestamp that doesn't follow ISO8601 format
+     */
+    public static long getDateTime(String timestamp) throws ParseException {
+        final String[] timestampParts = timestamp.split("T");
+        if (timestampParts.length < 2) {
+            throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+        }
+
+        final String secondPart = timestampParts[1];
+        if (secondPart == null || secondPart.isEmpty()) {
+            throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+        }
+
+        if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
       I know that this is exiting logic that was just moved, but I am not sure 
if I understand how it works?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
           case (topicPartition, newOffset) => (topicPartition, new 
OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-        val timestamp = 
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+        val timestamp = 
Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
       I am must wondering if we can have a dependency from `core` to `clients` 
module? \cc @ijuma 

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+            fail("Call to getDateTime should fail");
+        } catch (final Exception e) {
+            e.printStackTrace();

Review comment:
       We should not print the stacktrace imho, but verify the exception 
message using `assertThat`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value

Review comment:
       nit: I don't know from the top of my head what the standard dictates. 
Might be worth adding to the comment?

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment:
       We should update this test and use `assertThrows` instead of 
`try-fail-catch`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value
+     * @param timestamp to be converted
+     * @return epoch value of a given timestamp
+     * @throws ParseException for timestamp that doesn't follow ISO8601 format
+     */
+    public static long getDateTime(String timestamp) throws ParseException {
+        final String[] timestampParts = timestamp.split("T");
+        if (timestampParts.length < 2) {
+            throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+        }
+
+        final String secondPart = timestampParts[1];
+        if (secondPart == null || secondPart.isEmpty()) {
+            throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+        }
+
+        if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
       I know that this is exiting logic that was just moved, but I am not sure 
if I understand how it works?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
           case (topicPartition, newOffset) => (topicPartition, new 
OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-        val timestamp = 
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+        val timestamp = 
Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
       I am must wondering if we can have a dependency from `core` to `clients` 
module? \cc @ijuma 

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+            fail("Call to getDateTime should fail");
+        } catch (final Exception e) {
+            e.printStackTrace();

Review comment:
       We should not print the stacktrace imho, but verify the exception 
message using `assertThat`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value

Review comment:
       nit: I don't know from the top of my head what the standard dictates. 
Might be worth adding to the comment?

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment:
       We should update this test and use `assertThrows` instead of 
`try-fail-catch`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value
+     * @param timestamp to be converted
+     * @return epoch value of a given timestamp
+     * @throws ParseException for timestamp that doesn't follow ISO8601 format
+     */
+    public static long getDateTime(String timestamp) throws ParseException {
+        final String[] timestampParts = timestamp.split("T");
+        if (timestampParts.length < 2) {
+            throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+        }
+
+        final String secondPart = timestampParts[1];
+        if (secondPart == null || secondPart.isEmpty()) {
+            throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+        }
+
+        if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
       I know that this is exiting logic that was just moved, but I am not sure 
if I understand how it works?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
           case (topicPartition, newOffset) => (topicPartition, new 
OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-        val timestamp = 
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+        val timestamp = 
Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
       I am must wondering if we can have a dependency from `core` to `clients` 
module? \cc @ijuma 

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+            fail("Call to getDateTime should fail");
+        } catch (final Exception e) {
+            e.printStackTrace();

Review comment:
       We should not print the stacktrace imho, but verify the exception 
message using `assertThat`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value

Review comment:
       nit: I don't know from the top of my head what the standard dictates. 
Might be worth adding to the comment?

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##########
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
         assertEquals(msg, exception.get().getMessage());
         assertEquals(1, count.get());
     }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment:
       We should update this test and use `assertThrows` instead of 
`try-fail-catch`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value
+     * @param timestamp to be converted
+     * @return epoch value of a given timestamp
+     * @throws ParseException for timestamp that doesn't follow ISO8601 format
+     */
+    public static long getDateTime(String timestamp) throws ParseException {
+        final String[] timestampParts = timestamp.split("T");
+        if (timestampParts.length < 2) {
+            throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+        }
+
+        final String secondPart = timestampParts[1];
+        if (secondPart == null || secondPart.isEmpty()) {
+            throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+        }
+
+        if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
       I know that this is exiting logic that was just moved, but I am not sure 
if I understand how it works?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
           case (topicPartition, newOffset) => (topicPartition, new 
OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-        val timestamp = 
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+        val timestamp = 
Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
       I am must wondering if we can have a dependency from `core` to `clients` 
module? \cc @ijuma 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to