[ https://issues.apache.org/jira/browse/KAFKA-7258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ansel Zandegran updated KAFKA-7258: ----------------------------------- Description: We are building a metrics reporter and one of the tests find different offset (6) in Linux. The following tests pass in Windows {code:java} createTopic(TOPIC1, 1, 1); runConsumer(TOPIC1); runProducer(TOPIC1, "key1", "value1", 5); // 5 messages zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() / 1000); PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0 assertEquals(5, partionOffsets.getCurrentOffset()); assertEquals(5, partionOffsets.getEndOffset()); {code} and this is how we get offsets {code:java} Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host); KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host); BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> { throw new IllegalStateException(); }; Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet() .stream() .collect(Collectors.toMap(entry -> (entry.getKey()), entry -> { OffsetAndMetadata committed = consumer.committed(entry.getKey()); return new PartionOffsets(entry.getValue(), committed.offset(), entry.getKey() .partition(), topic); }, mergeFunction)); {code} was: We are building a metrics reporter and one of the tests find different offset numbers (6) in Linux. The following tests pass in Windows {code:java} createTopic(TOPIC1, 1, 1); runConsumer(TOPIC1); runProducer(TOPIC1, "key1", "value1", 5); // 5 messages zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() / 1000); PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0 assertEquals(5, partionOffsets.getCurrentOffset()); assertEquals(5, partionOffsets.getEndOffset()); {code} and this is how we get offsets {code:java} Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host); KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host); BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> { throw new IllegalStateException(); }; Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet() .stream() .collect(Collectors.toMap(entry -> (entry.getKey()), entry -> { OffsetAndMetadata committed = consumer.committed(entry.getKey()); return new PartionOffsets(entry.getValue(), committed.offset(), entry.getKey() .partition(), topic); }, mergeFunction)); {code} > Different offset numbers in Windows and Linux > --------------------------------------------- > > Key: KAFKA-7258 > URL: https://issues.apache.org/jira/browse/KAFKA-7258 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 1.0.1 > Environment: windows10 > Reporter: Ansel Zandegran > Priority: Major > > We are building a metrics reporter and one of the tests find different offset > (6) in Linux. The following tests pass in Windows > {code:java} > createTopic(TOPIC1, 1, 1); > runConsumer(TOPIC1); > runProducer(TOPIC1, "key1", "value1", 5); // 5 messages > zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() / > 1000); > PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0 > assertEquals(5, partionOffsets.getCurrentOffset()); > assertEquals(5, partionOffsets.getEndOffset()); > {code} > and this is how we get offsets > {code:java} > Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host); > KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host); > BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> { > throw new IllegalStateException(); > }; > Map<TopicPartition, PartionOffsets> result = > logEndOffset.entrySet() > .stream() > .collect(Collectors.toMap(entry -> (entry.getKey()), > entry -> { > OffsetAndMetadata committed = > consumer.committed(entry.getKey()); > return new PartionOffsets(entry.getValue(), > committed.offset(), > entry.getKey() > .partition(), > topic); > }, > mergeFunction)); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)