Thanks Johan,

I converted your code to vanilla Java with a few small modifications
(included below in case anyone wants to use it) and ran it a few times.
It works OK for the quick-peek use case, but I wouldn't recommend that
anyone rely on its accuracy: in our case, anywhere between 1% and 10% of
the result lines come back corrupt on each call. In those cases there are
a few special characters at the beginning of the line, probably just a
function of the header regex being imprecise, as you mentioned before.

    private List<String> getCurrentLinesFromKafka(String topicName, int linesToFetch)
            throws UnsupportedEncodingException {
        int bytesToFetch = linesToFetch * AVG_LINE_SIZE_IN_BYTES;
        SimpleConsumer sConsumer = new SimpleConsumer(BROKER_NAME, 9092, 1000, 1024000);
        // time = -1 requests the latest offsets; up to 3 come back, newest first
        long[] currentOffset = sConsumer.getOffsetsBefore(topicName, PARTITION_ID, -1, 3);

        // back up bytesToFetch from the newest offset, but never past the oldest one returned
        long offset = Math.max(currentOffset[0] - bytesToFetch,
                currentOffset[currentOffset.length - 1]);
        // use PARTITION_ID here as well so offsets and fetch refer to the same partition
        FetchRequest fetchRequest = new FetchRequest(topicName, PARTITION_ID, offset, bytesToFetch);
        ByteBufferMessageSet msgBuffer = sConsumer.fetch(fetchRequest);

        sConsumer.close();

        String decStr = decodeBuffer(msgBuffer.getBuffer(), "UTF-8");

        // rough header match: two NUL characters followed by up to eight more characters
        String header = "\u0000\u0000.?.?.?.?.?.?.?.?";
        String[] strLst = decStr.split(header);

        if (strLst.length > linesToFetch + 2) {
            // take only the last linesToFetch of them, also ignore the
            // first and last since they may be corrupted
            int end = strLst.length - 1;  // end is excluded in copyOfRange
            int start = end - linesToFetch;
            return Lists.newArrayList(Arrays.copyOfRange(strLst, start, end));
        } else if (strLst.length > 2) {
            // we can at least return something since we have more than the
            // corrupt first and last values
            int end = strLst.length - 1;  // end is excluded in copyOfRange
            int start = 1;                // ignore the probably corrupt first value
            return Lists.newArrayList(Arrays.copyOfRange(strLst, start, end));
        } else {
            return Lists.newArrayList();
        }
    }

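    private String decodeBuffer(ByteBuffer buffer, String encoding)
            throws UnsupportedEncodingException {
        int size;
        try {
            // the first four bytes are read as a size; a failure or a negative
            // value is treated as "nothing to show"
            size = buffer.getInt();
        } catch (Exception e) {
            size = -1;
        }

        if (size < 0) {
            return "No recent messages in topic";
        }

        // copy only what follows the size field; buffer.array() would return the
        // whole backing array regardless of the buffer's position
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, encoding);
    }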
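
A possibly more precise alternative to the header regex, as a rough sketch
only. It assumes that 0.7 frames each message as a 4-byte length, a magic
byte, an attributes byte (present only when magic is 1), a 4-byte CRC32 of
the payload, and then the payload. Instead of decoding everything to a
String first, it scans the raw bytes and accepts a position as a message
start only when the stored CRC matches the payload. KafkaFrameScanner,
extractMessages and frame are hypothetical names, not Kafka APIs, and
compressed or batched messages are not handled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical helper, not part of the Kafka client.
class KafkaFrameScanner {

    // Assumed framing: [4-byte length N][magic][attributes, only if magic == 1]
    // [4-byte CRC32 of payload][payload], where N counts everything after the length.
    static List<String> extractMessages(byte[] data) {
        List<String> messages = new ArrayList<String>();
        ByteBuffer buf = ByteBuffer.wrap(data); // big-endian by default
        int pos = 0;
        while (pos + 4 <= data.length) {
            int payloadStart = payloadStartIfValid(buf, pos);
            if (payloadStart < 0) {
                pos++; // not a message boundary: slide forward one byte
                continue;
            }
            int frameEnd = pos + 4 + buf.getInt(pos);
            messages.add(new String(data, payloadStart, frameEnd - payloadStart,
                    StandardCharsets.UTF_8));
            pos = frameEnd; // jump straight to the next frame
        }
        return messages;
    }

    // Returns the payload index if a well-formed frame starts at pos, otherwise -1.
    static int payloadStartIfValid(ByteBuffer buf, int pos) {
        int length = buf.getInt(pos);
        long frameEnd = (long) pos + 4 + length;
        if (length < 5 || frameEnd > buf.capacity()) {
            return -1; // too short for a header, or runs past the fetched bytes
        }
        byte magic = buf.get(pos + 4);
        if (magic != 0 && magic != 1) {
            return -1;
        }
        int headerSize = (magic == 0) ? 5 : 6; // magic [+ attributes] + CRC
        if (length < headerSize) {
            return -1;
        }
        int crcPos = pos + 4 + headerSize - 4;
        long storedCrc = buf.getInt(crcPos) & 0xFFFFFFFFL;
        int payloadStart = crcPos + 4;
        CRC32 crc = new CRC32();
        crc.update(buf.array(), payloadStart, (int) frameEnd - payloadStart);
        return crc.getValue() == storedCrc ? payloadStart : -1;
    }

    // Builds one uncompressed magic-1 frame; only used to create sample input.
    static byte[] frame(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(payload);
        ByteBuffer out = ByteBuffer.allocate(4 + 6 + payload.length);
        out.putInt(6 + payload.length); // length of everything after this field
        out.put((byte) 1);              // magic
        out.put((byte) 0);              // attributes: no compression
        out.putInt((int) crc.getValue());
        out.put(payload);
        return out.array();
    }
}
```

In getCurrentLinesFromKafka this would take the raw bytes of the fetched
buffer in place of the decodeBuffer and split steps. A chance CRC match
inside leftover bytes is still possible, just far less likely than a
mismatched regex.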


On Fri, Jul 19, 2013 at 1:26 PM, Johan Lundahl <johan.lund...@gmail.com> wrote:

> Here is my current (very hacky) piece of code handling this part:
>
>   def getLastMessages(fetchSize: Int = 10000): List[String] = {
>     val sConsumer = new SimpleConsumer(clusterip, 9092, 1000, 1024000)
>     val currentOffset = sConsumer.getOffsetsBefore(topic, 0, -1, 3)
>
>     val fetchRequest = new FetchRequest(topic, 0,
>       (currentOffset(0) - fetchSize).max(currentOffset(currentOffset.length - 1)), fetchSize)
>     val msgBuffer = sConsumer.fetch(fetchRequest)
>
>     sConsumer.close()
>
>     def decodeBuffer(buffer: ByteBuffer, encoding: String,
>                      arrSize: Int = msgBuffer.sizeInBytes.toInt - 6): String = {
>       val size: Int = Option(try { buffer.getInt } catch { case e: Throwable => -1 }).getOrElse(-1)
>       if (size < 0) return s"No recent messages in topic $topic"
>       val bytes = new Array[Byte](arrSize)
>       buffer.get(bytes)
>       new String(bytes, encoding)
>     }
>     val decStr = decodeBuffer(msgBuffer.getBuffer, "UTF-8")
>
>     val header = "\u0000\u0000.?.?.?.?.?.?.?.?"
>     val strLst = decStr.split(header).toList
>
>     if (strLst.size > 1) strLst.tail else strLst
>   }
>
>
> On Fri, Jul 19, 2013 at 10:02 PM, Shane Moriah <shanemor...@gmail.com> wrote:
>
> > I have a similar use case to Johan.  We do stream processing off the topics
> > in the backend but I'd like to expose a recent sample of a topic's data to
> > a front-end web-app (just in a synchronous, click-a-button-and-see-results
> > fashion).  If I can only start from the last file offset 500MB behind
> > current and not (current - n bytes) then the data might be very stale
> > depending on how fast that topic is being filled. I could iterate from the
> > last offset and keep only the final n, but that might mean processing 500MB
> > each time just to grab 10 messages.
> >
> > Johan, are you using just the simple FetchRequest?  Did you get around the
> > InvalidMessageSizeException when you try to force a fetch offset different
> > from those returned by getOffsetsBefore?  Or are you also starting from that
> > last known offset and iterating forwards by the desired amount?
> >
> >
> > On Fri, Jul 19, 2013 at 11:33 AM, Johan Lundahl <johan.lund...@gmail.com> wrote:
> >
> > > I've had a similar use case where we want to browse and display the latest
> > > few messages in different topics in a webapp.
> > >
> > > This kind of works by doing as you describe: submitting a FetchRequest with
> > > an offset that is messages_desired * avg_bytes_per_message, plus a bit more,
> > > behind the latest offset. You'll get the ByteBuffer and then you can strip
> > > away bytes until you reach a message. How to find where a message starts is
> > > not something that I've understood completely yet (I've not studied the
> > > protocol very carefully), but splitting the buffer by the pattern
> > > \u0000\u0000.?.?.?.?.?.?.?.? seems to work pretty well in our case, at least
> > > when there is no batching or compression involved.
> > >
> > > If someone has hints on a better way to find a message header, I'd also
> > > appreciate this info.
> > >
> > >
> > > On Fri, Jul 19, 2013 at 2:17 PM, David Arthur <mum...@gmail.com> wrote:
> > >
> > > > There is no index-based access to messages in 0.7 like there is in 0.8.
> > > > You have to start from a known good offset and iterate through the
> > > > messages.
> > > >
> > > > What's your use case? Running a job periodically that reads the latest N
> > > > messages from the queue? Is it impractical to run from the last known
> > > > offset and only keep the last N?
> > > >
> > > >
> > > > On 7/19/13 3:45 AM, Shane Moriah wrote:
> > > >
> > > > >> We're running Kafka 0.7 and I'm hitting some issues trying to access the
> > > > >> newest n messages in a topic (or at least in a broker/partition combo) and
> > > > >> wondering if my use case just isn't supported or if I'm missing something.
> > > > >> What I'd like to be able to do is get the most recent offset from a
> > > > >> broker/partition combo, subtract an amount of bytes roughly equivalent to
> > > > >> messages_desired*bytes_per_message and then issue a FetchRequest with
> > > > >> that offset and amount of bytes.
> > > > >>
> > > > >> I gathered from this post
> > > > >> <http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E>
> > > > >> that I need to use the Simple Consumer in order to do offset manipulation
> > > > >> beyond the start from beginning and start from end options.  And I saw from
> > > > >> this post
> > > > >> <http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E>
> > > > >> that the offsets returned by getOffsetsBefore are really only the major
> > > > >> checkpoints when files are rolled over, every 500MB by default.  I also
> > > > >> found that if I take an offset returned from getOffsetsBefore and subtract
> > > > >> a fixed value, say 100KB, and submit that offset with a FetchRequest I get
> > > > >> a kafka.common.InvalidMessageSizeException, presumably since my computed
> > > > >> offset didn't align with a real message offset.
> > > > >>
> > > > >> As far as I can tell, this leaves me only able to find the most recent
> > > > >> milestone offset, perhaps up to 500MB behind current data, and extract a
> > > > >> batch from that point forward. Is there any other way that I'm missing
> > > > >> here? The two things that seem to be lacking are access to the most recent
> > > > >> offset and the ability to roll back from that offset by a fixed amount of
> > > > >> bytes or messages without triggering the InvalidMessageSizeException.
> > > > >>
> > > > >> Thanks,
> > > > >> Shane
> > > >>
> > > >>
> > > >
> > >
> >
>
