Thanks Johan, I converted your code to vanilla Java with a few small modifications (included below in case anyone wants to use it) and ran it a few times. It seems to work OK for the quick-peek use case, but I wouldn't recommend anyone rely on its accuracy: in our case, anywhere between 1% and 10% of the result lines come back corrupt on each call. In those cases there are a few stray bytes at the beginning of the line, probably just a function of the header regex being imprecise, as you mentioned before. I've also put a rough sketch of David's iterate-and-keep-the-last-N suggestion at the very bottom of this mail, after the quoted thread.
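A guess at why those lines go bad: '.' in a Java regex doesn't match line terminators by default, so whenever one of the header bytes that the .? terms are supposed to swallow happens to be a newline or carriage return, the split stops early and the leftover header bytes stay glued to the front of the message. For anyone who wants to skip the regex entirely, below is an untested sketch that looks for message boundaries via the framing instead. It assumes I'm reading the 0.7 message format right (4-byte big-endian length, a magic byte, an attributes byte only when magic == 1, a CRC32 of the payload, then the payload), so treat the layout constants as assumptions; like the regex, it also ignores compression:

// Untested sketch: find message boundaries by validating the framing instead
// of regex-splitting. Assumed 0.7 format: 4-byte big-endian size N, then
// N bytes = magic(1) [attributes(1) if magic == 1] crc32(4) payload, with the
// CRC32 computed over the payload bytes only.
// Needs java.nio.ByteBuffer, java.util.zip.CRC32, java.util.ArrayList and List.
private List<String> splitByFraming(ByteBuffer buffer, String encoding) throws UnsupportedEncodingException {
    List<String> messages = new ArrayList<String>();
    int pos = 0;
    while (pos + 5 <= buffer.limit()) {
        int size = buffer.getInt(pos);
        byte magic = buffer.get(pos + 4);
        int headerLen = (magic == 1) ? 6 : 5; // magic [+ attributes] + crc
        boolean plausible = (magic == 0 || magic == 1)
                && size >= headerLen
                && pos + 4 + size <= buffer.limit();
        if (!plausible) {
            pos++; // not a boundary (we may have started mid-message); slide one byte
            continue;
        }
        int payloadStart = pos + 4 + headerLen;
        int payloadLen = size - headerLen;
        byte[] payload = new byte[payloadLen];
        for (int i = 0; i < payloadLen; i++) {
            payload[i] = buffer.get(payloadStart + i);
        }
        CRC32 crc = new CRC32();
        crc.update(payload);
        long storedCrc = buffer.getInt(payloadStart - 4) & 0xFFFFFFFFL; // crc sits right before the payload
        if (crc.getValue() != storedCrc) {
            pos++; // a false match on size/magic; keep scanning
            continue;
        }
        messages.add(new String(payload, encoding));
        pos = payloadStart + payloadLen; // the next message starts immediately after
    }
    return messages;
}

The one-byte slide makes it linear in the buffer size at worst, which should be fine for a few hundred KB of peeked data. Anyway, here's the converted code: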
private List<String> getCurrentLinesFromKafka(String topicName, int linesToFetch) throws UnsupportedEncodingException {
    int bytesToFetch = linesToFetch * AVG_LINE_SIZE_IN_BYTES;
    SimpleConsumer sConsumer = new SimpleConsumer(BROKER_NAME, 9092, 1000, 1024000);
    // offsets come back newest first; the last entry is the oldest checkpoint returned
    long[] currentOffset = sConsumer.getOffsetsBefore(topicName, PARTITION_ID, -1, 3);
    long offset = Math.max(currentOffset[0] - bytesToFetch, currentOffset[currentOffset.length - 1]);
    // fetch from the same partition the offsets came from
    FetchRequest fetchRequest = new FetchRequest(topicName, PARTITION_ID, offset, bytesToFetch);
    ByteBufferMessageSet msgBuffer = sConsumer.fetch(fetchRequest);
    sConsumer.close();

    String decStr = decodeBuffer(msgBuffer.getBuffer(), "UTF-8");
    String header = "\u0000\u0000.?.?.?.?.?.?.?.?";
    String[] strLst = decStr.split(header);

    if (strLst.length > linesToFetch + 2) {
        // take only the last linesToFetch of them; ignore the first and last since they may be corrupted
        int end = strLst.length - 1; // end is excluded in copyOfRange
        int start = end - linesToFetch;
        return Lists.newArrayList(Arrays.copyOfRange(strLst, start, end));
    } else if (strLst.length > 2) {
        // we can at least return something since we have more than the corrupt first and last values
        int end = strLst.length - 1; // end is excluded in copyOfRange
        int start = 1; // ignore the probably corrupt first value
        return Lists.newArrayList(Arrays.copyOfRange(strLst, start, end));
    } else {
        return Lists.newArrayList();
    }
}

private String decodeBuffer(ByteBuffer buffer, String encoding) throws UnsupportedEncodingException {
    int size;
    try {
        size = buffer.getInt();
    } catch (Exception e) {
        size = -1;
    }
    if (size < 0) {
        return "No recent messages in topic";
    }
    // copy from the current position; buffer.array() would also hand back
    // the four size bytes we just consumed
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, encoding);
}

On Fri, Jul 19, 2013 at 1:26 PM, Johan Lundahl <johan.lund...@gmail.com> wrote:

> Here is my current (very hacky) piece of code handling this part:
>
> def getLastMessages(fetchSize: Int = 10000): List[String] = {
>   val sConsumer = new SimpleConsumer(clusterip, 9092, 1000, 1024000)
>   val currentOffset = sConsumer.getOffsetsBefore(topic, 0, -1, 3)
>
>   val fetchRequest = new FetchRequest(topic, 0,
>     (currentOffset(0) - fetchSize).max(currentOffset(currentOffset.length - 1)), fetchSize)
>   val msgBuffer = sConsumer.fetch(fetchRequest)
>
>   sConsumer.close()
>
>   def decodeBuffer(buffer: ByteBuffer, encoding: String,
>                    arrSize: Int = msgBuffer.sizeInBytes.toInt - 6): String = {
>     val size: Int = Option(try { buffer.getInt } catch { case e: Throwable => -1 }).getOrElse(-1)
>     if (size < 0) return s"No recent messages in topic $topic"
>     val bytes = new Array[Byte](arrSize)
>     buffer.get(bytes)
>     new String(bytes, encoding)
>   }
>   val decStr = decodeBuffer(msgBuffer.getBuffer, "UTF-8")
>
>   val header = "\u0000\u0000.?.?.?.?.?.?.?.?"
>   val strLst = decStr.split(header).toList
>
>   if (strLst.size > 1) strLst.tail else strLst
> }
>
>
> On Fri, Jul 19, 2013 at 10:02 PM, Shane Moriah <shanemor...@gmail.com> wrote:
>
> > I have a similar use-case to Johan. We do stream processing off the topics
> > in the backend but I'd like to expose a recent sample of a topic's data to
> > a front-end web-app (just in a synchronous, click-a-button-and-see-results
> > fashion). If I can only start from the last file offset 500MB behind
> > current and not (current - n bytes) then the data might be very stale
> > depending on how fast that topic is being filled. I could iterate from the
> > last offset and keep only the final n, but that might mean processing 500MB
> > each time just to grab 10 messages.
> >
> > Johan, are you using just the simple FetchRequest? Did you get around the
> > InvalidMessageSizeError when you try to force a fetch offset different from
> > those returned by getOffsetsBefore? Or are you also starting from that last
> > known offset and iterating forwards by the desired amount?
> >
> >
> > On Fri, Jul 19, 2013 at 11:33 AM, Johan Lundahl <johan.lund...@gmail.com> wrote:
> >
> > > I've had a similar use case where we want to browse and display the latest
> > > few messages in different topics in a webapp.
> > >
> > > This kind of works by doing as you describe; submitting a FetchRequest with
> > > an offset of messages_desired * avg_bytes_per_message plus a bit more.
> > > You'll get the ByteBuffer and then you can strip away until you reach a
> > > message. How to find where a message starts is not something that I've
> > > understood completely yet (I've not studied the protocol very carefully),
> > > but splitting the buffer by the pattern \u0000\u0000.?.?.?.?.?.?.?.? seems
> > > to work pretty well in our case, at least when there is no batching or
> > > compression involved.
> > >
> > > If someone has hints on a better way to find a message header, I'd also
> > > appreciate this info.
> > >
> > >
> > > On Fri, Jul 19, 2013 at 2:17 PM, David Arthur <mum...@gmail.com> wrote:
> > >
> > > > There is no index-based access to messages in 0.7 like there is in 0.8.
> > > > You have to start from a known good offset and iterate through the
> > > > messages.
> > > >
> > > > What's your use case? Running a job periodically that reads the latest N
> > > > messages from the queue? Is it impractical to run from the last known
> > > > offset and only keep the last N?
> > > >
> > > >
> > > > On 7/19/13 3:45 AM, Shane Moriah wrote:
> > > >
> > > >> We're running Kafka 0.7 and I'm hitting some issues trying to access the
> > > >> newest n messages in a topic (or at least in a broker/partition combo) and
> > > >> wondering if my use case just isn't supported or if I'm missing something.
> > > >> What I'd like to be able to do is get the most recent offset from a
> > > >> broker/partition combo, subtract an amount of bytes roughly equivalent to
> > > >> messages_desired * bytes_per_message and then issue a FetchRequest with
> > > >> that offset and amount of bytes.
> > > >>
> > > >> I gathered from this post
> > > >> <http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E>
> > > >> that I need to use the SimpleConsumer in order to do offset manipulation
> > > >> beyond the start-from-beginning and start-from-end options. And I saw from
> > > >> this post
> > > >> <http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E>
> > > >> that the offsets returned by getOffsetsBefore are really only the major
> > > >> checkpoints when files are rolled over, every 500MB by default. I also
> > > >> found that if I take an offset returned from getOffsetsBefore and subtract
> > > >> a fixed value, say 100KB, and submit that offset with a FetchRequest, I get
> > > >> a kafka.common.InvalidMessageSizeException, presumably since my computed
> > > >> offset didn't align with a real message offset.
> > > >>
> > > >> As far as I can tell, this leaves me only able to find the most recent
> > > >> milestone offset, perhaps up to 500MB behind current data, and extract a
> > > >> batch from that point forward. Is there any other way that I'm missing
> > > >> here? The two things that seem to be lacking are access to the most recent
> > > >> offset and the ability to roll back from that offset by a fixed amount of
> > > >> bytes or messages without triggering the InvalidMessageSizeException.
> > > >>
> > > >> Thanks,
> > > >> Shane
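P.S. Here's the sketch of David's suggestion that I mentioned up top: start from the last checkpoint offset that getOffsetsBefore hands back and iterate forward, keeping only the final n messages. Also untested, and it assumes the 0.7 javaapi, where ByteBufferMessageSet is iterable over MessageAndOffset and messageAndOffset.offset() is the byte offset to ask for next:

// Untested sketch of the iterate-forward approach: decode every message from
// the last checkpoint onward, but only ever hold the final n in memory.
// Assumes the 0.7 javaapi classes (SimpleConsumer, FetchRequest,
// ByteBufferMessageSet, MessageAndOffset) plus java.nio.ByteBuffer and
// java.util.LinkedList/List.
private List<String> tailTopic(SimpleConsumer consumer, String topic, int partition, int n)
        throws UnsupportedEncodingException {
    // time = -1 asks for the latest offsets, newest first: offsets[0] is the
    // head of the log, offsets[1] (if present) the previous segment checkpoint.
    long[] offsets = consumer.getOffsetsBefore(topic, partition, -1, 2);
    long head = offsets[0];
    long offset = offsets.length > 1 ? offsets[1] : 0L;
    LinkedList<String> tail = new LinkedList<String>();
    while (offset < head) {
        ByteBufferMessageSet msgs = consumer.fetch(new FetchRequest(topic, partition, offset, 512 * 1024));
        long before = offset;
        for (MessageAndOffset mo : msgs) {
            ByteBuffer payload = mo.message().payload();
            byte[] bytes = new byte[payload.remaining()];
            payload.get(bytes);
            tail.add(new String(bytes, "UTF-8"));
            if (tail.size() > n) {
                tail.removeFirst(); // bounded memory: keep only the last n lines
            }
            offset = mo.offset(); // in 0.7 this is the offset of the *next* message
        }
        if (offset == before) {
            break; // no progress means an empty fetch; we've caught up
        }
    }
    return tail;
}

You pay for scanning up to a whole segment (500MB by default, i.e. on the order of a thousand 512KB fetches right after a rollover), but every message comes back intact, so for a slow topic it may beat debugging regex corruption.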