[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301019#comment-16301019 ]
ASF GitHub Bot commented on KAFKA-4879: --------------------------------------- hachikuji closed pull request #3637: KAFKA-4879 KafkaConsumer.position may hang forever when deleting a topic URL: https://github.com/apache/kafka/pull/3637 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index b1badefd318..992b0711670 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -116,9 +116,9 @@ public void seekToEnd(Collection<TopicPartition> partitions); /** - * @see KafkaConsumer#position(TopicPartition) + * @see KafkaConsumer#position(TopicPartition, long) */ - public long position(TopicPartition partition); + public long position(TopicPartition partition, long timeout); /** * @see KafkaConsumer#committed(TopicPartition) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3154061bf01..5e287f3b114 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1079,8 +1079,9 @@ public void assign(Collection<TopicPartition> partitions) { // fetch positions if we have partitions we're subscribed to that we // don't know the offset for + //TODO we need to clarify this if (!subscriptions.hasAllFetchPositions()) - updateFetchPositions(this.subscriptions.missingFetchPositions()); + updateFetchPositions(this.subscriptions.missingFetchPositions(), timeout); // if data is available 
already, return it immediately Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); @@ -1295,6 +1296,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) { * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists). * * @param partition The partition to get the position for + * @param timeout The amount of time to wait for the offset * @return The offset * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for * the partition @@ -1305,8 +1307,10 @@ public void seekToEnd(Collection<TopicPartition> partitions) { * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the * configured groupId * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * + * @throws org.apache.kafka.common.errors.TimeoutException: if the given partition is not available */ - public long position(TopicPartition partition) { + public long position(TopicPartition partition, long timeout) { acquireAndEnsureOpen(); try { if (!this.subscriptions.isAssigned(partition)) @@ -1314,7 +1318,7 @@ public long position(TopicPartition partition) { Long offset = this.subscriptions.position(partition); if (offset == null) { // batch update fetch positions for any partitions without a valid position - updateFetchPositions(subscriptions.assignedPartitions()); + updateFetchPositions(subscriptions.assignedPartitions(), timeout); offset = this.subscriptions.position(partition); } return offset; @@ -1645,15 +1649,18 @@ private void close(long timeoutMs, boolean swallowException) { * or reset it using the offset reset policy the user has configured. 
* * @param partitions The partitions that needs updating fetch positions + * @param timeout The amount of time to wait for the offset * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is * defined + * + * @throws org.apache.kafka.common.errors.TimeoutException: if one of the given partition is not available */ - private void updateFetchPositions(Set<TopicPartition> partitions) { + private void updateFetchPositions(Set<TopicPartition> partitions, long timeout) { // lookup any positions for partitions which are awaiting reset (which may be the // case if the user called seekToBeginning or seekToEnd. We do this check first to // avoid an unnecessary lookup of committed offsets (which typically occurs when // the user is manually assigning partitions and managing their own offsets). - fetcher.resetOffsetsIfNeeded(partitions); + fetcher.resetOffsetsIfNeeded(partitions, timeout); if (!subscriptions.hasAllFetchPositions(partitions)) { // if we still don't have offsets for the given partitions, then we should either diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 91cb6f1ce7a..303ca62de51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -241,7 +241,7 @@ public synchronized OffsetAndMetadata committed(TopicPartition partition) { } @Override - public synchronized long position(TopicPartition partition) { + public synchronized long position(TopicPartition partition, long timeout) { ensureNotClosed(); if (!this.subscriptions.isAssigned(partition)) throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 23c59020a57..61743d0344c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -243,15 +243,18 @@ public void onFailure(RuntimeException e) { /** * Lookup and set offsets for any partitions which are awaiting an explicit reset. * @param partitions the partitions to reset + * @param timeout The amount of time to wait for the offset + * + * @throws org.apache.kafka.common.errors.TimeoutException: if one of the given partition is not available */ - public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) { + public void resetOffsetsIfNeeded(Set<TopicPartition> partitions, long timeout) { final Set<TopicPartition> needsOffsetReset = new HashSet<>(); for (TopicPartition tp : partitions) { if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp)) needsOffsetReset.add(tp); } if (!needsOffsetReset.isEmpty()) { - resetOffsets(needsOffsetReset); + resetOffsets(needsOffsetReset, timeout); } } @@ -281,7 +284,7 @@ public void updateFetchPositions(Set<TopicPartition> partitions) { } if (!needsOffsetReset.isEmpty()) { - resetOffsets(needsOffsetReset); + resetOffsets(needsOffsetReset, Long.MAX_VALUE); } } @@ -400,15 +403,17 @@ else if (strategy == OffsetResetStrategy.LATEST) * Reset offsets for the given partition using the offset reset strategy. 
* * @param partitions The partitions that need offsets reset + * @param timeout The amount of time to wait for the offset * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined + * @throws org.apache.kafka.common.errors.TimeoutException: if one of the given partition is not available */ - private void resetOffsets(final Set<TopicPartition> partitions) { + private void resetOffsets(final Set<TopicPartition> partitions, long timeout) { final Map<TopicPartition, Long> offsetResets = new HashMap<>(); final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>(); for (final TopicPartition partition : partitions) { offsetResetStrategyTimestamp(partition, offsetResets, partitionsWithNoOffsets); } - final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, Long.MAX_VALUE, false); + final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, timeout, false); for (final TopicPartition partition : partitions) { final OffsetData offsetData = offsetsByTimes.get(partition); if (offsetData == null) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 9fd7e19810e..42d0c5a2944 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -470,7 +470,7 @@ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(5, records.count()); - assertEquals(55L, consumer.position(tp0)); + assertEquals(55L, consumer.position(tp0, Long.MAX_VALUE)); consumer.close(0, TimeUnit.MILLISECONDS); } @@ -697,7 +697,7 @@ public void testWakeupWithFetchDataAvailable() throws Exception { } // make sure the position hasn't been updated - assertEquals(0, 
consumer.position(tp0)); + assertEquals(0, consumer.position(tp0, Long.MAX_VALUE)); // the next poll should return the completed fetch ConsumerRecords<String, String> records = consumer.poll(0); @@ -861,8 +861,8 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { // verify that the fetch occurred as expected assertEquals(11, records.count()); - assertEquals(1L, consumer.position(tp0)); - assertEquals(10L, consumer.position(t2p0)); + assertEquals(1L, consumer.position(tp0, Long.MAX_VALUE)); + assertEquals(10L, consumer.position(t2p0, Long.MAX_VALUE)); // subscription change consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer)); @@ -892,8 +892,8 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { // verify that the fetch occurred as expected assertEquals(101, records.count()); - assertEquals(2L, consumer.position(tp0)); - assertEquals(100L, consumer.position(t3p0)); + assertEquals(2L, consumer.position(tp0, Long.MAX_VALUE)); + assertEquals(100L, consumer.position(t3p0, Long.MAX_VALUE)); // verify that the offset commits occurred as expected assertTrue(commitReceived.get()); @@ -1037,7 +1037,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() { ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(1, records.count()); - assertEquals(11L, consumer.position(tp0)); + assertEquals(11L, consumer.position(tp0, Long.MAX_VALUE)); // mock the offset commit response for to be revoked partitions AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11); @@ -1102,7 +1102,7 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() { ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(1, records.count()); - assertEquals(11L, consumer.position(tp0)); + assertEquals(11L, consumer.position(tp0, Long.MAX_VALUE)); // new manual assignment consumer.assign(Arrays.asList(t2p0)); @@ -1170,8 +1170,8 @@ public void 
testOffsetOfPausedPartitions() { offsetResponse.put(tp0, 3L); offsetResponse.put(tp1, 3L); client.prepareResponse(listOffsetsResponse(offsetResponse, Errors.NONE)); - assertEquals(3L, consumer.position(tp0)); - assertEquals(3L, consumer.position(tp1)); + assertEquals(3L, consumer.position(tp0, Long.MAX_VALUE)); + assertEquals(3L, consumer.position(tp1, Long.MAX_VALUE)); client.requests().clear(); consumer.unsubscribe(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 6ed46f711cb..ba6422f782e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -52,7 +52,7 @@ public void testSimpleMock() { assertEquals(rec1, iter.next()); assertEquals(rec2, iter.next()); assertFalse(iter.hasNext()); - assertEquals(2L, consumer.position(new TopicPartition("test", 0))); + assertEquals(2L, consumer.position(new TopicPartition("test", 0), Long.MAX_VALUE)); consumer.commitSync(); assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index c4567a3693b..3d50fd0da48 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -492,7 +492,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) { lastCommittedOffsets = new HashMap<>(); currentOffsets = new HashMap<>(); for (TopicPartition tp : partitions) { - long pos = consumer.position(tp); + long pos = consumer.position(tp, Long.MAX_VALUE); lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); currentOffsets.put(tp, new 
OffsetAndMetadata(pos)); log.debug("{} assigned topic partition {} with offset {}", id, tp, pos); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 0e190bc3767..c323360375b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -275,7 +275,7 @@ private void readToLogEnd() { Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); while (it.hasNext()) { Map.Entry<TopicPartition, Long> entry = it.next(); - if (consumer.position(entry.getKey()) >= entry.getValue()) + if (consumer.position(entry.getKey(), Long.MAX_VALUE) >= entry.getValue()) it.remove(); else { poll(Integer.MAX_VALUE); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index eae3726aea7..d1943e8b7c3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -340,8 +340,8 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception { sinkTask.close(new HashSet<>(partitions)); EasyMock.expectLastCall(); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); sinkTask.open(partitions); EasyMock.expectLastCall(); @@ -722,8 +722,8 @@ private void expectRebalanceAssignmentError(RuntimeException e) { sinkTask.preCommit(EasyMock.<Map<TopicPartition, 
OffsetAndMetadata>>anyObject()); EasyMock.expectLastCall().andReturn(Collections.emptyMap()); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); sinkTask.open(partitions); EasyMock.expectLastCall().andThrow(e); @@ -752,8 +752,8 @@ private void expectPollInitialAssignment() { return ConsumerRecords.empty(); } }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); sinkTask.put(Collections.<SinkRecord>emptyList()); EasyMock.expectLastCall(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 6f77f650c8d..e718806e7db 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -516,9 +516,9 @@ private void expectPollInitialAssignment() throws Exception { return ConsumerRecords.empty(); } }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2, 
Long.MAX_VALUE)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION3, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); sinkTask.put(Collections.<SinkRecord>emptyList()); EasyMock.expectLastCall(); @@ -630,9 +630,9 @@ public SinkRecord answer() { } }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION3, Long.MAX_VALUE)).andReturn(FIRST_OFFSET); sinkTask.open(partitions); EasyMock.expectLastCall(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index b2c164dc1c9..37029f86173 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -247,8 +247,8 @@ public void run() { store.start(); assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(7L, consumer.position(TP0)); - assertEquals(7L, consumer.position(TP1)); + assertEquals(7L, consumer.position(TP0, Long.MAX_VALUE)); + assertEquals(7L, consumer.position(TP1, Long.MAX_VALUE)); store.stop(); @@ -283,8 +283,8 @@ public void testSendAndReadToEnd() throws Exception { consumer.updateEndOffsets(endOffsets); store.start(); assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(0L, consumer.position(TP0)); - assertEquals(0L, consumer.position(TP1)); + assertEquals(0L, consumer.position(TP0, Long.MAX_VALUE)); + assertEquals(0L, consumer.position(TP1, Long.MAX_VALUE)); // Set some keys final 
AtomicInteger invoked = new AtomicInteger(0); @@ -412,7 +412,7 @@ public void run() { store.start(); assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS)); assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(1L, consumer.position(TP0)); + assertEquals(1L, consumer.position(TP0, Long.MAX_VALUE)); store.stop(); @@ -439,8 +439,8 @@ public void testProducerError() throws Exception { consumer.updateEndOffsets(endOffsets); store.start(); assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(0L, consumer.position(TP0)); - assertEquals(0L, consumer.position(TP1)); + assertEquals(0L, consumer.position(TP0, Long.MAX_VALUE)); + assertEquals(0L, consumer.position(TP1, Long.MAX_VALUE)); final AtomicReference<Throwable> setException = new AtomicReference<>(); store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() { diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 55989f46e6d..1e35f86b0bd 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -244,7 +244,7 @@ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() { if (!dryRun) { for (final TopicPartition p : partitions) { - client.position(p); + client.position(p, Long.MAX_VALUE); } client.commitSync(); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 1a4de997bc7..c8122ef41bf 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -784,21 +784,21 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test(expected = classOf[AuthorizationException]) def testOffsetFetchWithNoAccess() { this.consumers.head.assign(List(tp).asJava) - 
this.consumers.head.position(tp) + this.consumers.head.position(tp, Long.MaxValue) } @Test(expected = classOf[GroupAuthorizationException]) def testOffsetFetchWithNoGroupAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.assign(List(tp).asJava) - this.consumers.head.position(tp) + this.consumers.head.position(tp, Long.MaxValue) } @Test(expected = classOf[KafkaException]) def testOffsetFetchWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) - this.consumers.head.position(tp) + this.consumers.head.position(tp, Long.MaxValue) } @Test @@ -834,7 +834,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.assign(List(tp).asJava) - this.consumers.head.position(tp) + this.consumers.head.position(tp, Long.MaxValue) } @Test @@ -842,7 +842,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.assign(List(tp).asJava) - this.consumers.head.position(tp) + this.consumers.head.position(tp, Long.MaxValue) } @Test diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 27cafd70614..db1f84ab3d3 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -115,9 +115,9 @@ class ConsumerBounceTest extends IntegrationTestHarness with 
Logging { if (records.nonEmpty) { consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(tp).offset) + assertEquals(consumer.position(tp, Long.MaxValue), consumer.committed(tp).offset) - if (consumer.position(tp) == numRecords) { + if (consumer.position(tp, Long.MaxValue) == numRecords) { consumer.seekToBeginning(Collections.emptyList()) consumed = 0 } @@ -151,16 +151,16 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { if (coin == 0) { info("Seeking to end of log") consumer.seekToEnd(Collections.emptyList()) - assertEquals(numRecords.toLong, consumer.position(tp)) + assertEquals(numRecords.toLong, consumer.position(tp, Long.MaxValue)) } else if (coin == 1) { val pos = TestUtils.random.nextInt(numRecords).toLong info("Seeking to " + pos) consumer.seek(tp, pos) - assertEquals(pos, consumer.position(tp)) + assertEquals(pos, consumer.position(tp, Long.MaxValue)) } else if (coin == 2) { info("Committing offset.") consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(tp).offset) + assertEquals(consumer.position(tp, Long.MaxValue), consumer.committed(tp).offset) } } } diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala index c9c40a9e58f..ff282e850d8 100644 --- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala @@ -85,15 +85,15 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging { sendRecords(producers.head, 10, tp) consumer.seekToBeginning(Collections.singletonList(tp)) - assertEquals(0L, consumer.position(tp)) + assertEquals(0L, consumer.position(tp, Long.MaxValue)) client.deleteRecordsBefore(Map((tp, 5L))).get() consumer.seekToBeginning(Collections.singletonList(tp)) - assertEquals(5L, consumer.position(tp)) + assertEquals(5L, consumer.position(tp, Long.MaxValue)) 
client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get() consumer.seekToBeginning(Collections.singletonList(tp)) - assertEquals(10L, consumer.position(tp)) + assertEquals(10L, consumer.position(tp, Long.MaxValue)) } @Test diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index aad9b6ad24a..ffe36a789fc 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -190,7 +190,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { // than session timeout and then try a commit. We should still be in the group, // so the commit should succeed Utils.sleep(1500) - committedPosition = consumer0.position(tp) + committedPosition = consumer0.position(tp, Long.MaxValue) consumer0.commitSync(Map(tp -> new OffsetAndMetadata(committedPosition)).asJava) commitCompleted = true } @@ -583,15 +583,15 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer.assign(List(tp).asJava) consumer.seekToEnd(List(tp).asJava) - assertEquals(totalRecords, consumer.position(tp)) + assertEquals(totalRecords, consumer.position(tp, Long.MaxValue)) assertFalse(consumer.poll(totalRecords).iterator().hasNext) consumer.seekToBeginning(List(tp).asJava) - assertEquals(0, consumer.position(tp), 0) + assertEquals(0, consumer.position(tp, Long.MaxValue), 0) consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0) consumer.seek(tp, mid) - assertEquals(mid, consumer.position(tp)) + assertEquals(mid, consumer.position(tp, Long.MaxValue)) consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt, startingTimestamp = mid.toLong) @@ -601,15 +601,15 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer.assign(List(tp2).asJava) consumer.seekToEnd(List(tp2).asJava) - assertEquals(totalRecords, 
consumer.position(tp2)) + assertEquals(totalRecords, consumer.position(tp2, Long.MaxValue)) assertFalse(consumer.poll(totalRecords).iterator().hasNext) consumer.seekToBeginning(List(tp2).asJava) - assertEquals(0, consumer.position(tp2), 0) + assertEquals(0, consumer.position(tp2, Long.MaxValue), 0) consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2) consumer.seek(tp2, mid) - assertEquals(mid, consumer.position(tp2)) + assertEquals(mid, consumer.position(tp2, Long.MaxValue)) consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt, startingTimestamp = mid.toLong, tp = tp2) } @@ -634,17 +634,17 @@ class PlaintextConsumerTest extends BaseConsumerTest { // position() on a partition that we aren't subscribed to throws an exception intercept[IllegalArgumentException] { - this.consumers.head.position(new TopicPartition(topic, 15)) + this.consumers.head.position(new TopicPartition(topic, 15), Long.MaxValue) } this.consumers.head.assign(List(tp).asJava) - assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp)) + assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp, Long.MaxValue)) this.consumers.head.commitSync() assertEquals(0L, this.consumers.head.committed(tp).offset) consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0) - assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp)) + assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp, Long.MaxValue)) this.consumers.head.commitSync() assertEquals("Committed offset should be returned", 5L, this.consumers.head.committed(tp).offset) @@ -1309,15 +1309,15 @@ class PlaintextConsumerTest extends BaseConsumerTest { // Need to poll to join the group 
this.consumers.head.poll(50) - val pos1 = this.consumers.head.position(tp) - val pos2 = this.consumers.head.position(tp2) + val pos1 = this.consumers.head.position(tp, Long.MaxValue) + val pos2 = this.consumers.head.position(tp2, Long.MaxValue) this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava) assertEquals(3, this.consumers.head.committed(tp).offset) assertNull(this.consumers.head.committed(tp2)) // Positions should not change - assertEquals(pos1, this.consumers.head.position(tp)) - assertEquals(pos2, this.consumers.head.position(tp2)) + assertEquals(pos1, this.consumers.head.position(tp, Long.MaxValue)) + assertEquals(pos2, this.consumers.head.position(tp2, Long.MaxValue)) this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava) assertEquals(3, this.consumers.head.committed(tp).offset) assertEquals(5, this.consumers.head.committed(tp2).offset) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 760cc39a974..fbd7025da74 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -168,7 +168,7 @@ class TransactionsTest extends KafkaServerTestHarness { assertEquals(2, readCommittedConsumer.assignment.size) readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment) readCommittedConsumer.assignment.asScala.foreach { tp => - assertEquals(1L, readCommittedConsumer.position(tp)) + assertEquals(1L, readCommittedConsumer.position(tp, Long.MaxValue)) } // undecided timestamps should not be searchable either diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ad641c01fdb..60d10e6468b 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1436,7 +1436,7 @@ object TestUtils extends Logging { def consumerPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) : Map[TopicPartition, OffsetAndMetadata] = { val offsetsToCommit = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() consumer.assignment.foreach{ topicPartition => - offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition))) + offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition, Long.MaxValue))) } offsetsToCommit.toMap } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index d9205a0c425..479930a5f04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -169,7 +169,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, consumer.seekToBeginning(Collections.singletonList(topicPartition)); } - long offset = consumer.position(topicPartition); + long offset = consumer.position(topicPartition, Long.MAX_VALUE); final Long highWatermark = highWatermarks.get(topicPartition); BatchingStateRestoreCallback stateRestoreAdapter = diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 842721db533..84a650027a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -137,7 +137,7 @@ public void restore() { logRestoreOffsets(restorer.partition(), restorer.checkpoint(), 
endOffsets.get(restorer.partition())); - restorer.setStartingOffset(consumer.position(restorer.partition())); + restorer.setStartingOffset(consumer.position(restorer.partition(), Long.MAX_VALUE)); } else { consumer.seekToBeginning(Collections.singletonList(restorer.partition())); needsPositionUpdate.add(restorer); @@ -145,7 +145,7 @@ public void restore() { } for (final StateRestorer restorer : needsPositionUpdate) { - final long position = consumer.position(restorer.partition()); + final long position = consumer.position(restorer.partition(), Long.MAX_VALUE); restorer.setStartingOffset(position); logRestoreOffsets(restorer.partition(), position, @@ -234,7 +234,7 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records, } if (nextPosition == -1) { - nextPosition = consumer.position(restorer.partition()); + nextPosition = consumer.position(restorer.partition(), Long.MAX_VALUE); } if (!restoreRecords.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java index 8ab6052014a..d173c4995f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java @@ -61,8 +61,8 @@ public void shouldAssignPartitionsToConsumer() throws Exception { @Test public void shouldSeekToInitialOffsets() throws Exception { stateConsumer.initialize(); - assertEquals(20L, consumer.position(topicOne)); - assertEquals(30L, consumer.position(topicTwo)); + assertEquals(20L, consumer.position(topicOne, Long.MAX_VALUE)); + assertEquals(30L, consumer.position(topicTwo, Long.MAX_VALUE)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index af047a79ccf..0ddf302f1d8 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -228,7 +228,7 @@ private static void ensureStreamsApplicationDown(final String kafka) { } for (final TopicPartition tp : partitions) { - final long offset = consumer.position(tp); + final long offset = consumer.position(tp, Long.MAX_VALUE); committedOffsets.put(tp, offset); } } @@ -257,7 +257,7 @@ private static void ensureStreamsApplicationDown(final String kafka) { throw new RuntimeException("FAIL: did receive more records than expected for " + tp + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset()); } - if (consumer.position(tp) >= readEndOffset) { + if (consumer.position(tp, Long.MAX_VALUE) >= readEndOffset) { consumer.pause(Collections.singletonList(tp)); } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java index ce889170bbf..67ce7b75c35 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java @@ -105,7 +105,7 @@ public synchronized void assign(Collection<TopicPartition> partitions) { } @Override - public synchronized long position(TopicPartition partition) { + public synchronized long position(TopicPartition partition, long timeout) { if (!partition.equals(assignedPartition)) throw new IllegalStateException("RestoreConsumer: unassigned partition"); diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 4c21e5448a6..d0e33160312 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -420,7 +420,7 @@ public synchronized void seekToEnd(final 
Collection<TopicPartition> partitions) public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {} @Override - public synchronized long position(final TopicPartition partition) { + public synchronized long position(final TopicPartition partition, final long timeout) { return 0L; } }; @@ -447,7 +447,7 @@ public synchronized void seekToEnd(final Collection<TopicPartition> partitions) public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {} @Override - public synchronized long position(final TopicPartition partition) { + public synchronized long position(final TopicPartition partition, final long timeout) { return 0L; } }; diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index 3903a3a8fae..de6b967b8a4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -183,7 +183,7 @@ private static ArgumentParser argParser() { private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) { Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>(); for (TopicPartition topicPartition : consumer.assignment()) { - positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); + positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition, Long.MAX_VALUE), null)); } return positions; } @@ -199,7 +199,7 @@ private static void resetToLastCommittedPositions(KafkaConsumer<String, String> } private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) { - long currentPosition = consumer.position(partition); + long currentPosition = consumer.position(partition, Long.MAX_VALUE); Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(singleton(partition)); if (endOffsets.containsKey(partition)) { return endOffsets.get(partition) - currentPosition; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer.position may hang forever when deleting a topic > ------------------------------------------------------------- > > Key: KAFKA-4879 > URL: https://issues.apache.org/jira/browse/KAFKA-4879 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.10.2.0 > Reporter: Shixiong Zhu > Assignee: Jason Gustafson > Fix For: 1.1.0 > > > KafkaConsumer.position may hang forever when deleting a topic. The problem is > this line > https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 > The timeout is "Long.MAX_VALUE", and it will just retry forever for > UnknownTopicOrPartitionException. > Here is a reproducer > {code} > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.TopicPartition; > import org.apache.kafka.common.serialization.StringDeserializer; > import java.util.Collections; > import java.util.Properties; > import java.util.Set; > public class KafkaReproducer { > public static void main(String[] args) { > // Make sure "delete.topic.enable" is set to true. > // Please create the topic test with "3" partitions manually. > // The issue is gone when there is only one partition.
> String topic = "test"; > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "testgroup"); > props.put("value.deserializer", StringDeserializer.class.getName()); > props.put("key.deserializer", StringDeserializer.class.getName()); > props.put("enable.auto.commit", "false"); > KafkaConsumer kc = new KafkaConsumer(props); > kc.subscribe(Collections.singletonList(topic)); > kc.poll(0); > Set<TopicPartition> partitions = kc.assignment(); > System.out.println("partitions: " + partitions); > kc.pause(partitions); > kc.seekToEnd(partitions); > System.out.println("please delete the topic in 30 seconds"); > try { > // Sleep 30 seconds to give us enough time to delete the topic. > Thread.sleep(30000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > System.out.println("sleep end"); > for (TopicPartition p : partitions) { > System.out.println(p + " offset: " + kc.position(p)); > } > System.out.println("cannot reach here"); > kc.close(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)