tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365318969
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ##########
 @@ -875,109 +878,194 @@ void 
reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit
        }
 
        @SuppressWarnings("unchecked")
-       private static KafkaConsumer<byte[], byte[]> createMockConsumer(
+       private static TestConsumer createMockConsumer(
                        final Map<TopicPartition, Long> 
mockConsumerAssignmentAndPosition,
                        final Map<TopicPartition, Long> mockRetrievedPositions,
                        final boolean earlyWakeup,
                        final OneShotLatch midAssignmentLatch,
                        final OneShotLatch continueAssignmentLatch) {
 
-               final KafkaConsumer<byte[], byte[]> mockConsumer = 
mock(KafkaConsumer.class);
+               return new TestConsumer(mockConsumerAssignmentAndPosition, 
mockRetrievedPositions, earlyWakeup, midAssignmentLatch, 
continueAssignmentLatch);
+       }
 
-               when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() 
{
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               if (midAssignmentLatch != null) {
-                                       midAssignmentLatch.trigger();
-                               }
+       private static class TestConsumer implements Consumer<byte[], byte[]> {
+               private final Map<TopicPartition, Long> 
mockConsumerAssignmentAndPosition;
+               private final Map<TopicPartition, Long> mockRetrievedPositions;
+               private final boolean earlyWakeup;
+               private final OneShotLatch midAssignmentLatch;
+               private final OneShotLatch continueAssignmentLatch;
+
+               private int numWakeupCalls = 0;
+
+               private TestConsumer(Map<TopicPartition, Long> 
mockConsumerAssignmentAndPosition, Map<TopicPartition, Long> 
mockRetrievedPositions, boolean earlyWakeup, OneShotLatch midAssignmentLatch, 
OneShotLatch continueAssignmentLatch) {
+                       this.mockConsumerAssignmentAndPosition = 
mockConsumerAssignmentAndPosition;
+                       this.mockRetrievedPositions = mockRetrievedPositions;
+                       this.earlyWakeup = earlyWakeup;
+                       this.midAssignmentLatch = midAssignmentLatch;
+                       this.continueAssignmentLatch = continueAssignmentLatch;
+               }
 
-                               if (continueAssignmentLatch != null) {
+               @Override
+               public Set<TopicPartition> assignment() {
+                       if (midAssignmentLatch != null) {
+                               midAssignmentLatch.trigger();
+                       }
+
+                       if (continueAssignmentLatch != null) {
+                               try {
                                        continueAssignmentLatch.await();
+                               } catch (InterruptedException e) {
+                                       Thread.currentThread().interrupt();
                                }
-                               return 
mockConsumerAssignmentAndPosition.keySet();
                        }
-               });
+                       return mockConsumerAssignmentAndPosition.keySet();
+               }
 
-               
when(mockConsumer.poll(anyLong())).thenReturn(mock(ConsumerRecords.class));
+               @Override
+               public Set<String> subscription() {
+                       return null;
+               }
 
-               if (!earlyWakeup) {
-                       
when(mockConsumer.position(any(TopicPartition.class))).thenAnswer(new 
Answer<Object>() {
-                               @Override
-                               public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {
-                                       return 
mockConsumerAssignmentAndPosition.get(invocationOnMock.getArgument(0));
-                               }
-                       });
-               } else {
-                       
when(mockConsumer.position(any(TopicPartition.class))).thenThrow(new 
WakeupException());
+               @Override
+               public void subscribe(List<String> list) {
                }
 
-               doAnswer(new Answer() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               mockConsumerAssignmentAndPosition.clear();
+               @Override
+               public void subscribe(List<String> list, 
ConsumerRebalanceListener consumerRebalanceListener) {
+               }
 
-                               List<TopicPartition> assignedPartitions = 
invocationOnMock.getArgument(0);
-                               for (TopicPartition assigned : 
assignedPartitions) {
-                                       
mockConsumerAssignmentAndPosition.put(assigned, null);
-                               }
-                               return null;
+               @Override
+               public void assign(List<TopicPartition> assignedPartitions) {
+                       mockConsumerAssignmentAndPosition.clear();
+
+                       for (TopicPartition assigned : assignedPartitions) {
+                               mockConsumerAssignmentAndPosition.put(assigned, 
null);
                        }
-               }).when(mockConsumer).assign(anyListOf(TopicPartition.class));
+               }
 
-               doAnswer(new Answer() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               TopicPartition partition = 
invocationOnMock.getArgument(0);
-                               long position = invocationOnMock.getArgument(1);
+               @Override
+               public void subscribe(Pattern pattern, 
ConsumerRebalanceListener consumerRebalanceListener) {
+               }
 
-                               if 
(!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-                                       throw new Exception("the current mock 
assignment does not contain partition " + partition);
-                               } else {
-                                       
mockConsumerAssignmentAndPosition.put(partition, position);
-                               }
-                               return null;
-                       }
-               }).when(mockConsumer).seek(any(TopicPartition.class), 
anyLong());
+               @Override
+               public void unsubscribe() {
+               }
+
+               @Override
+               public ConsumerRecords<byte[], byte[]> poll(long l) {
+                       return mock(ConsumerRecords.class);
+               }
+
+               @Override
+               public void commitSync() {
+               }
+
+               @Override
+               public void commitSync(Map<TopicPartition, OffsetAndMetadata> 
map) {
+               }
+
+               @Override
+               public void commitAsync() {
+               }
+
+               @Override
+               public void commitAsync(OffsetCommitCallback 
offsetCommitCallback) {
+               }
 
-               doAnswer(new Answer() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               TopicPartition partition = 
invocationOnMock.getArgument(0);
+               @Override
+               public void commitAsync(Map<TopicPartition, OffsetAndMetadata> 
map, OffsetCommitCallback offsetCommitCallback) {
+               }
 
+               @Override
+               public void seek(TopicPartition partition, long position) {
+                       if 
(!mockConsumerAssignmentAndPosition.containsKey(partition)) {
+                               throw new RuntimeException("the current mock 
assignment does not contain partition " + partition);
+                       } else {
+                               
mockConsumerAssignmentAndPosition.put(partition, position);
+                       }
+               }
+
+               @Override
+               public void seekToBeginning(TopicPartition... partitions) {
+                       for (TopicPartition partition : partitions) {
                                if 
(!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-                                       throw new Exception("the current mock 
assignment does not contain partition " + partition);
+                                       throw new RuntimeException("the current 
mock assignment does not contain partition " + partition);
                                } else {
                                        Long mockRetrievedPosition = 
mockRetrievedPositions.get(partition);
                                        if (mockRetrievedPosition == null) {
-                                               throw new Exception("mock 
consumer needed to retrieve a position, but no value was provided in the mock 
values for retrieval");
+                                               throw new 
RuntimeException("mock consumer needed to retrieve a position, but no value was 
provided in the mock values for retrieval");
                                        } else {
                                                
mockConsumerAssignmentAndPosition.put(partition, 
mockRetrievedPositions.get(partition));
                                        }
                                }
-                               return null;
                        }
-               
}).when(mockConsumer).seekToBeginning(any(TopicPartition.class));
-
-               doAnswer(new Answer() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               TopicPartition partition = 
invocationOnMock.getArgument(0);
+               }
 
+               @Override
+               public void seekToEnd(TopicPartition... partitions) {
+                       for (TopicPartition partition : partitions) {
                                if 
(!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-                                       throw new Exception("the current mock 
assignment does not contain partition " + partition);
+                                       throw new RuntimeException("the current 
mock assignment does not contain partition " + partition);
                                } else {
                                        Long mockRetrievedPosition = 
mockRetrievedPositions.get(partition);
                                        if (mockRetrievedPosition == null) {
-                                               throw new Exception("mock 
consumer needed to retrieve a position, but no value was provided in the mock 
values for retrieval");
+                                               throw new 
RuntimeException("mock consumer needed to retrieve a position, but no value was 
provided in the mock values for retrieval");
                                        } else {
                                                
mockConsumerAssignmentAndPosition.put(partition, 
mockRetrievedPositions.get(partition));
                                        }
                                }
-                               return null;
                        }
-               }).when(mockConsumer).seekToEnd(any(TopicPartition.class));
+               }
+
+               @Override
+               public long position(TopicPartition topicPartition) {
+                       if (!earlyWakeup) {
+                               return 
mockConsumerAssignmentAndPosition.get(topicPartition);
+                       } else {
+                               throw new WakeupException();
+                       }
+               }
 
-               return mockConsumer;
+               @Override
+               public OffsetAndMetadata committed(TopicPartition 
topicPartition) {
+                       return null;
+               }
+
+               @Override
+               public Map<MetricName, ? extends Metric> metrics() {
+                       return null;
+               }
+
+               @Override
+               public List<PartitionInfo> partitionsFor(String s) {
+                       return null;
+               }
+
+               @Override
+               public Map<String, List<PartitionInfo>> listTopics() {
+                       return null;
+               }
 
 Review comment:
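   Same here with the `UnsupportedOperationException`: the unimplemented stub methods above currently return `null`; presumably they should throw `UnsupportedOperationException` instead.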

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to