Re: Retrieve most-recent-n messages from kafka topic

2013-07-21 Thread Shane Moriah
Thanks Johan,

I converted your code to vanilla Java with a few small modifications
(included below in case anyone wants to use it) and ran it a few times.
 It seems to work fine for the quick-peek use case, but I wouldn't
recommend anyone rely on its accuracy: in our case, anywhere from 1% to
10% of the result lines come back corrupt on each call. In those cases
there are a few stray characters at the beginning of the line, probably
just a function of the header regex being imprecise, as you mentioned
before. (A length-prefix-based alternative is sketched after the code
below.)

// Imports needed (Kafka 0.7 javaapi plus Guava for Lists):
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;

// BROKER_NAME, PARTITION_ID and AVG_LINE_SIZE_IN_BYTES are class constants.
private List<String> getCurrentLinesFromKafka(String topicName, int linesToFetch)
        throws UnsupportedEncodingException {
    int bytesToFetch = linesToFetch * AVG_LINE_SIZE_IN_BYTES;
    SimpleConsumer sConsumer = new SimpleConsumer(BROKER_NAME, 9092, 1000, 1024000);
    long[] currentOffset = sConsumer.getOffsetsBefore(topicName, PARTITION_ID, -1, 3);

    long offset = Math.max(currentOffset[0] - bytesToFetch,
            currentOffset[currentOffset.length - 1]);
    // fetch from the same partition we asked offsets for
    FetchRequest fetchRequest = new FetchRequest(topicName, PARTITION_ID, offset, bytesToFetch);
    ByteBufferMessageSet msgBuffer = sConsumer.fetch(fetchRequest);

    sConsumer.close();

    String decStr = decodeBuffer(msgBuffer.getBuffer(), "UTF-8");

    String header = "\u0000\u0000.?.?.?.?.?.?.?.?";
    String[] strLst = decStr.split(header);

    if (strLst.length > linesToFetch + 2) {
        // take only the last linesToFetch of them, also ignore the
        // first and last since they may be corrupted
        int end = strLst.length - 1;  // end is excluded in copyOfRange
        int start = end - linesToFetch;
        return Lists.newArrayList(Arrays.copyOfRange(strLst, start, end));
    } else if (strLst.length > 2) {
        // we can at least return something since we have more than the
        // corrupt first and last values
        int end = strLst.length - 1;  // end is excluded in copyOfRange
        int start = 1;                // ignore the probably corrupt first value
        return Lists.newArrayList(Arrays.copyOfRange(strLst, start, end));
    } else {
        return Lists.newArrayList();
    }
}

private String decodeBuffer(ByteBuffer buffer, String encoding)
        throws UnsupportedEncodingException {
    int size;
    try {
        size = buffer.getInt();
    } catch (Exception e) {
        size = -1;  // empty buffer: getInt() underflows
    }

    if (size < 0) {
        return "No recent messages in topic";
    }

    // Read from the current position rather than buffer.array(), which would
    // also return the four size bytes just consumed.
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, encoding);
}
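
Regarding the corrupt lines: since the header regex can false-match on payload
bytes, here is a sketch of an alternative that resyncs once and then walks
messages by their length prefix. It assumes the 0.7 wire format (4-byte size,
1-byte magic, an attributes byte only when magic == 1, then a 4-byte CRC32 of
the payload) and no compression or batching; the helper names are mine, not
from any library:

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// True if pos looks like the start of a real message: sane size field, known
// magic byte, and the stored CRC32 matches the payload that follows.
private static boolean plausibleMessageAt(ByteBuffer buf, int pos) {
    if (buf.limit() - pos < 9) return false;              // size + magic + crc minimum
    int size = buf.getInt(pos);                           // bytes after the size field
    byte magic = buf.get(pos + 4);
    if (magic != 0 && magic != 1) return false;
    int headerLen = (magic == 1) ? 6 : 5;                 // magic [+ attributes] + crc
    if (size < headerLen || size > buf.limit() - pos - 4) return false;
    long storedCrc = buf.getInt(pos + 4 + headerLen - 4) & 0xFFFFFFFFL;
    CRC32 crc = new CRC32();
    for (int i = pos + 4 + headerLen; i < pos + 4 + size; i++) {
        crc.update(buf.get(i));
    }
    return crc.getValue() == storedCrc;
}

private List<String> decodeByFraming(ByteBuffer buffer, String encoding)
        throws UnsupportedEncodingException {
    List<String> lines = new ArrayList<String>();
    int pos = 0;
    while (pos < buffer.limit() && !plausibleMessageAt(buffer, pos)) {
        pos++;  // resync: the fetch offset was a blind byte-count jump
    }
    while (pos < buffer.limit() && plausibleMessageAt(buffer, pos)) {
        int size = buffer.getInt(pos);
        int headerLen = (buffer.get(pos + 4) == 1) ? 6 : 5;
        byte[] payload = new byte[size - headerLen];
        for (int i = 0; i < payload.length; i++) {
            payload[i] = buffer.get(pos + 4 + headerLen + i);
        }
        lines.add(new String(payload, encoding));
        pos += 4 + size;  // jump straight to the next length prefix
    }
    return lines;
}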


On Fri, Jul 19, 2013 at 1:26 PM, Johan Lundahl wrote:

> Here is my current (very hacky) piece of code handling this part:
>
>   def getLastMessages(fetchSize: Int = 1): List[String] = {
>     val sConsumer = new SimpleConsumer(clusterip, 9092, 1000, 1024000)
>     val currentOffset = sConsumer.getOffsetsBefore(topic, 0, -1, 3)
>
>     val fetchRequest = new FetchRequest(topic, 0, (currentOffset(0) -
>       fetchSize).max(currentOffset(currentOffset.length - 1)), fetchSize)
>     val msgBuffer = sConsumer.fetch(fetchRequest)
>
>     sConsumer.close()
>
>     def decodeBuffer(buffer: ByteBuffer, encoding: String,
>                      arrSize: Int = msgBuffer.sizeInBytes.toInt - 6): String = {
>       val size: Int =
>         Option(try { buffer.getInt } catch { case e: Throwable => -1 }).getOrElse(-1)
>       if (size < 0) return s"No recent messages in topic $topic"
>       val bytes = new Array[Byte](arrSize)
>       buffer.get(bytes)
>       new String(bytes, encoding)
>     }
>     val decStr = decodeBuffer(msgBuffer.getBuffer, "UTF-8")
>
>     val header = "\u0000\u0000.?.?.?.?.?.?.?.?"
>     val strLst = decStr.split(header).toList
>
>     if (strLst.size > 1) strLst.tail else strLst
>   }
>
>
> On Fri, Jul 19, 2013 at 10:02 PM, Shane Moriah wrote:
>
> > I have a similar use-case to Johan.  We do stream processing off the
> > topics in the backend but I'd like to expose a recent sample of a
> > topic's data to a front-end web-app (just in a synchronous,
> > click-a-button-and-see-results fashion).  If I can only start from the
> > last file offset 500MB behind current and not (current - n bytes) then
> > the data might be very stale depending on how fast that topic is being
> > filled. I could iterate from the last offset and keep only the final n,
> > but that might mean processing 500MB each time just to grab 10 messages.
> >
> > Johan, are you using just the simple FetchRequest?  Did you get around
> > the InvalidMessageSizeError when you try to force a fetch offset
> > different from those returned by getOffsetsBefore?  Or are you also
> > starting from that last known offset and iterating forwards by the
> > desired amount?

Re: Retrieve most-recent-n messages from kafka topic

2013-07-19 Thread Shane Moriah
I have a similar use-case to Johan.  We do stream processing off the topics
in the backend but I'd like to expose a recent sample of a topic's data to
a front-end web-app (just in a synchronous, click-a-button-and-see-results
fashion).  If I can only start from the last file offset 500MB behind
current and not (current - n bytes)  then the data might be very stale
depending on how fast that topic is being filled. I could iterate from the
last offset and keep only the final n, but that might mean processing 500MB
each time just to grab 10 messages.

Johan, are you using just the simple FetchRequest?  Did you get around the
InvalidMessageSizeError when you try to force a fetch offset different from
those returned by getOffsetsBefore?  Or are you also starting from that
last known offset and iterating forwards by the desired amount?
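
For reference, a rough sketch of that iterate-from-the-last-offset approach
(keeping only the final n), assuming the 0.7 javaapi; BROKER_NAME and
PARTITION_ID are placeholder constants, and the method names
(getOffsetsBefore, validBytes, message, offset) should be checked against
your jar. It decodes everything after the last segment boundary, so it can
chew through up to a full 500MB segment just to return a handful of lines:

import java.io.UnsupportedEncodingException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

private List<String> lastNFromCheckpoint(String topic, int n)
        throws UnsupportedEncodingException {
    SimpleConsumer consumer = new SimpleConsumer(BROKER_NAME, 9092, 1000, 1024000);
    try {
        // time = -1 means "latest"; with maxNumOffsets = 2 the reply is the
        // log-end offset followed by the start of the newest segment file,
        // both of which are valid fetch offsets.
        long[] offsets = consumer.getOffsetsBefore(topic, PARTITION_ID, -1L, 2);
        long offset = offsets[offsets.length - 1];
        Deque<String> window = new ArrayDeque<String>();
        while (true) {
            FetchRequest req = new FetchRequest(topic, PARTITION_ID, offset, 1024 * 1024);
            ByteBufferMessageSet set = consumer.fetch(req);
            if (set.validBytes() <= 0) {
                break;  // caught up with the head of the log
            }
            for (MessageAndOffset mo : set) {
                java.nio.ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.remaining()];
                payload.get(bytes);
                if (window.size() == n) {
                    window.removeFirst();  // bounded window: keep only the last n
                }
                window.addLast(new String(bytes, "UTF-8"));
                offset = mo.offset();  // in 0.7 this is the offset of the *next* message
            }
        }
        return new ArrayList<String>(window);
    } finally {
        consumer.close();
    }
}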


On Fri, Jul 19, 2013 at 11:33 AM, Johan Lundahl wrote:

> I've had a similar use case where we want to browse and display the latest
> few messages in different topics in a webapp.
>
> This kind of works by doing as you describe; submitting a FetchRequest with
> an offset of messages_desired * avg_bytes_per_message plus a bit more.
> You'll get the ByteBuffer and then you can strip away until you reach a
> message. How to find where a message starts is not something that I've
> understood completely yet (I've not studied the protocol very carefully),
> but splitting the buffer by the pattern \u0000\u0000.?.?.?.?.?.?.?.? seems
> to work pretty well in our case, at least when there is no batching or
> compression involved.
>
> If someone has hints on a better way to find a message header, I'd also
> appreciate this info.
>
>
> > On Fri, Jul 19, 2013 at 2:17 PM, David Arthur wrote:
> >
> > There is no index-based access to messages in 0.7 like there is in 0.8.
> > You have to start from a known good offset and iterate through the
> > messages.
> >
> > What's your use case? Running a job periodically that reads the latest
> > N messages from the queue? Is it impractical to run from the last known
> > offset and only keep the last N?
> >
> >
> > On 7/19/13 3:45 AM, Shane Moriah wrote:
> >
> >> We're running Kafka 0.7 and I'm hitting some issues trying to access
> >> the newest n messages in a topic (or at least in a broker/partition
> >> combo) and wondering if my use case just isn't supported or if I'm
> >> missing something.  What I'd like to be able to do is get the most
> >> recent offset from a broker/partition combo, subtract an amount of
> >> bytes roughly equivalent to messages_desired*bytes_per_message and
> >> then issue a FetchRequest with that offset and amount of bytes.
> >>
> >> I gathered from this post
> >> <http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E>
> >> that I need to use the Simple Consumer in order to do offset
> >> manipulation beyond the start-from-beginning and start-from-end
> >> options.  And I saw from this post
> >> <http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E>
> >> that the offsets returned by getOffsetsBefore are really only the
> >> major checkpoints when files are rolled over, every 500MB by default.
> >> I also found that if I take an offset returned from getOffsetsBefore
> >> and subtract a fixed value, say 100KB, and submit that offset with a
> >> FetchRequest, I get a kafka.common.InvalidMessageSizeException,
> >> presumably since my computed offset didn't align with a real message
> >> offset.
> >>
> >> As far as I can tell, this leaves me only able to find the most recent
> >> milestone offset, perhaps up to 500MB behind current data, and extract
> >> a batch from that point forward. Is there any other way that I'm
> >> missing here? The two things that seem to be lacking are access to the
> >> most recent offset and the ability to roll back from that offset by a
> >> fixed amount of bytes or messages without triggering the
> >> InvalidMessageSizeException.
> >>
> >> Thanks,
> >> Shane
> >>
> >>
> >
>


Retrieve most-recent-n messages from kafka topic

2013-07-19 Thread Shane Moriah
We're running Kafka 0.7 and I'm hitting some issues trying to access the
newest n messages in a topic (or at least in a broker/partition combo) and
wondering if my use case just isn't supported or if I'm missing something.
 What I'd like to be able to do is get the most recent offset from a
broker/partition combo, subtract an amount of bytes roughly equivalent to
messages_desired*bytes_per_message and then issue a FetchRequest with that
offset and amount of bytes.

I gathered from this post
<http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E>
that I need to use the Simple Consumer in order to do offset manipulation
beyond the start-from-beginning and start-from-end options.  And I saw from
this post
<http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E>
that
the offsets returned by getOffsetsBefore are really only the major
checkpoints when files are rolled over, every 500MB by default.  I also
found that if I take an offset returned from getOffsetsBefore and subtract
a fixed value, say 100KB, and submit that offset with a FetchRequest I get
a kafka.common.InvalidMessageSizeException, presumably since my computed
offset didn't align with a real message offset.
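
For anyone else poking at this, a small probe of what getOffsetsBefore hands
back in 0.7 (javaapi names assumed; the printed numbers are illustrative):

import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetProbe {
    public static void main(String[] args) {
        SimpleConsumer consumer = new SimpleConsumer("broker-host", 9092, 1000, 64 * 1024);
        // time = -1 asks for the latest offsets; maxNumOffsets caps the count.
        long[] offsets = consumer.getOffsetsBefore("my-topic", 0, -1L, 4);
        // Typical shape on a log with two rolled ~500MB segments:
        //   1350000000  <- current log end
        //   1073741824  <- start of the newest segment file
        //    536870912  <- start of the previous segment
        //            0  <- start of the oldest retained segment
        // Fetching from any of these is safe; an offset in between risks
        // InvalidMessageSizeException, as described above.
        for (long offset : offsets) {
            System.out.println("valid fetch offset: " + offset);
        }
        consumer.close();
    }
}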

As far as I can tell, this leaves me only able to find the most recent
milestone offset, perhaps up to 500MB behind current data, and extract a
batch from that point forward. Is there any other way that I'm missing
here? The two things that seem to be lacking are access to the most recent
offset and the ability to roll back from that offset by a fixed amount of
bytes or messages without triggering the InvalidMessageSizeException.

Thanks,
Shane