AndrewJSchofield commented on code in PR #20148:
URL: https://github.com/apache/kafka/pull/20148#discussion_r2205460164


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java:
##########
@@ -123,6 +123,25 @@ public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type)
         throw new IllegalStateException("The record cannot be acknowledged.");
     }
 
+    /**
+     * Acknowledge a single record by its topic, partition and offset in the current batch.
+     *
+     * @param topic     The topic of the record to acknowledge
+     * @param partition The partition of the record
+     * @param offset    The offset of the record
+     * @param type      The acknowledgment type which indicates whether it was processed successfully
+     */
+    public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
+        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
+            TopicIdPartition tip = tipBatch.getKey();
+            if (tip.topic().equals(topic) && (tip.partition() == partition)) {
+                tipBatch.getValue().addAcknowledgement(offset, type);

Review Comment:
   The checking here is not tight enough. If the user calls 
`acknowledge(ConsumerRecord, AcknowledgeType)`, the code makes sure that the 
in-flight records include the offset (see 
`ShareInFlightBatch.acknowledge(ConsumerRecord, AcknowledgeType)`). In this 
case, however, the offset is not validated at all. I think the handling of an 
exceptional batch needs to be a bit more sophisticated. It should not be 
possible to use `ShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType)` 
with an exceptional batch. It should only be possible to use 
`ShareConsumer.acknowledge(String, int, long, AcknowledgeType)`, and only with 
the specific parameters that were retrieved from the 
`RecordDeserializationException`.
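
To make the suggested tightening concrete, here is a rough, self-contained sketch of the two rules. It is not the Kafka implementation: `AckValidationSketch`, `Batch`, `Fetch`, `AckType`, `failedOffset`, `key` and the two `acknowledge...` helper methods are all hypothetical stand-ins, and the sketch assumes an exceptional batch can be modelled as remembering the single offset that failed to deserialize.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: these types are hypothetical stand-ins, not Kafka classes.
class AckValidationSketch {
    enum AckType { ACCEPT, RELEASE, REJECT }

    /** Hypothetical model of the in-flight state for one topic-partition. */
    static final class Batch {
        final Set<Long> inFlightOffsets = new HashSet<>();
        final Map<Long, AckType> acks = new HashMap<>();
        // Non-null only for an exceptional batch: the offset that failed to deserialize.
        final Long failedOffset;

        private Batch(Long failedOffset) { this.failedOffset = failedOffset; }

        static Batch normal(long... offsets) {
            Batch b = new Batch(null);
            for (long o : offsets) b.inFlightOffsets.add(o);
            return b;
        }

        static Batch exceptional(long failedOffset) { return new Batch(failedOffset); }

        /** Record-based path: refused for an exceptional batch or an unknown offset. */
        void acknowledgeRecord(long offset, AckType type) {
            if (failedOffset != null || !inFlightOffsets.contains(offset)) {
                throw new IllegalStateException("The record cannot be acknowledged.");
            }
            acks.put(offset, type);
        }

        /** Offset-based path: only the offset reported by the failure is accepted. */
        void acknowledgeFailedOffset(long offset, AckType type) {
            if (failedOffset == null || failedOffset.longValue() != offset) {
                throw new IllegalStateException("The record cannot be acknowledged.");
            }
            acks.put(offset, type);
        }
    }

    /** Hypothetical model of the fetch: batches keyed by "topic-partition". */
    static final class Fetch {
        final Map<String, Batch> batches = new HashMap<>();

        void acknowledge(String topic, int partition, long offset, AckType type) {
            Batch batch = batches.get(key(topic, partition));
            if (batch == null) {
                throw new IllegalStateException("The record cannot be acknowledged.");
            }
            batch.acknowledgeFailedOffset(offset, type);
        }
    }

    static String key(String topic, int partition) { return topic + "-" + partition; }

    public static void main(String[] args) {
        Fetch fetch = new Fetch();
        fetch.batches.put(key("orders", 0), Batch.exceptional(7L));
        fetch.acknowledge("orders", 0, 7L, AckType.REJECT); // matches the failed offset
        try {
            fetch.acknowledge("orders", 0, 8L, AckType.REJECT); // offset was never reported
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the two paths are deliberately disjoint: a normal batch only accepts record-based acknowledgements for offsets it holds, and an exceptional batch only accepts the offset-based call for the one failed offset. The real change may need to validate against different state.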



