Bump. Any more thoughts on this? I went ahead and fixed the Kafka dependency. Didn't help the issue.
On Thu, Nov 20, 2014 at 3:11 PM, Elliott Bradshaw <[email protected]> wrote:

> Also, I'm using the following versions:
>
> Kafka_2.9.2 - 0.8.1.1 (I'm using this as a compile-time dependency,
> although I'm just noticing that we're running kafka_2.10 on the server.
> Hopefully that isn't the problem...)
> Storm - 0.9.2-incubating
> Zookeeper - 3.4.5
> Storm-Kafka - 0.9.2-incubating
>
> On Thu, Nov 20, 2014 at 2:54 PM, Elliott Bradshaw <[email protected]> wrote:
>
>> I implemented the Elasticsearch State code myself. As far as I can tell,
>> it is logically similar to the other Trident-Elasticsearch packages out
>> there.
>>
>> I doubt that the implementation is the source of my problem, though.
>> Even if I set the updateState function in the BaseStateUpdater to simply
>> log the number of tuples it's called with, I still see the same behavior.
>> The function is called only early in the topology's run. Afterwards, I
>> continue seeing data flowing out of the spout and through the cluster,
>> but the updateState function is not called again. It's pretty odd.
>>
>> My Elasticsearch cluster is pretty beefy: 20 servers with 8 CPUs/64 GB
>> of RAM apiece. Prior to Kafka/Storm, we did some pretty heavy-duty
>> Elasticsearch loads on a smaller cluster and were able to achieve load
>> rates of 300k rec/second, so I'm fairly familiar with that side of the
>> tuning. That said, the files were all loaded directly onto the cluster
>> then, and were read in parallel directly from the Elasticsearch disks.
>> That will likely be difficult to compete with.
>>
>> My Kafka/Storm cluster, on the other hand, is only 5 servers with 4
>> CPUs/8 GB of RAM apiece. Right now I'm just doing preliminary testing
>> with a static input: roughly 500,000 records are ingested from files on
>> the file system into Kafka. Records are pushed at a rate of
>> approximately 40,000 records per second (due to bandwidth limitations).
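The per-updater counting check described above (16 updaters whose counts sum to 500,000) can be sketched in plain Java, independent of the Storm API. The class and method names here are illustrative, not taken from the poster's code:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the counting logic inside a BaseStateUpdater's
// updateState: each updater instance adds its batch size to a shared
// counter, so the total can be checked against the number of records
// loaded into Kafka.
public class TupleCounter {
    // Shared across all updater instances in the same worker JVM.
    private static final AtomicLong TOTAL = new AtomicLong();

    // Called once per batch with the number of tuples in that batch;
    // returns the running total.
    public static long recordBatch(int batchSize) {
        return TOTAL.addAndGet(batchSize);
    }

    public static long total() {
        return TOTAL.get();
    }
}
```

Note that a static counter is only shared within one worker JVM; in a 4-worker topology like the one described, the per-worker log totals would need to be summed to get the cluster-wide figure.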
>>
>> My topology runs with 4 workers, and the initial processing runs with a
>> parallelismHint(8). I then do a shuffle followed by a partitionPersist
>> with a parallelismHint of 16. (My understanding is that my initial
>> parallelism should match the number of Kafka partitions in my topic; I
>> would like to hit Elasticsearch with additional parallelism.) I've
>> added some logging, and here is what I'm seeing on loading 500,000
>> records:
>>
>> - There are indeed 16 total ElasticsearchStateUpdater instances
>> executing. Using counters on these, I've validated that the total
>> number of tuples counted on an initial load of 500,000 records is
>> indeed 500,000. So that's a good sign in this case.
>> - After the initial load, I let the cluster sit still for a while while
>> I monitor the logs. Tuples continue to roll into the cluster at an
>> irregular rate, even though no more data has been ingested into Kafka.
>> It seems like the Trident spout continues to emit data even though it
>> has read and "persisted" every tuple in the log. After a few minutes,
>> it looks like my cluster has processed over 1 million records when only
>> 500k had been sent initially. Total updates remain static at 500k.
>> - If I keep the cluster running and kick off another load into Kafka,
>> data does begin to flow into the cluster again, but the state updates
>> are not incremented. No additional data appears to be "persisted". This
>> is in line with the behavior I've seen previously, where, when doing a
>> much bigger load, the StateUpdaters mysteriously stop working.
>> - Nodes begin to crash intermittently. I see errors like a
>> FileNotFoundException saying '/supervisor/stormdist/..../stormconf.ser'
>> does not exist. Other nodes report intermittent errors like "Remote
>> address is not reachable. We will close this client". The cluster does
>> not appear to fully recover. (This does not always happen.)
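For reference, the wiring described above (spout parallelism matching the Kafka partition count, then a shuffle into a higher-parallelism partitionPersist) would look roughly like this in the 0.9.2-incubating Trident API. This is a sketch only: the state factory, updater, function, and field names are assumptions standing in for the poster's own implementation.

```java
// ElasticsearchStateFactory, ElasticsearchStateUpdater, ParseToDocFn and the
// field names below are placeholders, not from the thread.
TridentTopology topology = new TridentTopology();

topology.newStream("kafka-spout", kafkaSpout)   // e.g. an OpaqueTridentKafkaSpout
        .parallelismHint(8)                     // match the topic's partition count
        .each(new Fields("bytes"), new ParseToDocFn(), new Fields("doc"))
        .shuffle()                              // redistribute before persisting
        .partitionPersist(new ElasticsearchStateFactory(),
                          new Fields("doc"),
                          new ElasticsearchStateUpdater())
        .parallelismHint(16);                   // 16 state-updater partitions
```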
>> - Also, subsequent loads show records trickling or flowing in at a
>> very inconsistent and halting rate.
>>
>> - On a final test run with a restarted cluster and another 500,000
>> record load, updateState stops firing after loading approximately
>> 115,880 records.
>>
>> I'm sure that something is either configured wrong or there is an
>> incompatibility somewhere, but I'm at a loss as to where it might be.
>> Hopefully this wasn't TL;DR. Any input would be greatly appreciated!
>>
>> On Wed, Nov 19, 2014 at 2:54 PM, P. Taylor Goetz <[email protected]> wrote:
>>
>>> What are you using for your partitionPersist to ES? Is it something
>>> you implemented yourself, or an open source library?
>>>
>>> With Kafka -> Storm -> Elasticsearch, ES is likely going to be your
>>> bottleneck, since indexing is comparatively expensive. So you will
>>> likely have to spend a fair amount of effort tuning ES and
>>> Storm/Trident.
>>>
>>> I have accomplished this with solid throughput and reliability, but it
>>> took a lot of work to get ES tuned. Chances are your ES cluster will
>>> have to be larger than your Storm cluster.
>>>
>>> Any additional information you could add about your environment and
>>> use case would help.
>>>
>>> -Taylor
>>>
>>> On Nov 19, 2014, at 2:40 PM, Elliott Bradshaw <[email protected]> wrote:
>>>
>>> I feel like there have to be people out there doing State updates with
>>> a Trident-Kafka topology. Has anyone successfully accomplished this
>>> with solid throughput and reliability?
>>>
>>> On Tue, Nov 18, 2014 at 2:30 PM, Elliott Bradshaw <[email protected]> wrote:
>>>
>>>> My apologies if I wasn't clear.
>>>>
>>>> partitionPersist is a Trident stream operation that persists a batch
>>>> of Trident tuples to a stateful destination, in this case
>>>> Elasticsearch. updateState is a method of the BaseStateUpdater class
>>>> that should be called when a batch of tuples arrives.
>>>>
>>>> On Tue, Nov 18, 2014 at 1:26 PM, Itai Frenkel <[email protected]> wrote:
>>>>
>>>>> Could you please elaborate on the relation between "updateState" and
>>>>> "partitionPersist"? Are those two consecutive topology bolts?
>>>>>
>>>>> ------------------------------
>>>>> *From:* Elliott Bradshaw <[email protected]>
>>>>> *Sent:* Tuesday, November 18, 2014 5:25 PM
>>>>> *To:* [email protected]
>>>>> *Subject:* Fwd: Issues with State updates in
>>>>> Kafka-Trident-Elasticsearch topology
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm currently attempting to get a topology running that loads data
>>>>> into Elasticsearch. Tuples go through some minimal marshalling and
>>>>> preprocessing before being sent to partitionPersist, where they are
>>>>> transformed into JSON and indexed in Elasticsearch.
>>>>>
>>>>> The topology appears to work properly in local mode, but when
>>>>> deployed to my 4-node cluster, state updates do not seem to fire
>>>>> correctly (sometimes they don't fire at all). Tuple counter filters
>>>>> show data flowing through the topology at a healthy rate (approx.
>>>>> 80,000 rec/second); however, the updateState function only rarely
>>>>> appears to be called. After a brief period of time, no further calls
>>>>> to updateState are seen.
>>>>>
>>>>> As a test, I wrote a filter that queues up tuples and batch-sends
>>>>> them to Elasticsearch once a certain threshold is reached. This
>>>>> works perfectly fine and is capable of managing the processing load.
>>>>>
>>>>> I've seen discussion of this behavior before, but have not managed
>>>>> to find an explanation or solution. Has anybody else had similar
>>>>> issues or found a solution?
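The workaround filter mentioned in the original message (queue up tuples and batch-send once a threshold is reached) can be sketched in plain Java. The flush callback stands in for an actual Elasticsearch bulk-index call; all names here are illustrative, not the poster's implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching buffer: collects documents and hands a full batch
// to a flush callback (e.g. an Elasticsearch bulk request) once the
// threshold is reached.
public class ThresholdBatcher<T> {
    private final int threshold;
    private final Consumer<List<T>> flush;
    private final List<T> buffer = new ArrayList<>();

    public ThresholdBatcher(int threshold, Consumer<List<T>> flush) {
        this.threshold = threshold;
        this.flush = flush;
    }

    // Add one document; flush a copy of the buffer when it fills up.
    public void add(T doc) {
        buffer.add(doc);
        if (buffer.size() >= threshold) {
            flush.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public int pending() {
        return buffer.size();
    }
}
```

One caveat with this approach: a plain filter that flushes on its own bypasses Trident's batch/transaction mechanism, so unlike partitionPersist it offers no exactly-once state guarantee on replay.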
