Hi Everyone, As an update, the fan-in Bolt idea appears to work to a certain extent. Initial throughput, as measured by tuples acked/minute, nearly matches the configuration where I have localOrShuffleGrouping between Bolt A and Bolt B, i.e., exclusively local messaging. Moreover, the time spent in com.lmax.disruptor.BlockingWaitStrategy.waitFor goes from 99% (no fan-in Bolt) to 60% (fan-in Bolt). This is a dramatic improvement because, without the fan-in concept, fieldsGrouping throughput in my topology is 1/4 or less of localOrShuffleGrouping and executors (and corresponding workers) fail quickly. So it appears that the fan-in Bolt concept does help.
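For reference, the fan-in arrangement can be wired roughly as below with Storm's TopologyBuilder. This is an illustrative sketch only, not the actual code from the thread: `BoltA`, `FanInBolt`, `BoltB`, the `kafkaSpout` variable, and the `"key"` grouping field are hypothetical stand-ins; the parallelism numbers are the ones discussed above, and the `backtype.storm` package names are as of the Storm 0.10-era releases.

```java
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();

// 20 KafkaSpout executors, one per Kafka partition (kafkaSpout is hypothetical).
builder.setSpout("kafka-spout", kafkaSpout, 20);

// 1000 Bolt A executors fed by a shuffle grouping from the spout.
builder.setBolt("bolt-a", new BoltA(), 1000)
       .shuffleGrouping("kafka-spout");

// 200 fan-in executors, one per worker, so localOrShuffleGrouping keeps
// the Bolt A -> fan-in hop in-process (no network serialization).
builder.setBolt("fan-in", new FanInBolt(), 200)
       .localOrShuffleGrouping("bolt-a");

// Only the 200 fan-in executors now emit remotely via fieldsGrouping.
builder.setBolt("bolt-b", new BoltB(), 50)
       .fieldsGrouping("fan-in", new Fields("key"));

Config conf = new Config();
conf.setNumWorkers(200);
```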
The problem is that Bolt B acking lags behind fan-in Bolt emitting by about 10-15%, and eventually I get a series of tuple failures, coupled with executor and worker failures. So, short-term, the fan-in Bolt concept dramatically improves throughput with fieldsGrouping, but long-term throughput and topology stability are still an issue.

--John

On Wed, Dec 9, 2015 at 10:09 PM, John Yost <[email protected]> wrote:

> Hi Taylor,
>
> Wowsers, thanks for getting back to me so quickly! Basically the flow is as follows:
>
> The Kafka topic is composed of 20 partitions, which 20 KafkaSpout executors read from. The KafkaSpout uses a shuffle grouping to send tuples to the 1000 Bolt A executors. Bolt A parses the incoming tuples and generates 20-25 tuples that are emitted and anchored to the tuples incoming from the KafkaSpout. The anchored tuples are emitted via fieldsGrouping to 1000 Bolt B executors.
>
> When I use localOrShuffleGrouping for the 1000 Bolt A executors to 200 Bolt B executors portion of the DAG, my throughput is 12 million tuples/minute. When I use fieldsGrouping, throughput drops to 1 million tuples/minute initially, falling to < 500K within 30 minutes, and then I start seeing tuple failures. Again, 99% of the Bolt A executor thread's time is spent in the com.lmax.disruptor.BlockingWaitStrategy.waitFor method.
>
> Going from 20 KafkaSpout executors to 1000 Bolt B executors works great. I am thinking the fan-in from 1000 Bolt A executors to 200 Bolt B executors that uses remote messaging via fieldsGrouping is the problem. I am hoping that local messaging via localOrShuffleGrouping as follows will help:
>
> 200 workers
> 1000 Bolt A
> 200 Fan In Bolt (1 executor per worker to ensure local shuffle)
> 50 Bolt B
>
> --John
>
>
> On Wed, Dec 9, 2015 at 9:43 PM, P. Taylor Goetz <[email protected]> wrote:
>
>> Hi John,
>>
>> I think it *may* make sense, but without more details like code/sample data, it is hard to say.
>>
>> Whenever you use a fields grouping, key distribution can come into play and affect scaling.
>>
>> -Taylor
>>
>> > On Dec 9, 2015, at 9:31 PM, John Yost <[email protected]> wrote:
>> >
>> > Hi Everyone,
>> >
>> > I have a large fan-in within my topology where I go from 1000 Bolt A executors to 50 Bolt B executors via fieldsGrouping. When I profile via jvisualvm, it shows that the Bolt A thread spends 99% of its time in the com.lmax.disruptor.BlockingWaitStrategy.waitFor method.
>> >
>> > The topology details are as follows:
>> >
>> > 200 workers
>> > 20 KafkaSpout executors
>> > 1000 Bolt A executors
>> > 50 Bolt B executors
>> >
>> > fieldsGrouping from Bolt A -> Bolt B because I am caching in Bolt B, building up large Key/Value pairs for HFile import into HBase.
>> >
>> > I am thinking that adding an extra bolt between Bolt A and Bolt B, with a localOrShuffleGrouping to go from 1000 -> 200 locally followed by a fieldsGrouping to go from 200 -> 50, will lessen network I/O wait time.
>> >
>> > Please confirm if this makes sense or if there are any other better ideas.
>> >
>> > Thanks
>> >
>> > --John
>> >
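Taylor's point about key distribution can be demonstrated with a small, self-contained simulation. This is not Storm's actual implementation (Storm hashes the selected field values of each tuple modulo the consumer task count); it merely models that idea with `Objects.hash`, and the key names and tuple counts are made up. The point it shows: a single hot key pins its entire volume onto one of the 50 Bolt B tasks, no matter how many executors are added.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class FieldsGroupingSkew {
    // Simplified model of fieldsGrouping: hash the grouping field, mod the
    // number of consumer tasks. All tuples with the same key hit one task.
    static int targetTask(String key, int numTasks) {
        return Math.floorMod(Objects.hash(key), numTasks);
    }

    // Sum the tuple volume landing on each of numTasks consumer tasks.
    static Map<Integer, Integer> distribute(String[] keys, int[] tupleCounts, int numTasks) {
        Map<Integer, Integer> load = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            load.merge(targetTask(keys[i], numTasks), tupleCounts[i], Integer::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        // Hypothetical skewed workload: one hot key carries half the traffic.
        String[] keys = {"hot", "k1", "k2", "k3", "k4"};
        int[] counts = {500_000, 125_000, 125_000, 125_000, 125_000};
        Map<Integer, Integer> load = distribute(keys, counts, 50);
        int max = load.values().stream().max(Integer::compare).orElse(0);
        double ideal = 1_000_000.0 / 50; // load per task if perfectly even
        System.out.printf("busiest task: %d tuples vs ideal %.0f%n", max, ideal);
    }
}
```

If profiling shows one or two Bolt B executors saturated while the rest sit idle, skewed keys, rather than raw network fan-in, may be part of the problem.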
