thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r654459694



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});

Review comment:
       checkstyle is not letting me :-(
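
       For reference, the double-brace form creates an anonymous HashMap subclass at each call site, which is one thing style checkers commonly flag. A plain named map, sketched here against the tp0/tp1/env names from the diff above (a sketch, not what the PR settled on), avoids the anonymous subclass:

           // build the request map without an anonymous HashMap subclass
           Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
           offsetSpecs.put(tp0, OffsetSpec.maxTimestamp());
           offsetSpecs.put(tp1, OffsetSpec.latest());
           ListOffsetsResult result = env.adminClient().listOffsets(offsetSpecs);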

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName,1,1.asInstanceOf[Short])
+    produceMessages()
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+    ).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+    assertEquals(0,earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    assertEquals(3,latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1,maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())")

Review comment:
       removed

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);

Review comment:
       updated

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);

Review comment:
       done

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new ArrayList<>());
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception maxTimestampException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(maxTimestampException.getCause() instanceof UnsupportedVersionException);
+
+            Exception nopResponseException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp1).get();
+            });
+            assertTrue(nopResponseException.getCause() instanceof ApiException);
+        }
+    }

Review comment:
       I added it to the existing happy path test

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2072,6 +2072,30 @@ class LogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
   }
 
+  @Test
+  def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1

Review comment:
       done

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+
+    log.truncateTo(0)
+
+    val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+
+  }
+
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(7L, log.logEndOffset)
+    assertEquals(5L, maxTimestampOffset.get.offset)

Review comment:
       updated

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get

Review comment:
       sure, I refactored it out.

##########
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##########
@@ -162,11 +162,15 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
-    TestUtils.generateAndProduceMessages(servers, topic, 10)
+    // produce in 2 batches to ensure the max timestamp matches the last message
+    TestUtils.generateAndProduceMessages(servers, topic, 9)
+    Thread.sleep(10)
+    TestUtils.generateAndProduceMessages(servers, topic, 1)

Review comment:
       This test had become flaky because multiple messages could have the same timestamp; the sleep ensured the last message had the highest timestamp. I've now removed the sleep and instead set explicit timestamps on the test messages.
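
       For illustration only, the idea can be sketched with the plain Java producer API (the producer setup and names below are assumptions for the sketch, not the actual TestUtils change; standard org.apache.kafka.clients.producer and serialization imports assumed): give each record an explicit, strictly increasing timestamp, so the last record is guaranteed to carry the max timestamp without relying on wall-clock spacing.

           String topic = "test-topic"; // placeholder topic name
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           try (Producer<String, String> producer = new KafkaProducer<>(props)) {
               long base = System.currentTimeMillis();
               for (int i = 0; i < 10; i++) {
                   // the third constructor argument is an explicit record timestamp,
                   // so ordering no longer depends on when each send happens
                   producer.send(new ProducerRecord<>(topic, 0, base + i, null, "message-" + i));
               }
           }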




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

