[ https://issues.apache.org/jira/browse/KAFKA-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407913#comment-17407913 ]
chenchao commented on KAFKA-10208: ---------------------------------- should format your code > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler > return null when Broker unexpectedly doesn't support requireStable flag on > version while not any information > ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-10208 > URL: https://issues.apache.org/jira/browse/KAFKA-10208 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 2.7.0 > Reporter: lqjacklee > Priority: Major > > When the 2.7.0 client try to request the broker whose version is 2.3.0, the > OffsetAndMetadata will be null and miss the Key information. > I have create the test case as below : > @Test > public void testCreateTopicAndCheckTheOffsite() throws > ExecutionException, InterruptedException { > String topicName = UUID.randomUUID().toString(); > String groupId = "DEMO_" + topicName; > final Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); > String serializer = StringSerializer.class.getName(); > String deserializer = StringDeserializer.class.getName(); > props.put("auto.offset.reset", "latest"); > props.put("session.timeout.ms", "30000"); > props.put("key.deserializer", deserializer); > props.put("value.deserializer", deserializer); > props.put("key.serializer", serializer); > props.put("value.serializer", serializer); > props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); > AdminClient adminClient = AdminClient.create(props); > boolean topicExist = false; > try { > NewTopic newTopic = new NewTopic(topicName, 1, (short) 1); > CreateTopicsOptions createTopicsOptions = new > CreateTopicsOptions(); > createTopicsOptions.timeoutMs(3000000); > final CreateTopicsResult createTopicsResult = > adminClient.createTopics(Collections.singleton(newTopic), > createTopicsOptions); > createTopicsResult.values().get(topicName).get(); > }catch (TopicExistsException e) { > topicExist = true; > } > try { > List<TopicPartition> topicPartitions = new ArrayList<>(); > KafkaConsumer<String,String> kafkaConsumer = new > KafkaConsumer<>(props); > Field kafkaClientField = > kafkaConsumer.getClass().getDeclaredField("client"); > kafkaClientField.setAccessible(true); > ConsumerNetworkClient client = (ConsumerNetworkClient) > kafkaClientField.get(kafkaConsumer); > FindCoordinatorRequest.Builder findCoordinatorRequest = > new FindCoordinatorRequest.Builder( > new FindCoordinatorRequestData() > > .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id()) > .setKey(groupId)); > Node node = client.leastLoadedNode(); > Node coordinator; > RequestFuture<Node> requestCoordinatorFuture = client.send(node, > findCoordinatorRequest) > .compose(new RequestFutureAdapter<ClientResponse, Node>() > { > @Override > public void onFailure(RuntimeException e, > RequestFuture<Node> future) { > super.onFailure(e, future); > } > @Override > public void onSuccess(ClientResponse value, > RequestFuture<Node> future) { > Node coordinator; > FindCoordinatorResponse findCoordinatorResponse = > (FindCoordinatorResponse) value.responseBody(); > Errors error = findCoordinatorResponse.error(); > if (error == Errors.NONE) { > // use MAX_VALUE - node.id as the coordinator > id to allow separate connections > // for the coordinator in the underlying > network client layer > int coordinatorConnectionId = > Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId(); > coordinator = new Node( > coordinatorConnectionId, > findCoordinatorResponse.data().host(), > > findCoordinatorResponse.data().port()); > client.tryConnect(coordinator); > future.complete(coordinator); > } else if (error == > Errors.GROUP_AUTHORIZATION_FAILED) { > Assert.fail(error.message()); > } else { > future.raise(error); > } > } > }); > client.poll(requestCoordinatorFuture); > if (requestCoordinatorFuture.succeeded()) { > coordinator = requestCoordinatorFuture.value(); > } else { > throw requestCoordinatorFuture.exception(); > } > OffsetFetchRequest.Builder requestBuilder = > new OffsetFetchRequest.Builder(groupId, true, > topicPartitions, true); > RequestFuture<Map<TopicPartition, OffsetAndMetadata>> > topicPartitionMetadataRequestFuture = client.send(coordinator, requestBuilder) > .compose(new RequestFutureAdapter<ClientResponse, > Map<TopicPartition, OffsetAndMetadata>>() { > @Override > public void onSuccess(ClientResponse value, > RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { > OffsetFetchResponse response = > (OffsetFetchResponse) value.responseBody(); > if (response.hasError()) { > Errors error = response.error(); > if (error == > Errors.COORDINATOR_LOAD_IN_PROGRESS) { > // just retry > future.raise(error); > } else if (error == Errors.NOT_COORDINATOR) { > // re-discover the coordinator and retry > future.raise(error); > } else if (error == > Errors.GROUP_AUTHORIZATION_FAILED) { > > Assert.fail(Errors.GROUP_AUTHORIZATION_FAILED + ""); > } else { > future.raise(new > KafkaException("Unexpected error in fetch offset response: " + > error.message())); > } > return; > } > Set<String> unauthorizedTopics = null; > Map<TopicPartition, OffsetAndMetadata> offsets = > new HashMap<>(response.responseData().size()); > Set<TopicPartition> > unstableTxnOffsetTopicPartitions = new HashSet<>(); > for (Map.Entry<TopicPartition, > OffsetFetchResponse.PartitionData> entry : > response.responseData().entrySet()) { > TopicPartition tp = entry.getKey(); > OffsetFetchResponse.PartitionData > partitionData = entry.getValue(); > if (partitionData.hasError()) { > Errors error = partitionData.error; > if (error == > Errors.UNKNOWN_TOPIC_OR_PARTITION) { > future.raise(new > KafkaException("Topic or Partition " + tp + " does not exist")); > return; > } else if (error == > Errors.TOPIC_AUTHORIZATION_FAILED) { > if (unauthorizedTopics == null) { > unauthorizedTopics = new > HashSet<>(); > } > unauthorizedTopics.add(tp.topic()); > } else if (error == > Errors.UNSTABLE_OFFSET_COMMIT) { > > unstableTxnOffsetTopicPartitions.add(tp); > } else { > future.raise(new > KafkaException("Unexpected error in fetch offset response for partition " + > tp + ": " + error.message())); > return; > } > } else if (partitionData.offset >= 0) { > // record the position with the offset > (-1 indicates no committed offset to fetch); > // if there's no committed offset, record > as null > offsets.put(tp, new > OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, > partitionData.metadata)); > } else { > try { > HashMap<TopicPartition, OffsetSpec> > offsetMap = new HashMap<>(); > offsetMap.put(tp, > OffsetSpec.earliest()); > ListOffsetsResult listOffsetsResult = > adminClient.listOffsets(offsetMap); > Map<TopicPartition, > ListOffsetsResult.ListOffsetsResultInfo> > topicPartitionListOffsetsResultInfoMap = listOffsetsResult.all().get(); > > ListOffsetsResult.ListOffsetsResultInfo offsetsResultInfo = > topicPartitionListOffsetsResultInfoMap.get(tp); > offsets.put(tp, new > OffsetAndMetadata(offsetsResultInfo.offset(), > offsetsResultInfo.leaderEpoch(), "")); > } catch (Exception e) { > Assert.fail(e.getMessage()); > } > } > Assert.fail("not found the topic and > partition"); > } > if (unauthorizedTopics != null) { > future.raise(new > TopicAuthorizationException(unauthorizedTopics)); > } else if > (!unstableTxnOffsetTopicPartitions.isEmpty()) { > // just retry > future.raise(new > UnstableOffsetCommitException("There are unstable offsets for the requested > topic partitions")); > } else { > future.complete(offsets); > } > } > @Override > public void onFailure(RuntimeException e, > RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { > super.onFailure(e, future); > } > }); > client.poll(topicPartitionMetadataRequestFuture); > if(topicPartitionMetadataRequestFuture.succeeded()) { > Map<TopicPartition, OffsetAndMetadata> value = > topicPartitionMetadataRequestFuture.value(); > Assert.assertNotNull(value); > }else { > > Assert.fail(topicPartitionMetadataRequestFuture.exception().getMessage()); > } > }catch (Exception e) { > Assert.fail(e.getMessage()); > }finally { > if(topicExist) { > List<String> topicToDeleted = new ArrayList<>(); > DeleteTopicsResult deleteTopicsResult = > adminClient.deleteTopics(topicToDeleted); > deleteTopicsResult.all().get(); > } > } > } -- This message was sent by Atlassian Jira (v8.3.4#803005)