[
https://issues.apache.org/jira/browse/KAFKA-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14715430#comment-14715430
]
Will Funnell edited comment on KAFKA-1977 at 8/26/15 8:12 PM:
--------------------------------------------------------------
I would still definitely like to see this in the new consumer. I think it's a
small thing to include, but very useful, especially for determining when you
have reached the end of a log-compacted topic.
Our implementation is as follows:
{code}
Iterator<ConsumerMessage> iterator = new Iterator<ConsumerMessage>() {
    private boolean finished;
    private Integer partition;
    private ConsumerIterator<byte[], byte[]> it = stream.iterator();

    @Override
    public boolean hasNext() {
        if (finished) {
            return false;
        }
        try {
            return it.hasNext();
        } catch (Exception e) {
            if (hasBeenForciblyShutdownByClient(e)) {
                consumer.shutdown();
                return false;
            }
            LOG.error("partition={} description=\"Error while fetching from Kafka\"", partition, e);
            throw e;
        }
    }

    @Override
    public ConsumerMessage next() {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        if (partition == null) {
            partition = messageAndMetadata.partition();
        }
        // The last message in the partition sits at offset logEndOffset - 1.
        if (messageAndMetadata.offset() == messageAndMetadata.logEndOffset() - 1) {
            finished = true;
            LOG.info("partition=\"{}\" description=\"Finished with partition\"", messageAndMetadata.partition());
        }
        return toConsumedMessage(messageAndMetadata);
    }

    @Override
    public void remove() {
        it.remove();
    }

    private boolean hasBeenForciblyShutdownByClient(Exception e) {
        return e instanceof InterruptedException;
    }
};
{code}
Not quite sure how this translates to the new Consumer yet.
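One way this might translate is to extract the end-of-partition test from the iterator above into a small helper that only needs the current offset and the partition's log end offset, whichever API ends up exposing it. This is a minimal sketch; the class and method names are illustrative, not Kafka API:

```java
// Hypothetical helper: decides whether a consumer has drained a partition,
// given the offset of the message just read and the partition's log end
// offset. Names ("PartitionEndCheck", "reachedEnd") are made up for the
// sketch and are not part of any Kafka API.
public class PartitionEndCheck {

    // The last message in a partition has offset logEndOffset - 1, so the
    // consumer is finished once it has seen that offset. An empty partition
    // (logEndOffset == 0) also counts as finished, since there is nothing
    // left to read.
    public static boolean reachedEnd(long currentOffset, long logEndOffset) {
        return currentOffset >= logEndOffset - 1;
    }

    public static void main(String[] args) {
        System.out.println(reachedEnd(4, 5)); // last message consumed -> true
        System.out.println(reachedEnd(2, 5)); // still behind -> false
    }
}
```

Keeping the comparison in one place would let the same logic back both the old iterator and whatever the new consumer's loop looks like.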
was (Author: willf):
I would still definitely like to see this in the new consumer. I think it's a
small thing to include, but very useful, especially for determining when you
have reached the end of a log-compacted topic.
Our implementation is as follows:
{code}
Iterator<ConsumerMessage> iterator = new Iterator<ConsumerMessage>() {
    private boolean finished;
    private Integer partition;
    private ConsumerIterator<byte[], byte[]> it = stream.iterator();
    private long count;

    @Override
    public boolean hasNext() {
        if (finished) {
            return false;
        }
        try {
            return it.hasNext();
        } catch (Exception e) {
            if (hasBeenForciblyShutdownByClient(e)) {
                consumer.shutdown();
                return false;
            }
            LOG.error("partition={} description=\"Error while fetching from Kafka\"", partition, e);
            throw e;
        }
    }

    @Override
    public ConsumerMessage next() {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        count++;
        if (partition == null) {
            partition = messageAndMetadata.partition();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("count={} partition={} description=\"Messages read from Kafka\"", count, messageAndMetadata.partition());
        }
        if (messageAndMetadata.offset() == messageAndMetadata.logEndOffset() - 1) {
            finished = true;
            LOG.info("partition=\"{}\" description=\"Finished with partition\"", messageAndMetadata.partition());
        }
        return toConsumedMessage(messageAndMetadata);
    }

    @Override
    public void remove() {
        it.remove();
    }

    private boolean hasBeenForciblyShutdownByClient(Exception e) {
        return e instanceof InterruptedException;
    }
};
{code}
Not quite sure how this translates to the new Consumer yet.
> Make logEndOffset available in the Zookeeper consumer
> -----------------------------------------------------
>
> Key: KAFKA-1977
> URL: https://issues.apache.org/jira/browse/KAFKA-1977
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Will Funnell
> Priority: Minor
> Attachments:
> Make_logEndOffset_available_in_the_Zookeeper_consumer.patch
>
>
> The requirement is to create a snapshot from the Kafka topic but NOT do
> continual reads after that point. For example you might be creating a backup
> of the data to a file.
> In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps
> was to expose the high watermark, as maxEndOffset, from the FetchResponse
> object through to each MessageAndMetadata object in order to be aware when
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the
> PartitionTopicInfo, which is updated when a new message arrives in the
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)