[ 
https://issues.apache.org/jira/browse/KAFKA-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407913#comment-17407913
 ] 

chenchao commented on KAFKA-10208:
----------------------------------

 You should format your code.

> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler
>  return null when Broker unexpectedly doesn't support requireStable flag on 
> version while not any information
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10208
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10208
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.7.0
>            Reporter: lqjacklee
>            Priority: Major
>
> When a 2.7.0 client sends a request to a broker whose version is 2.3.0, the 
> OffsetAndMetadata will be null and the key information will be missing. 
> I have created the test case below: 
> @Test
>     public void testCreateTopicAndCheckTheOffsite() throws 
> ExecutionException, InterruptedException {
>         String topicName = UUID.randomUUID().toString();
>         String groupId = "DEMO_" + topicName;
>         final Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>         String serializer = StringSerializer.class.getName();
>         String deserializer = StringDeserializer.class.getName();
>         props.put("auto.offset.reset", "latest");
>         props.put("session.timeout.ms", "30000");
>         props.put("key.deserializer", deserializer);
>         props.put("value.deserializer", deserializer);
>         props.put("key.serializer", serializer);
>         props.put("value.serializer", serializer);
>         props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
>         AdminClient adminClient = AdminClient.create(props);
>         boolean topicExist = false;
>         try {
>             NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
>             CreateTopicsOptions createTopicsOptions = new 
> CreateTopicsOptions();
>             createTopicsOptions.timeoutMs(3000000);
>             final CreateTopicsResult createTopicsResult = 
> adminClient.createTopics(Collections.singleton(newTopic), 
> createTopicsOptions);
>             createTopicsResult.values().get(topicName).get();
>         }catch (TopicExistsException e) {
>             topicExist = true;
>         }
>         try {
>             List<TopicPartition> topicPartitions = new ArrayList<>();
>             KafkaConsumer<String,String> kafkaConsumer = new 
> KafkaConsumer<>(props);
>             Field kafkaClientField = 
> kafkaConsumer.getClass().getDeclaredField("client");
>             kafkaClientField.setAccessible(true);
>             ConsumerNetworkClient client = (ConsumerNetworkClient) 
> kafkaClientField.get(kafkaConsumer);
>             FindCoordinatorRequest.Builder findCoordinatorRequest =
>                     new FindCoordinatorRequest.Builder(
>                             new FindCoordinatorRequestData()
>                                     
> .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
>                                     .setKey(groupId));
>             Node node = client.leastLoadedNode();
>             Node coordinator;
>             RequestFuture<Node> requestCoordinatorFuture = client.send(node, 
> findCoordinatorRequest)
>                     .compose(new RequestFutureAdapter<ClientResponse, Node>() 
> {
>                         @Override
>                         public void onFailure(RuntimeException e, 
> RequestFuture<Node> future) {
>                             super.onFailure(e, future);
>                         }
>                         @Override
>                         public void onSuccess(ClientResponse value, 
> RequestFuture<Node> future) {
>                             Node coordinator;
>                             FindCoordinatorResponse findCoordinatorResponse = 
> (FindCoordinatorResponse) value.responseBody();
>                             Errors error = findCoordinatorResponse.error();
>                             if (error == Errors.NONE) {
>                                 // use MAX_VALUE - node.id as the coordinator 
> id to allow separate connections
>                                 // for the coordinator in the underlying 
> network client layer
>                                 int coordinatorConnectionId = 
> Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
>                                 coordinator = new Node(
>                                         coordinatorConnectionId,
>                                         findCoordinatorResponse.data().host(),
>                                         
> findCoordinatorResponse.data().port());
>                                 client.tryConnect(coordinator);
>                                 future.complete(coordinator);
>                             } else if (error == 
> Errors.GROUP_AUTHORIZATION_FAILED) {
>                                 Assert.fail(error.message());
>                             } else {
>                                 future.raise(error);
>                             }
>                         }
>                     });
>             client.poll(requestCoordinatorFuture);
>             if (requestCoordinatorFuture.succeeded()) {
>                 coordinator = requestCoordinatorFuture.value();
>             } else {
>                 throw requestCoordinatorFuture.exception();
>             }
>             OffsetFetchRequest.Builder requestBuilder =
>                     new OffsetFetchRequest.Builder(groupId, true, 
> topicPartitions, true);
>             RequestFuture<Map<TopicPartition, OffsetAndMetadata>> 
> topicPartitionMetadataRequestFuture = client.send(coordinator, requestBuilder)
>                     .compose(new RequestFutureAdapter<ClientResponse, 
> Map<TopicPartition, OffsetAndMetadata>>() {
>                         @Override
>                         public void onSuccess(ClientResponse value, 
> RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
>                             OffsetFetchResponse response = 
> (OffsetFetchResponse) value.responseBody();
>                             if (response.hasError()) {
>                                 Errors error = response.error();
>                                 if (error == 
> Errors.COORDINATOR_LOAD_IN_PROGRESS) {
>                                     // just retry
>                                     future.raise(error);
>                                 } else if (error == Errors.NOT_COORDINATOR) {
>                                     // re-discover the coordinator and retry
>                                     future.raise(error);
>                                 } else if (error == 
> Errors.GROUP_AUTHORIZATION_FAILED) {
>                                     
> Assert.fail(Errors.GROUP_AUTHORIZATION_FAILED + "");
>                                 } else {
>                                     future.raise(new 
> KafkaException("Unexpected error in fetch offset response: " + 
> error.message()));
>                                 }
>                                 return;
>                             }
>                             Set<String> unauthorizedTopics = null;
>                             Map<TopicPartition, OffsetAndMetadata> offsets = 
> new HashMap<>(response.responseData().size());
>                             Set<TopicPartition> 
> unstableTxnOffsetTopicPartitions = new HashSet<>();
>                             for (Map.Entry<TopicPartition, 
> OffsetFetchResponse.PartitionData> entry : 
> response.responseData().entrySet()) {
>                                 TopicPartition tp = entry.getKey();
>                                 OffsetFetchResponse.PartitionData 
> partitionData = entry.getValue();
>                                 if (partitionData.hasError()) {
>                                     Errors error = partitionData.error;
>                                     if (error == 
> Errors.UNKNOWN_TOPIC_OR_PARTITION) {
>                                         future.raise(new 
> KafkaException("Topic or Partition " + tp + " does not exist"));
>                                         return;
>                                     } else if (error == 
> Errors.TOPIC_AUTHORIZATION_FAILED) {
>                                         if (unauthorizedTopics == null) {
>                                             unauthorizedTopics = new 
> HashSet<>();
>                                         }
>                                         unauthorizedTopics.add(tp.topic());
>                                     } else if (error == 
> Errors.UNSTABLE_OFFSET_COMMIT) {
>                                         
> unstableTxnOffsetTopicPartitions.add(tp);
>                                     } else {
>                                         future.raise(new 
> KafkaException("Unexpected error in fetch offset response for partition " +
>                                                 tp + ": " + error.message()));
>                                         return;
>                                     }
>                                 } else if (partitionData.offset >= 0) {
>                                     // record the position with the offset 
> (-1 indicates no committed offset to fetch);
>                                     // if there's no committed offset, record 
> as null
>                                     offsets.put(tp, new 
> OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
> partitionData.metadata));
>                                 } else {
>                                     try {
>                                         HashMap<TopicPartition, OffsetSpec> 
> offsetMap = new HashMap<>();
>                                         offsetMap.put(tp, 
> OffsetSpec.earliest());
>                                         ListOffsetsResult listOffsetsResult = 
> adminClient.listOffsets(offsetMap);
>                                         Map<TopicPartition, 
> ListOffsetsResult.ListOffsetsResultInfo> 
> topicPartitionListOffsetsResultInfoMap = listOffsetsResult.all().get();
>                                         
> ListOffsetsResult.ListOffsetsResultInfo offsetsResultInfo = 
> topicPartitionListOffsetsResultInfoMap.get(tp);
>                                         offsets.put(tp, new 
> OffsetAndMetadata(offsetsResultInfo.offset(), 
> offsetsResultInfo.leaderEpoch(), ""));
>                                     } catch (Exception e) {
>                                         Assert.fail(e.getMessage());
>                                     }
>                                 }
>                                 Assert.fail("not found the topic and 
> partition");
>                             }
>                             if (unauthorizedTopics != null) {
>                                 future.raise(new 
> TopicAuthorizationException(unauthorizedTopics));
>                             } else if 
> (!unstableTxnOffsetTopicPartitions.isEmpty()) {
>                                 // just retry
>                                 future.raise(new 
> UnstableOffsetCommitException("There are unstable offsets for the requested 
> topic partitions"));
>                             } else {
>                                 future.complete(offsets);
>                             }
>                         }
>                         @Override
>                         public void onFailure(RuntimeException e, 
> RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
>                             super.onFailure(e, future);
>                         }
>                     });
>             client.poll(topicPartitionMetadataRequestFuture);
>             if(topicPartitionMetadataRequestFuture.succeeded()) {
>                 Map<TopicPartition, OffsetAndMetadata> value = 
> topicPartitionMetadataRequestFuture.value();
>                 Assert.assertNotNull(value);
>             }else {
>                 
> Assert.fail(topicPartitionMetadataRequestFuture.exception().getMessage());
>             }
>         }catch (Exception e) {
>             Assert.fail(e.getMessage());
>         }finally {
>             if(topicExist) {
>                 List<String> topicToDeleted = new ArrayList<>();
>                 DeleteTopicsResult deleteTopicsResult = 
> adminClient.deleteTopics(topicToDeleted);
>                 deleteTopicsResult.all().get();
>             }
>         }
>     }



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to