Hi Aaron,

I was finally able to test the new queue capabilities. The results are
promising compared to the traditional Thrift client. Here are the details.

I ran the tests on an AWS EMR cluster of 3 m1.large nodes. For all the
tests, the environment and memory settings were left at their defaults.

*Test 1*: Without the queue feature (using client.mutate), 10,000 records
took almost 300 seconds, a rate of *33 records/sec*.

*Test 2*: With the queue feature (client.enqueueMutate), the same 10,000
records on the same cluster took just 23 seconds, a rate of
*435 records/sec. This is a jump of 13 times.*

*Test 3*: I wanted to index around 90,000 documents on the same cluster, but
this time client.enqueueMutate produced an error in the log and all shard
servers hung. The log below shows the error; after it, everything hung and
the shards became unresponsive.

org.apache.blur.thrift.BadConnectionException: Could not connect to
controller/shard server. All connections are bad.
        at
org.apache.blur.thrift.BlurClientManager.execute(BlurClientManager.java:235)
        at
org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler.invoke(BlurClient.java:56)
        at com.sun.proxy.$Proxy0.enqueueMutate(Unknown Source)


In the log I found this stack trace pointing at the code that raised the error:


stackTraceStr:null, errorType:UNKNOWN)
        at
org.apache.blur.manager.writer.MutatableAction.merge(MutatableAction.java:460)
        at
org.apache.blur.manager.writer.MutatableAction.reduceMutates(MutatableAction.java:439)
        at
org.apache.blur.manager.writer.BaseQueueReader$1.run(BaseQueueReader.java:65)
        at java.lang.Thread.run(Thread.java:724)


Just to test whether MutatableAction.reduceMutates is the culprit, I modified
BaseQueueReader.java so that it only performs doMutate(mutations);

I commented out the line mutations = MutatableAction.reduceMutates(mutations);
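
For readers who want to picture the change, here is a minimal sketch of a queue reader whose reduce step can be switched off. It is not the Blur source: QueueDrainSketch, Mutation, reduceByRow and drainOnce are hypothetical names, and the assumption that the reduce step folds all mutations for one row into a single mutation is only a guess based on the method names in the stack trace.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, NOT the Blur source: a reader loop with an optional reduce step.
class QueueDrainSketch {

    // Stand-in for a row mutation: a row id plus one change.
    static final class Mutation {
        final String rowId;
        final String change;

        Mutation(String rowId, String change) {
            this.rowId = rowId;
            this.change = change;
        }
    }

    // Assumed behaviour of a reduce step: fold all mutations for one row into one.
    static List<Mutation> reduceByRow(List<Mutation> batch) {
        Map<String, StringBuilder> merged = new LinkedHashMap<>();
        for (Mutation m : batch) {
            StringBuilder sb = merged.get(m.rowId);
            if (sb == null) {
                merged.put(m.rowId, new StringBuilder(m.change));
            } else {
                sb.append(',').append(m.change);
            }
        }
        List<Mutation> out = new ArrayList<>();
        for (Map.Entry<String, StringBuilder> e : merged.entrySet()) {
            out.add(new Mutation(e.getKey(), e.getValue().toString()));
        }
        return out;
    }

    // Drains up to maxBatch queued mutations and returns what would go to the writer.
    static List<Mutation> drainOnce(BlockingQueue<Mutation> queue, int maxBatch, boolean reduce) {
        List<Mutation> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatch);
        if (reduce) {
            batch = reduceByRow(batch); // the kind of step that was commented out in the test
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Mutation> queue = new LinkedBlockingQueue<>();
        queue.add(new Mutation("row-1", "a"));
        queue.add(new Mutation("row-2", "b"));
        queue.add(new Mutation("row-1", "c"));
        List<Mutation> applied = drainOnce(queue, 100, false);
        System.out.println(applied.size() + " mutations passed through without the reduce step");
    }
}
```

With the flag off, every queued mutation reaches the writer individually, which avoids the merge path entirely but also gives up whatever batching benefit the reduce step was meant to provide.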

With this change, when I ran the test again for 90,000 documents, all
documents were indexed properly. It took around 157 seconds, an indexing
rate of *575 records/sec, a jump of 17 times.*
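
The rates and speedups quoted in this mail can be re-derived from the record counts and timings. The small helper below is only an illustrative check; the class and method names are made up for this sketch, and the inputs are the approximate timings given above, so the 90,000-document run comes out a little under the quoted 575 records/sec.

```java
// Illustrative arithmetic check; ThroughputCheck and its methods are hypothetical names.
class ThroughputCheck {

    // Records indexed per second for a run of the given size and duration.
    static double recordsPerSecond(long records, double seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be positive");
        }
        return records / seconds;
    }

    // How many times faster a run is than the baseline run.
    static double speedup(double rate, double baselineRate) {
        return rate / baselineRate;
    }

    public static void main(String[] args) {
        double mutate = recordsPerSecond(10_000, 300);          // Test 1: client.mutate
        double enqueue = recordsPerSecond(10_000, 23);          // Test 2: client.enqueueMutate
        double enqueueNoReduce = recordsPerSecond(90_000, 157); // 90,000 docs, reduce step disabled

        System.out.printf("mutate:              %.0f records/sec%n", mutate);
        System.out.printf("enqueueMutate:       %.0f records/sec (%.1fx)%n",
                enqueue, speedup(enqueue, mutate));
        System.out.printf("enqueue, no reduce:  %.0f records/sec (%.1fx)%n",
                enqueueNoReduce, speedup(enqueueNoReduce, mutate));
    }
}
```

Note that the 17x figure compares the 90,000-document run against the 10,000-document client.mutate baseline, since no 90,000-document client.mutate run was measured.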

Just to give an idea of the index size, the 90K documents show a table size
of 270 MB in Blur.

Regards,
Dibyendu

On Fri, Mar 7, 2014 at 9:51 PM, Dibyendu Bhattacharya <
[email protected]> wrote:

> Thanks Aaron for the detailed explanation. I just browsed through the changes.
> I think we do not need the TableQueueReader now. I will try out the Thrift
> enqueue method to see how it performs. We will be using a Kafka client to
> populate the queue. Will let you know how that goes.
>
> Regards,
> Dibyendu
>
>
> On Fri, Mar 7, 2014 at 9:12 PM, Aaron McCurry <[email protected]> wrote:
>
>> I have pushed most of the features needed for the queue, and here's a
>> rundown of how it works.  I have left the original QueueReader in place at
>> the shards but renamed it to ShardQueueReader, which requires the data to be
>> partitioned correctly.  It also makes use of an in-memory blocking queue
>> that I will be changing to an HDFS-backed queue so the memory resources
>> won't be affected under heavy write load.  Then I have created another set
>> of Thrift calls, enqueueMutate and enqueueMutateBatch, that feed the
>> internal queue.  Both of these methods are implemented on the controller
>> and the shard server.  There will be a TableQueueReader that can run inside
>> the controller to read from a queue and deal with the partitioning
>> inside the controller.  The class is written and committed, but the logic to
>> instantiate and run it has not been written.
>>
>> However, using the controller API (standard Thrift client) to write
>> RowMutations via the enqueue method from Storm could be an option right now
>> without needing to implement anything that runs inside of Blur.  The only
>> issue now is the blocking nature of the in-memory queue.  I will be
>> working to finish this feature before the release, but I believe that it is
>> mostly in a state to evaluate.  The only issue that I can see is that
>> writing data in via the enqueueMutate method could have some performance
>> slowdowns once it hits the max queue length; once the HDFS-backed version
>> is in place that slowdown will be less apparent.
>>
>> So here's a rundown of where the feature is lacking:
>>
>> 1.  The shard queues are an in-memory-only data structure.  So data can
>> be lost at this point if a shard fails, and because they have a finite
>> length they can block under heavy load.  This is the one I see as a must
>> before the release.
>> 2.  A way to run the table queue reader in the controllers, but with the
>> rework of the API I'm not sure you all would really need this anymore.
>>
>> Let me know if you all need any help getting started with this updated
>> code.
>>
>> Thanks!
>>
>> Aaron
>>
>>
>>
>>
>>
>> On Fri, Mar 7, 2014 at 10:00 AM, Dibyendu Bhattacharya <
>> [email protected]> wrote:
>>
>>> Hi Aaron,
>>>
>>> Do you still plan to have this real-time, queue-based indexing feature
>>> for Blur 0.2.2? I know you are very busy with the 0.2.2 release; I just
>>> wanted to know if this will be coming soon.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>>
>>> On Tue, Mar 4, 2014 at 8:50 AM, Jonathan Hodges <[email protected]>wrote:
>>>
>>>> Nothing to add, I agree the Kafka partitions don't need to match the
>>>> Blur partitions.
>>>>
>>>>
>>>> On Mon, Mar 3, 2014 at 7:17 PM, Dibyendu Bhattacharya <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Aaron,
>>>>>
>>>>> No, I do not see that we need to match Kafka partitions with Blur
>>>>> partitions. In fact, the number of partitions in Kafka and the number
>>>>> of shards in Blur may not even match.
>>>>>
>>>>> Jonathan, do you have anything to add here?
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>>
>>>>> On Mon, Mar 3, 2014 at 11:35 PM, Aaron McCurry <[email protected]>wrote:
>>>>>
>>>>>> Ok, I do have a question.  Do you see a use for the current use case
>>>>>> where Kafka partitions match Blur partitions and the clients pushing
>>>>>> messages into Kafka partition the data into the Kafka partitions to match
>>>>>> Blur partitions?  The reason I ask is I want to know if I should keep the
>>>>>> current low-level API pluggable or not.
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 3, 2014 at 10:29 AM, Dibyendu Bhattacharya <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thanks Aaron, yes, I figured out how Blur uses this API; you do
>>>>>>> not need to take a look at this.
>>>>>>>
>>>>>>> Once you are done with the new design of the queue feature, do let
>>>>>>> me know. I will try to integrate Kafka into it and test it.
>>>>>>>
>>>>>>> Dibyendu
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 3, 2014 at 8:07 PM, Aaron McCurry <[email protected]>wrote:
>>>>>>>
>>>>>>>> Based on your post on the mail list I assume that you got what you
>>>>>>>> needed working, or at least figured out how Blur was using the API.
>>>>>>>> Let me know if you need me to take a look at this or not.  Also I'm
>>>>>>>> planning on spending some time this afternoon working through making
>>>>>>>> this feature easier to use.
>>>>>>>>
>>>>>>>> Aaron
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Feb 28, 2014 at 10:03 AM, Aaron McCurry <[email protected]
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hey Dibyendu,
>>>>>>>>>
>>>>>>>>> It will take me a little while to digest this.  I will try to get
>>>>>>>>> back to you later this afternoon.  Thanks!
>>>>>>>>>
>>>>>>>>> Aaron
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 28, 2014 at 8:29 AM, Dibyendu Bhattacharya <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Just forwarding this to you for guidance on whether the approach is
>>>>>>>>>> correct for a Kafka consumer with multiple Kafka partitions indexed
>>>>>>>>>> into multiple Blur table shards.
>>>>>>>>>>
>>>>>>>>>> Sorry that the code is not that clean.
>>>>>>>>>>
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>> From: Dibyendu Bhattacharya <[email protected]>
>>>>>>>>>> Date: Fri, Feb 28, 2014 at 5:32 PM
>>>>>>>>>> Subject: Re: new queue capability
>>>>>>>>>> To: [email protected], Jonathan Hodges <
>>>>>>>>>> [email protected]>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I was just playing with the new QueueReader API, and as Tim
>>>>>>>>>> pointed out, it's very low level. I still tried to implement a
>>>>>>>>>> KafkaConsumer.
>>>>>>>>>>
>>>>>>>>>> Here is my use case. Let me know if I have approached it correctly.
>>>>>>>>>>
>>>>>>>>>> I have a given topic in Kafka, which has 3 partitions. And in
>>>>>>>>>> Blur I have a table with 2 shards. I need to index all messages
>>>>>>>>>> from the Kafka topic into the Blur table.
>>>>>>>>>>
>>>>>>>>>> I have used the Kafka ConsumerGroup API to consume in parallel in 2
>>>>>>>>>> streams (from 3 partitions) for indexing into 2 Blur shards. As the
>>>>>>>>>> ConsumerGroup API allows me to split any Kafka topic into N
>>>>>>>>>> streams, I can choose N to be my target shard count; here it is 2.
>>>>>>>>>>
>>>>>>>>>> For the two shards I created two ShardContext and
>>>>>>>>>> two BlurIndexSimpleWriter instances. (Is this okay?)
>>>>>>>>>>
>>>>>>>>>> Now, I modified the BlurIndexSimpleWriter to get a handle to
>>>>>>>>>> the _queueReader object.  I used this _queueReader to populate the
>>>>>>>>>> respective shard's queue, taking messages from the Kafka streams.
>>>>>>>>>>
>>>>>>>>>> Here are the test case (KafkaReaderTest), the KafkaStreamReader
>>>>>>>>>> (which reads the Kafka stream), and the KafkaQueueReader (the queue
>>>>>>>>>> interface for Blur).
>>>>>>>>>>
>>>>>>>>>> I have also attached the modified BlurIndexSimpleWriter. I just added
>>>>>>>>>>
>>>>>>>>>>   public QueueReader getQueueReader() {
>>>>>>>>>>     return _queueReader;
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> With these changes, I am able to read Kafka messages in parallel
>>>>>>>>>> streams and index them into 2 shards. All documents from Kafka are
>>>>>>>>>> getting indexed properly. But after the test cases run, I can see
>>>>>>>>>> two index directories for the two paths I created.
>>>>>>>>>>
>>>>>>>>>> Let me know if this approach is correct. In this code, I have
>>>>>>>>>> not taken care of shard failure logic, and as Tim pointed out, if
>>>>>>>>>> that can be abstracted from the client, that will be great.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 27, 2014 at 9:40 PM, Aaron McCurry <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> What if we provide an implementation of the QueueReader concept
>>>>>>>>>>> that does what you are discussing?  That way in more extreme cases
>>>>>>>>>>> when the user is forced into implementing the lower-level API
>>>>>>>>>>> (perhaps for performance) they can still do it, but for the normal
>>>>>>>>>>> case the partitioning (and other difficult issues) are handled by
>>>>>>>>>>> the controllers.
>>>>>>>>>>>
>>>>>>>>>>> I could see adding an enqueueMutate call to the controllers that
>>>>>>>>>>> pushes the mutates to the correct buckets for the user.  At the
>>>>>>>>>>> same time we could allow each of the controllers to pull from an
>>>>>>>>>>> external queue and push the mutates to the correct buckets for the
>>>>>>>>>>> shards.  I could see a couple of different ways of handling this.
>>>>>>>>>>>
>>>>>>>>>>> However, I do agree that right now there is too much burden on
>>>>>>>>>>> the user for the 95% case.  We should make this simpler.
>>>>>>>>>>>
>>>>>>>>>>> Aaron
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 27, 2014 at 10:07 AM, Tim Williams <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> > I've been playing around with the new QueueReader stuff and I'm
>>>>>>>>>>> > starting to believe it's at the wrong level of abstraction - in
>>>>>>>>>>> > the shard context - for a user.
>>>>>>>>>>> >
>>>>>>>>>>> > Between having to know about the BlurPartitioner and handling
>>>>>>>>>>> > all the failure nuances, I'm thinking a much friendlier approach
>>>>>>>>>>> > would be to have the client implement a single message pump that
>>>>>>>>>>> > Blur takes from and handles.
>>>>>>>>>>> >
>>>>>>>>>>> > Maybe on startup the Controllers compete for the lead
>>>>>>>>>>> > QueueReader position, create it from the TableContext and run
>>>>>>>>>>> > with it?  The user would still need to deal with Controller
>>>>>>>>>>> > failures but that seems easier to reason about than shard
>>>>>>>>>>> > failures.
>>>>>>>>>>> >
>>>>>>>>>>> > The way it's crafted right now, the user seems burdened with a
>>>>>>>>>>> > lot of the hard problems that Blur otherwise solves.  Obviously,
>>>>>>>>>>> > it trades off a high burden for one of the controllers.
>>>>>>>>>>> >
>>>>>>>>>>> > Thoughts?
>>>>>>>>>>> > --tim
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
