[ https://issues.apache.org/jira/browse/SAMZA-254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-254:
----------------------------------

    Attachment: SAMZA-254-2.patch

Attaching updated patch with ASF headers. RB updated as well.

> store.all is slow
> -----------------
>
>                 Key: SAMZA-254
>                 URL: https://issues.apache.org/jira/browse/SAMZA-254
>             Project: Samza
>          Issue Type: Bug
>          Components: kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0, 0.8.0
>
>         Attachments: SAMZA-254-0.patch, SAMZA-254-1.patch, SAMZA-254-2.patch
>
>
> A common usage pattern with Samza stores is to periodically clear out the DB. Right now, we call store.all(), iterate over every key, and delete it.
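> In rough Scala, that pattern looks like the sketch below (a minimal illustration, not from the attached patch; clearStore is a hypothetical helper, and store is assumed to be a KeyValueStore[Array[Byte], Array[Byte]]):
> {code}
> // Hypothetical helper showing the "clear the store" pattern described
> // above; not part of the attached patch. KeyValueStore is
> // org.apache.samza.storage.kv.KeyValueStore.
> def clearStore(store: KeyValueStore[Array[Byte], Array[Byte]]) {
>   val iter = store.all
>   try {
>     // Iterate over every key and delete it. In LevelDB, each delete
>     // only writes a tombstone; the key is not physically removed.
>     while (iter.hasNext) {
>       store.delete(iter.next.getKey)
>     }
>   } finally {
>     iter.close
>   }
> }
> {code}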
> While using this style of processing, we noticed that the store.all() call gets increasingly slow as more keys are deleted from the store.
> This code runs in 5 seconds:
> {code}
> @Test
> def testPerformance {
>   System.out.println("Performance test for store: %s" format typeOfStore)
>   val numLoops = 100
>   val messagesPerBatch = 10000
>   val stuff = b((0 until 200).map(i => "a").mkString)
>   val start = System.currentTimeMillis
>   (0 until numLoops).foreach(i => {
>     System.out.println("(%sms) Total in store: %s" format (System.currentTimeMillis - start, i * messagesPerBatch))
>     (0 until messagesPerBatch).foreach(j => {
>       val k = b((i * j).toString)
>       store.put(k, stuff)
>       // store.delete(k)
>     })
>     val allStart = System.currentTimeMillis
>     val iter = store.all
>     System.out.println("(%s) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
>     iter.close
>   })
>   System.out.println("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
> }
> {code}
> Prints:
> {noformat}
> Performance test for store: cache
> (2ms) Total in store: 0
> (627) all() took 5ms.
> (628ms) Total in store: 10000
> (767) all() took 0ms.
> (767ms) Total in store: 20000
> (848) all() took 0ms.
> (848ms) Total in store: 30000
> (915) all() took 0ms.
> (916ms) Total in store: 40000
> (982) all() took 0ms.
> (982ms) Total in store: 50000
> (1046) all() took 0ms.
> (1046ms) Total in store: 60000
> (1100) all() took 0ms.
> (1101ms) Total in store: 70000
> ....
> (5360ms) Total in store: 940000
> (5406) all() took 1ms.
> (5406ms) Total in store: 950000
> (5447) all() took 0ms.
> (5447ms) Total in store: 960000
> (5499) all() took 0ms.
> (5499ms) Total in store: 970000
> (5558) all() took 0ms.
> (5558ms) Total in store: 980000
> (5605) all() took 0ms.
> (5605ms) Total in store: 990000
> (5654) all() took 1ms.
> Total time: 5.654s
> {noformat}
> Identical code, but with the store.delete call enabled, prints:
> {noformat}
> Performance test for store: cache
> (2ms) Total in store: 0
> (602) all() took 6ms.
> (602ms) Total in store: 10000
> (744) all() took 1ms.
> (745ms) Total in store: 20000
> (820) all() took 2ms.
> (820ms) Total in store: 30000
> (894) all() took 2ms.
> (895ms) Total in store: 40000
> (952) all() took 3ms.
> (952ms) Total in store: 50000
> (1006) all() took 4ms.
> (1006ms) Total in store: 60000
> (1061) all() took 5ms.
> (1061ms) Total in store: 70000
> (1116) all() took 5ms.
> (1116ms) Total in store: 80000
> ....
> (9450) all() took 50ms.
> (9451ms) Total in store: 910000
> (9548) all() took 53ms.
> (9549ms) Total in store: 920000
> (9650) all() took 56ms.
> (9650ms) Total in store: 930000
> (9757) all() took 60ms.
> (9757ms) Total in store: 940000
> (9865) all() took 62ms.
> (9866ms) Total in store: 950000
> (9977) all() took 64ms.
> (9978ms) Total in store: 960000
> (10093) all() took 68ms.
> (10093ms) Total in store: 970000
> (10211) all() took 70ms.
> (10211ms) Total in store: 980000
> (10346) all() took 74ms.
> (10346ms) Total in store: 990000
> (10472) all() took 76ms.
> Total time: 10.472s
> {noformat}
> The all() latency clearly increases once deletes are added.
> I dug in a bit, and it turns out this line in LevelDbKeyValueStore is the culprit:
> {code}
> iter.seekToFirst()
> {code}
> Timing it directly shows:
> {noformat}
> (10478ms) Total in store: 970000
> db.iterator() took 0ms.
> iter.seekToFirst() took 98ms.
> (10606) all() took 99ms.
> (10606ms) Total in store: 980000
> db.iterator() took 0ms.
> iter.seekToFirst() took 65ms.
> (10699) all() took 65ms.
> (10699ms) Total in store: 990000
> db.iterator() took 0ms.
> iter.seekToFirst() took 66ms.
> (10794) all() took 66ms.
> {noformat}
> More digging shows that this is a known problem with LevelDB: deletes leave tombstones behind, and seekToFirst() has to skip past all of them before it finds a live key:
> http://code.google.com/p/leveldb/issues/detail?id=77
> So, as I see it, we have a few options:
> # Upgrade to RocksDB
> # Find a new way to clear DBs
> # Trigger compactions to clean up the dead keys in the DB (a rough sketch is below)
> (1) is definitely the ultimate goal. If (3) is doable, though, I'd like to get it into 0.7.0, since this is a pretty common use case for us. I'm not a fan of (2), because this problem also manifests itself when people don't want to drop the entire table, but instead delete significant chunks of it periodically (e.g. deleting all expired keys from a join buffer).
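> For (3), something like the following could be wired into LevelDbKeyValueStore (an untested sketch, not from the attached patch; it assumes the bundled leveldbjni exposes compactRange on org.iq80.leveldb.DB, and that null bounds mean "the whole key range" as in the C++ CompactRange(NULL, NULL) API):
> {code}
> import org.iq80.leveldb.DB
>
> // Hypothetical helper, not part of the attached patch: ask LevelDB to
> // compact so that the tombstones left behind by deletes are physically
> // dropped, which should make iter.seekToFirst() fast again.
> def compactAll(db: DB) {
>   // Assumption: null begin/end bounds compact the entire DB.
>   db.compactRange(null, null)
> }
> {code}
> Something like this could run periodically after large delete batches; the trade-off is extra compaction I/O while it runs.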