Hi Aaron,

I have recently written a Kafka-Blur consumer for indexing real-time streams into a Blur cluster, and just pushed it to GitHub (https://github.com/dibbhatt/kafka-blur-consumer).
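In short, the consumer runs one indexing thread per Kafka partition and checkpoints the last indexed offset per partition. Here is a minimal, self-contained sketch of that loop; all names are illustrative, and plain Java maps stand in for the real Kafka stream, the Blur Thrift client's enqueueMutate call, and the ZooKeeper offset store:

```java
import java.util.*;
import java.util.concurrent.*;

public class PartitionedConsumerSketch {
    // Stand-in for the ZooKeeper offset store: partition -> last indexed offset.
    static final ConcurrentMap<Integer, Long> offsetStore = new ConcurrentHashMap<>();
    // Stand-in for the Blur enqueueMutate sink: counts records indexed per partition.
    static final ConcurrentMap<Integer, Long> indexed = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        final int partitions = 3;            // discovered from Kafka topic metadata in the real consumer
        final long messagesPerPartition = 100;

        // One worker thread per Kafka partition, so each partition is indexed in order.
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        for (int p = 0; p < partitions; p++) {
            final int partition = p;
            pool.submit(() -> {
                // Resume from the last committed offset (0 on first run).
                long offset = offsetStore.getOrDefault(partition, 0L);
                while (offset < messagesPerPartition) {
                    // Real consumer: fetch the message at `offset` from Kafka, then
                    // client.enqueueMutate(toRowMutation(message)).
                    indexed.merge(partition, 1L, Long::sum);
                    offset++;
                    // Commit the offset after indexing (here: every message),
                    // so a restart resumes without re-indexing everything.
                    offsetStore.put(partition, offset);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("indexed=" + new TreeMap<>(indexed));
        System.out.println("offsets=" + new TreeMap<>(offsetStore));
    }
}
```

The per-partition thread model keeps ordering within each Kafka partition while letting partitions be indexed in parallel; committing the offset only after a record is handed to Blur gives at-least-once delivery on recovery.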
This utility pulls messages from a Kafka cluster and indexes them into a target Apache Blur cluster. The consumer detects the number of partitions for a Kafka topic and spawns that many threads, indexing the Kafka partitions in parallel into the target Blur table using the Blur Thrift client's enqueueMutate. It stores the latest offset of the indexed messages in ZooKeeper, which helps it recover in case of failure.

Let me know your view, and whether it would be possible to push this into Blur contrib. Also, is there any progress on the HDFS-backed persistent queue you started working on some time back?

Regards,
Dibyendu

On Fri, Mar 14, 2014 at 7:15 AM, Aaron McCurry <[email protected]> wrote:
>
> On Thu, Mar 13, 2014 at 6:54 AM, Dibyendu Bhattacharya <
> [email protected]> wrote:
>
>> Hi Aaron,
>>
>> Sorry for the late reply. Yes, the three-node AWS cluster I used for Blur
>> also runs HDFS. I have not created a dedicated Blur cluster for the test.
>>
>> I used a 3-node Blur cluster, where 1 node is the controller (shared with
>> the Hadoop NameNode and JobTracker) and 2 are shard servers (shared with
>> the DataNodes and TaskTrackers). My Blur table has 3 shards.
>>
>> I made a small modification to my client code. My Kafka topic has 3
>> partitions; in the earlier test I used a single-threaded client to read
>> from all Kafka partitions and index into Blur. I have changed the logic
>> to create three threads for the 3 Kafka partitions, each indexing via
>> enqueueMutate in parallel. With the multi-threaded client I am now able
>> to achieve around 1200 records/sec, which is much better than the earlier
>> result. Obviously this number would improve with a dedicated Blur cluster
>> and higher-compute AWS nodes.
>
> That's good. There is also an AsyncClient pool in Blur that could provide
> some more throughput if needed.
>
>> For future work on this, do you have any plan to change the in-memory
>> queue to an HDFS-backed queue?
>
> Yes. I have a very basic start on one in a library on GitHub called mele:
> github.com/amccurry/mele
>
> I am currently working on getting it from prototype to production quality.
>
>> Also, as you mentioned, the shard queues are an in-memory-only data
>> structure, so data can be lost at this point if a shard fails, and because
>> they have a finite length they can block under heavy load. Do you think
>> this might be taken care of in 0.2.2?
>
> I am planning on releasing 0.2.2 this week, so it will go as is. However,
> if you need the persistent queue (for no data loss) I would be happy to
> work on that for a 0.2.3 release. I'm hoping that now that I have Blur in
> a stable state, releases will flow about once a month with a single focus
> for each release (and bug fixes as they show up).
>
> Hope this helps.
>
> Aaron
>
>> Regards,
>> Dibyendu
>>
>> On Wed, Mar 12, 2014 at 7:49 AM, Aaron McCurry <[email protected]> wrote:
>>
>>> On Tue, Mar 11, 2014 at 4:08 AM, Dibyendu Bhattacharya <
>>> [email protected]> wrote:
>>>
>>>> Hi Aaron,
>>>>
>>>> I am finally able to test the new queue capabilities. The results are
>>>> promising compared to the traditional Thrift client. Here are the
>>>> details.
>>>
>>> That's great!
>>>
>>>> I ran the tests on an AWS EMR cluster of 3 m1.large nodes. For all
>>>> tests, environment and memory were left at the defaults.
>>>>
>>>> *Test 1*: Without the queue feature (client.mutate), 10,000 records
>>>> took almost 300 seconds, a rate of *33 records/sec*.
>>>>
>>>> *Test 2*: With the queue feature (client.enqueueMutate), the same
>>>> 10,000 records on the same cluster took just 23 seconds, a rate of
>>>> *435 records/sec*.
>>>> That is a jump of 13x.
>>>>
>>>> *Test 3*: I wanted to index around 90,000 documents in the same
>>>> cluster, but this time, using client.enqueueMutate, I got an error in
>>>> the log and all the shard servers hung. The log below shows the error;
>>>> after it, everything hung and the shards became unresponsive.
>>>>
>>>> org.apache.blur.thrift.BadConnectionException: Could not connect to
>>>> controller/shard server. All connections are bad.
>>>>     at org.apache.blur.thrift.BlurClientManager.execute(BlurClientManager.java:235)
>>>>     at org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler.invoke(BlurClient.java:56)
>>>>     at com.sun.proxy.$Proxy0.enqueueMutate(Unknown Source)
>>>>
>>>> In the log I found this code raising the error:
>>>>
>>>> stackTraceStr:null, errorType:UNKNOWN)
>>>>     at org.apache.blur.manager.writer.MutatableAction.merge(MutatableAction.java:460)
>>>>     at org.apache.blur.manager.writer.MutatableAction.reduceMutates(MutatableAction.java:439)
>>>>     at org.apache.blur.manager.writer.BaseQueueReader$1.run(BaseQueueReader.java:65)
>>>>     at java.lang.Thread.run(Thread.java:724)
>>>>
>>>> To test whether MutatableAction.reduceMutates was the culprit, I
>>>> modified BaseQueueReader.java to perform only doMutate(mutations);
>>>> I commented out mutations = MutatableAction.reduceMutates(mutations);
>>>
>>> Hmm, ok. I suppose there is a bug in there. The queue reader code is
>>> in need of some better error handling. I am trying to clean it up a bit
>>> now.
>>>
>>>> With this change, when I ran the test again for 90,000 documents, all
>>>> of them got indexed properly. It took around 157 seconds, an indexing
>>>> rate of *575 records/sec, a 17x jump*.
>>>>
>>>> To give an idea of index size, the 90K documents show a table size of
>>>> 270MB in Blur.
>>> Just out of curiosity, if you are using 3 nodes in AWS I assume that
>>> you are running HDFS on those 3 nodes? Also, how many shards are you
>>> testing with? NRT updates require more CPU than MapReduce indexing, so
>>> the shard count will play a part in the performance.
>>>
>>> Also, there is room for improvement with a smarter client. Currently
>>> all the data is being routed through the controller, which means the
>>> data is serialized and deserialized in the controller before it ever
>>> reaches the shard servers.
>>>
>>> Thank you very much for the feedback! I am working to improve the queue
>>> code a bit before the release. I believe at this point it's the final
>>> thing to work on.
>>>
>>> Aaron
>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Fri, Mar 7, 2014 at 9:51 PM, Dibyendu Bhattacharya <
>>>> [email protected]> wrote:
>>>>
>>>>> Thanks, Aaron, for the detailed explanation. I just browsed through
>>>>> the changes; I think we do not need the TableQueueReader now. I will
>>>>> try out the Thrift enqueue method to see how it performs. We will be
>>>>> using a Kafka client to populate the queue. Will let you know how that
>>>>> goes.
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>> On Fri, Mar 7, 2014 at 9:12 PM, Aaron McCurry <[email protected]> wrote:
>>>>>
>>>>>> I have pushed most of the feature needed for the queue, and here's a
>>>>>> rundown on how it works. I have left the original QueueReader in
>>>>>> place at the shards but renamed it to ShardQueueReader, which requires
>>>>>> the data to be partitioned correctly. It also makes use of an
>>>>>> in-memory blocking queue that I will be changing to an HDFS-backed
>>>>>> queue so the memory resources won't be affected under heavy write
>>>>>> load. Then I have created another set of Thrift calls, enqueueMutate
>>>>>> and enqueueMutateBatch, that feed the internal queue.
>>>>>> Both of these methods are implemented on the controller and the
>>>>>> shard server. There will be a TableQueueReader that can run inside
>>>>>> the controller to read from a queue and deal with the partitioning
>>>>>> inside the controller. The class is written and committed, but the
>>>>>> logic to instantiate and run it has not been written.
>>>>>>
>>>>>> However, using the controller API (the standard Thrift client) to
>>>>>> write RowMutations via the enqueue method from Storm could be an
>>>>>> option right now, without needing to implement anything that runs
>>>>>> inside of Blur. The only issue now is the blocking nature of the
>>>>>> in-memory queue. I will be working to finish this feature before the
>>>>>> release, but I believe it is mostly in a state to evaluate. The only
>>>>>> issue that I can see is that writing data in via the enqueueMutate
>>>>>> method could slow down once it hits the max queue length; once the
>>>>>> HDFS-backed version is in place, that slowdown will be less apparent.
>>>>>>
>>>>>> So here's a rundown of where the feature is lacking:
>>>>>>
>>>>>> 1. The shard queues are an in-memory-only data structure, so data
>>>>>> can be lost at this point if a shard fails, and because they have a
>>>>>> finite length they can block under heavy load. This one I see as a
>>>>>> must-fix before the release.
>>>>>> 2. A way to run the table queue reader in the controllers; but with
>>>>>> the rework of the API I'm not sure you all would really need this
>>>>>> anymore.
>>>>>>
>>>>>> Let me know if you all need any help getting started with this
>>>>>> updated code.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Aaron
>>>>>>
>>>>>> On Fri, Mar 7, 2014 at 10:00 AM, Dibyendu Bhattacharya <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Aaron,
>>>>>>>
>>>>>>> Do you still plan to include this real-time, queue-based indexing
>>>>>>> feature in Blur 0.2.2?
>>>>>>> I know you are very busy with the 0.2.2 release; I just wanted to
>>>>>>> know if this will be coming soon.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Tue, Mar 4, 2014 at 8:50 AM, Jonathan Hodges <[email protected]> wrote:
>>>>>>>
>>>>>>>> Nothing to add; I agree the Kafka partitions don't need to match
>>>>>>>> the Blur partitions.
>>>>>>>>
>>>>>>>> On Mon, Mar 3, 2014 at 7:17 PM, Dibyendu Bhattacharya <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Aaron,
>>>>>>>>>
>>>>>>>>> No, I do not see that we need to match Kafka partitions with Blur
>>>>>>>>> partitions. In fact, the number of partitions in Kafka and the
>>>>>>>>> number of shards in Blur may well differ.
>>>>>>>>>
>>>>>>>>> Jonathan, do you have anything to add here?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dibyendu
>>>>>>>>>
>>>>>>>>> On Mon, Mar 3, 2014 at 11:35 PM, Aaron McCurry <[email protected]
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> Ok, I do have a question. Do you see a use for the current use
>>>>>>>>>> case, where Kafka partitions match Blur partitions and the
>>>>>>>>>> clients pushing messages into Kafka partition the data into the
>>>>>>>>>> Kafka partitions to match the Blur partitions? The reason I ask
>>>>>>>>>> is I want to know whether I should keep the current low-level API
>>>>>>>>>> pluggable or not.
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 3, 2014 at 10:29 AM, Dibyendu Bhattacharya <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Aaron. Yes, I figured out how Blur uses this API; you
>>>>>>>>>>> do not need to take a look at it.
>>>>>>>>>>>
>>>>>>>>>>> Once you are done with the new design of the queue feature, do
>>>>>>>>>>> let me know; I will try to integrate Kafka into it and test it.
>>>>>>>>>>>
>>>>>>>>>>> Dibyendu
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 3, 2014 at 8:07 PM, Aaron McCurry <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Based on your post on the mailing list I assume that you got
>>>>>>>>>>>> what you needed working, or at least figured out how Blur was
>>>>>>>>>>>> using the API. Let me know if you need me to take a look at
>>>>>>>>>>>> this or not. Also, I'm planning on spending some time this
>>>>>>>>>>>> afternoon working through making this feature easier to use.
>>>>>>>>>>>>
>>>>>>>>>>>> Aaron
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Feb 28, 2014 at 10:03 AM, Aaron McCurry <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Dibyendu,
>>>>>>>>>>>>>
>>>>>>>>>>>>> It will take me a little while to digest this. I will try to
>>>>>>>>>>>>> get back to you later this afternoon. Thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Aaron
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Feb 28, 2014 at 8:29 AM, Dibyendu Bhattacharya <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just forwarding this so you can tell me whether the approach
>>>>>>>>>>>>>> is correct for a Kafka consumer with multiple Kafka
>>>>>>>>>>>>>> partitions indexed into multiple Blur table shards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sorry that the code is not that clean.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>>>>> From: Dibyendu Bhattacharya <[email protected]>
>>>>>>>>>>>>>> Date: Fri, Feb 28, 2014 at 5:32 PM
>>>>>>>>>>>>>> Subject: Re: new queue capability
>>>>>>>>>>>>>> To: [email protected], Jonathan Hodges <[email protected]>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was just playing with the new QueueReader API, and as Tim
>>>>>>>>>>>>>> pointed out, it's very low-level.
I still tried to implement a >>>>>>>>>>>>>> KafkaConsumer . >>>>>>>>>>>>>> >>>>>>>>>>>>>> Here is my use case. Let me know if I have approached >>>>>>>>>>>>>> correctly. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I have a given topic in Kafka, which has 3 Partitions. And in >>>>>>>>>>>>>> Blur I have a table with 2 Shards . I need to index all messages >>>>>>>>>>>>>> from Kafka >>>>>>>>>>>>>> Topic to Blur Table. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I have used Kafka ConsumerGroupAPI to consume in parallel in >>>>>>>>>>>>>> 2 streams ( from 3 partitions) for indexing into 2 Blur shards. >>>>>>>>>>>>>> As >>>>>>>>>>>>>> ConsumerGroup API allow me to split any Kafka Topic into N >>>>>>>>>>>>>> number of >>>>>>>>>>>>>> streams, I can choose N for my target shard count, here it is 2. >>>>>>>>>>>>>> >>>>>>>>>>>>>> For both shards I created two ShardContext and >>>>>>>>>>>>>> two BlurIndexSimpleWriter. ( Is this okay ?) >>>>>>>>>>>>>> >>>>>>>>>>>>>> Now, I modified the BlurIndexSimpleWriter to get handle to >>>>>>>>>>>>>> the _queueReader object. I used this _queueReader to populate >>>>>>>>>>>>>> the >>>>>>>>>>>>>> respective shards queue taking messages from KafkaStreams. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Here is the TestCase (KafkaReaderTest) , KafkaStreamReader ( >>>>>>>>>>>>>> which reads the Kafka Stream) , and the KafkaQueueReader ( The Q >>>>>>>>>>>>>> interface >>>>>>>>>>>>>> for Blur) >>>>>>>>>>>>>> >>>>>>>>>>>>>> Also attached the modified BlurIndexSimpleWriter. Just added >>>>>>>>>>>>>> >>>>>>>>>>>>>> public QueueReader getQueueReader(){ >>>>>>>>>>>>>> >>>>>>>>>>>>>> return _queueReader; >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> With these changes, I am able to read Kafka messages in >>>>>>>>>>>>>> parallel streams and index them into 2 shards. All documents >>>>>>>>>>>>>> from Kafka >>>>>>>>>>>>>> getting indexed properly. But after TestCases run , I can see >>>>>>>>>>>>>> two Index >>>>>>>>>>>>>> Directory for two path I created. 
>>>>>>>>>>>>>> Let me know if this approach is correct. In this code, I have
>>>>>>>>>>>>>> not taken care of the shard-failure logic, and as Tim pointed
>>>>>>>>>>>>>> out, if that can be abstracted from the client, that would be
>>>>>>>>>>>>>> great.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Feb 27, 2014 at 9:40 PM, Aaron McCurry <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What if we provide an implementation of the QueueReader
>>>>>>>>>>>>>>> concept that does what you are discussing? That way, in more
>>>>>>>>>>>>>>> extreme cases when the user is forced into implementing the
>>>>>>>>>>>>>>> lower-level API (perhaps for performance), they can still do
>>>>>>>>>>>>>>> it, but in the normal case the partitioning (and other
>>>>>>>>>>>>>>> difficult issues) is handled by the controllers.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I could see adding an enqueueMutate call to the controllers
>>>>>>>>>>>>>>> that pushes the mutates to the correct buckets for the user.
>>>>>>>>>>>>>>> At the same time we could allow each of the controllers to
>>>>>>>>>>>>>>> pull from an external queue and push the mutates to the
>>>>>>>>>>>>>>> correct buckets for the shards. I could see a couple of
>>>>>>>>>>>>>>> different ways of handling this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However, I do agree that right now there is too much burden
>>>>>>>>>>>>>>> on the user for the 95% case. We should make this simpler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Aaron
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Feb 27, 2014 at 10:07 AM, Tim Williams <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > I've been playing around with the new QueueReader stuff and I'm
>>>>>>>>>>>>>>> > starting to believe it's at the wrong level of abstraction - in the
>>>>>>>>>>>>>>> > shard context - for a user.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Between having to know about the BlurPartitioner and handling all the
>>>>>>>>>>>>>>> > failure nuances, I'm thinking a much friendlier approach would be to
>>>>>>>>>>>>>>> > have the client implement a single message pump that Blur takes from
>>>>>>>>>>>>>>> > and handles.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Maybe on startup the controllers compete for the lead QueueReader
>>>>>>>>>>>>>>> > position, create it from the TableContext, and run with it? The user
>>>>>>>>>>>>>>> > would still need to deal with controller failures, but that seems
>>>>>>>>>>>>>>> > easier to reason about than shard failures.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > The way it's crafted right now, the user seems burdened with a lot of
>>>>>>>>>>>>>>> > the hard problems that Blur otherwise solves. Obviously, it trades
>>>>>>>>>>>>>>> > off a high burden for one of the controllers.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thoughts?
>>>>>>>>>>>>>>> > --tim
