tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9 URL: https://github.com/apache/flink/pull/10762#discussion_r365318969
########## File path: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ########## @@ -875,109 +878,194 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit } @SuppressWarnings("unchecked") - private static KafkaConsumer<byte[], byte[]> createMockConsumer( + private static TestConsumer createMockConsumer( final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition, final Map<TopicPartition, Long> mockRetrievedPositions, final boolean earlyWakeup, final OneShotLatch midAssignmentLatch, final OneShotLatch continueAssignmentLatch) { - final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class); + return new TestConsumer(mockConsumerAssignmentAndPosition, mockRetrievedPositions, earlyWakeup, midAssignmentLatch, continueAssignmentLatch); + } - when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - if (midAssignmentLatch != null) { - midAssignmentLatch.trigger(); - } + private static class TestConsumer implements Consumer<byte[], byte[]> { + private final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition; + private final Map<TopicPartition, Long> mockRetrievedPositions; + private final boolean earlyWakeup; + private final OneShotLatch midAssignmentLatch; + private final OneShotLatch continueAssignmentLatch; + + private int numWakeupCalls = 0; + + private TestConsumer(Map<TopicPartition, Long> mockConsumerAssignmentAndPosition, Map<TopicPartition, Long> mockRetrievedPositions, boolean earlyWakeup, OneShotLatch midAssignmentLatch, OneShotLatch continueAssignmentLatch) { + this.mockConsumerAssignmentAndPosition = mockConsumerAssignmentAndPosition; + this.mockRetrievedPositions = mockRetrievedPositions; + this.earlyWakeup = earlyWakeup; + this.midAssignmentLatch = midAssignmentLatch; + this.continueAssignmentLatch = continueAssignmentLatch; + } - if (continueAssignmentLatch != null) { + @Override + public Set<TopicPartition> assignment() { + if (midAssignmentLatch != null) { + midAssignmentLatch.trigger(); + } + + if (continueAssignmentLatch != null) { + try { continueAssignmentLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - return mockConsumerAssignmentAndPosition.keySet(); } - }); + return mockConsumerAssignmentAndPosition.keySet(); + } - when(mockConsumer.poll(anyLong())).thenReturn(mock(ConsumerRecords.class)); + @Override + public Set<String> subscription() { + return null; + } - if (!earlyWakeup) { - when(mockConsumer.position(any(TopicPartition.class))).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - return mockConsumerAssignmentAndPosition.get(invocationOnMock.getArgument(0)); - } - }); - } else { - when(mockConsumer.position(any(TopicPartition.class))).thenThrow(new WakeupException()); + @Override + public void subscribe(List<String> list) { } - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - mockConsumerAssignmentAndPosition.clear(); + @Override + public void subscribe(List<String> list, ConsumerRebalanceListener consumerRebalanceListener) { + } - List<TopicPartition> assignedPartitions = invocationOnMock.getArgument(0); - for (TopicPartition assigned : assignedPartitions) { - mockConsumerAssignmentAndPosition.put(assigned, null); - } - return null; + @Override + public void assign(List<TopicPartition> assignedPartitions) { + mockConsumerAssignmentAndPosition.clear(); + + for (TopicPartition assigned : assignedPartitions) { + mockConsumerAssignmentAndPosition.put(assigned, null); } - }).when(mockConsumer).assign(anyListOf(TopicPartition.class)); + } - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - TopicPartition partition = invocationOnMock.getArgument(0); - long position = invocationOnMock.getArgument(1); + @Override + public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) { + } - if (!mockConsumerAssignmentAndPosition.containsKey(partition)) { - throw new Exception("the current mock assignment does not contain partition " + partition); - } else { - mockConsumerAssignmentAndPosition.put(partition, position); - } - return null; - } - }).when(mockConsumer).seek(any(TopicPartition.class), anyLong()); + @Override + public void unsubscribe() { + } + + @Override + public ConsumerRecords<byte[], byte[]> poll(long l) { + return mock(ConsumerRecords.class); + } + + @Override + public void commitSync() { + } + + @Override + public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) { + } + + @Override + public void commitAsync() { + } + + @Override + public void commitAsync(OffsetCommitCallback offsetCommitCallback) { + } - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - TopicPartition partition = invocationOnMock.getArgument(0); + @Override + public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) { + } + @Override + public void seek(TopicPartition partition, long position) { + if (!mockConsumerAssignmentAndPosition.containsKey(partition)) { + throw new RuntimeException("the current mock assignment does not contain partition " + partition); + } else { + mockConsumerAssignmentAndPosition.put(partition, position); + } + } + + @Override + public void seekToBeginning(TopicPartition... partitions) { + for (TopicPartition partition : partitions) { if (!mockConsumerAssignmentAndPosition.containsKey(partition)) { - throw new Exception("the current mock assignment does not contain partition " + partition); + throw new RuntimeException("the current mock assignment does not contain partition " + partition); } else { Long mockRetrievedPosition = mockRetrievedPositions.get(partition); if (mockRetrievedPosition == null) { - throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval"); + throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval"); } else { mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition)); } } - return null; } - }).when(mockConsumer).seekToBeginning(any(TopicPartition.class)); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - TopicPartition partition = invocationOnMock.getArgument(0); + } + @Override + public void seekToEnd(TopicPartition... partitions) { + for (TopicPartition partition : partitions) { if (!mockConsumerAssignmentAndPosition.containsKey(partition)) { - throw new Exception("the current mock assignment does not contain partition " + partition); + throw new RuntimeException("the current mock assignment does not contain partition " + partition); } else { Long mockRetrievedPosition = mockRetrievedPositions.get(partition); if (mockRetrievedPosition == null) { - throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval"); + throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval"); } else { mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition)); } } - return null; } - }).when(mockConsumer).seekToEnd(any(TopicPartition.class)); + } + + @Override + public long position(TopicPartition topicPartition) { + if (!earlyWakeup) { + return mockConsumerAssignmentAndPosition.get(topicPartition); + } else { + throw new WakeupException(); + } + } - return mockConsumer; + @Override + public OffsetAndMetadata committed(TopicPartition topicPartition) { + return null; + } + + @Override + public Map<MetricName, ? extends Metric> metrics() { + return null; + } + + @Override + public List<PartitionInfo> partitionsFor(String s) { + return null; + } + + @Override + public Map<String, List<PartitionInfo>> listTopics() { + return null; + } Review comment: Same here with the `UnsupportedOperationException`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services