[ 
https://issues.apache.org/jira/browse/KAFKA-7258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ansel Zandegran updated KAFKA-7258:
-----------------------------------
    Description: 
We are building a metrics reporter and one of the tests find different offset 
(6) in Linux. The following tests pass in Windows
{code:java}
createTopic(TOPIC1, 1, 1); // 1 partition 1 replication
runConsumer(TOPIC1); 
runProducer(TOPIC1, "key", "value", 5); // 5 messages
zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() /
1000);
PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0
assertEquals(5, partionOffsets.getCurrentOffset());
assertEquals(5, partionOffsets.getEndOffset());
{code}
and this is how we get offsets
{code:java}
Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);

KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host);

BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
throw new IllegalStateException();
};

Map<TopicPartition, PartionOffsets> result =
logEndOffset.entrySet()
.stream()
.collect(Collectors.toMap(entry -> (entry.getKey()),
entry -> {
OffsetAndMetadata committed =
consumer.committed(entry.getKey());
return new PartionOffsets(entry.getValue(),
committed.offset(),
entry.getKey()
.partition(),
topic);
},
mergeFunction));
{code}

  was:
We are building a metrics reporter and one of the tests find different offset 
(6) in Linux. The following tests pass in Windows
{code:java}
createTopic(TOPIC1, 1, 1);
runConsumer(TOPIC1);
runProducer(TOPIC1, "key1", "value1", 5); // 5 messages
zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() /
1000);
PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0
assertEquals(5, partionOffsets.getCurrentOffset());
assertEquals(5, partionOffsets.getEndOffset());
{code}
and this is how we get offsets
{code:java}
Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);

KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host);

BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
throw new IllegalStateException();
};

Map<TopicPartition, PartionOffsets> result =
logEndOffset.entrySet()
.stream()
.collect(Collectors.toMap(entry -> (entry.getKey()),
entry -> {
OffsetAndMetadata committed =
consumer.committed(entry.getKey());
return new PartionOffsets(entry.getValue(),
committed.offset(),
entry.getKey()
.partition(),
topic);
},
mergeFunction));
{code}


> Different offset numbers in Windows and Linux
> ---------------------------------------------
>
>                 Key: KAFKA-7258
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7258
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.1
>         Environment: windows10
>            Reporter: Ansel Zandegran
>            Priority: Major
>
> We are building a metrics reporter and one of the tests find different offset 
> (6) in Linux. The following tests pass in Windows
> {code:java}
> createTopic(TOPIC1, 1, 1); // 1 partition 1 replication
> runConsumer(TOPIC1); 
> runProducer(TOPIC1, "key", "value", 5); // 5 messages
> zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() /
> 1000);
> PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0
> assertEquals(5, partionOffsets.getCurrentOffset());
> assertEquals(5, partionOffsets.getEndOffset());
> {code}
> and this is how we get offsets
> {code:java}
> Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);
> KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host);
> BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
> throw new IllegalStateException();
> };
> Map<TopicPartition, PartionOffsets> result =
> logEndOffset.entrySet()
> .stream()
> .collect(Collectors.toMap(entry -> (entry.getKey()),
> entry -> {
> OffsetAndMetadata committed =
> consumer.committed(entry.getKey());
> return new PartionOffsets(entry.getValue(),
> committed.offset(),
> entry.getKey()
> .partition(),
> topic);
> },
> mergeFunction));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to