[ https://issues.apache.org/jira/browse/SAMZA-254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-254:
----------------------------------

    Attachment: SAMZA-254-2.patch

Attaching updated patch with ASF headers. RB updated as well.

> store.all is slow
> -----------------
>
>                 Key: SAMZA-254
>                 URL: https://issues.apache.org/jira/browse/SAMZA-254
>             Project: Samza
>          Issue Type: Bug
>          Components: kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0, 0.8.0
>
>         Attachments: SAMZA-254-0.patch, SAMZA-254-1.patch, SAMZA-254-2.patch
>
>
> A common usage pattern with Samza stores is to periodically clear out the DB.
> Right now, we call store.all(), iterate over every key, and delete each one.
> With this style of processing, we noticed that the store.all call gets
> increasingly slow as more keys are deleted from the store.
> With the store.delete call commented out, this code runs in about 5.7 seconds:
> {code}
>   @Test
>   def testPerformance {
>     System.out.println("Performance test for store: %s" format typeOfStore)
>     val numLoops = 100
>     val messagesPerBatch = 10000
>     val stuff = b((0 until 200).map(i => "a").mkString)
>     val start = System.currentTimeMillis
>     (0 until numLoops).foreach(i => {
>       System.out.println("(%sms) Total in store: %s" format (System.currentTimeMillis - start, i * messagesPerBatch))
>       (0 until messagesPerBatch).foreach(j => {
>         val k = b((i * j).toString)
>         store.put(k, stuff)
> //        store.delete(k)
>       })
>       
>       val allStart = System.currentTimeMillis
>       val iter = store.all
>       System.out.println("(%s) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
>       iter.close
>     })
>     System.out.println("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
>   }
> {code}
> Prints:
> {noformat}
> Performance test for store: cache
> (2ms) Total in store: 0
> (627) all() took 5ms.
> (628ms) Total in store: 10000
> (767) all() took 0ms.
> (767ms) Total in store: 20000
> (848) all() took 0ms.
> (848ms) Total in store: 30000
> (915) all() took 0ms.
> (916ms) Total in store: 40000
> (982) all() took 0ms.
> (982ms) Total in store: 50000
> (1046) all() took 0ms.
> (1046ms) Total in store: 60000
> (1100) all() took 0ms.
> (1101ms) Total in store: 70000
> ....
> (5360ms) Total in store: 940000
> (5406) all() took 1ms.
> (5406ms) Total in store: 950000
> (5447) all() took 0ms.
> (5447ms) Total in store: 960000
> (5499) all() took 0ms.
> (5499ms) Total in store: 970000
> (5558) all() took 0ms.
> (5558ms) Total in store: 980000
> (5605) all() took 0ms.
> (5605ms) Total in store: 990000
> (5654) all() took 1ms.
> Total time: 5.654s
> {noformat}
> The identical code, but with the store.delete call enabled, prints:
> {noformat}
> Performance test for store: cache
> (2ms) Total in store: 0
> (602) all() took 6ms.
> (602ms) Total in store: 10000
> (744) all() took 1ms.
> (745ms) Total in store: 20000
> (820) all() took 2ms.
> (820ms) Total in store: 30000
> (894) all() took 2ms.
> (895ms) Total in store: 40000
> (952) all() took 3ms.
> (952ms) Total in store: 50000
> (1006) all() took 4ms.
> (1006ms) Total in store: 60000
> (1061) all() took 5ms.
> (1061ms) Total in store: 70000
> (1116) all() took 5ms.
> (1116ms) Total in store: 80000
> ....
> (9450) all() took 50ms.
> (9451ms) Total in store: 910000
> (9548) all() took 53ms.
> (9549ms) Total in store: 920000
> (9650) all() took 56ms.
> (9650ms) Total in store: 930000
> (9757) all() took 60ms.
> (9757ms) Total in store: 940000
> (9865) all() took 62ms.
> (9866ms) Total in store: 950000
> (9977) all() took 64ms.
> (9978ms) Total in store: 960000
> (10093) all() took 68ms.
> (10093ms) Total in store: 970000
> (10211) all() took 70ms.
> (10211ms) Total in store: 980000
> (10346) all() took 74ms.
> (10346ms) Total in store: 990000
> (10472) all() took 76ms.
> Total time: 10.472s
> {noformat}
> The latency of all() clearly increases once deletes are added.
> I dug down a bit, and timing the individual calls shows that this line in
> LevelDbKeyValueStore is the culprit:
> {code}
> iter.seekToFirst()
> {code}
> {noformat}
> (10478ms) Total in store: 970000
> db.iterator() took 0ms.
> iter.seekToFirst() took 98ms.
> (10606) all() took 99ms.
> (10606ms) Total in store: 980000
> db.iterator() took 0ms.
> iter.seekToFirst() took 65ms.
> (10699) all() took 65ms.
> (10699ms) Total in store: 990000
> db.iterator() took 0ms.
> iter.seekToFirst() took 66ms.
> (10794) all() took 66ms.
> {noformat}
> More digging shows that this is a common problem with LevelDB:
> http://code.google.com/p/leveldb/issues/detail?id=77
> So, as I see it, we have a few options:
> # Upgrade to RocksDB
> # Find a new way to clear DBs
> # Trigger compactions to clean up the dead keys in the DB
> (1) is definitely the ultimate goal. If (3) is doable, though, I'd like to
> get it into 0.7.0, since this is a pretty common use case for us. I'm not a
> fan of (2), because this problem also manifests itself when people don't want
> to drop the entire table, but instead delete significant chunks of it
> periodically (e.g. deleting all expired keys from a join buffer).
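
To illustrate why option (3) might help, here is a minimal toy model in Scala. It is not LevelDB's implementation and not the Samza API: the class TombstoneStore and all of its methods are hypothetical names invented for this sketch. It assumes only that a delete leaves a marker (a tombstone) behind instead of physically removing the key, so that seeking to the first live key has to step over every dead key in front of it until a compaction drops them.

```scala
import scala.collection.mutable

// Toy model only (hypothetical, NOT LevelDB): deletes leave tombstones that a
// seek must skip over until a compaction physically removes them.
final class TombstoneStore {
  // Entries sorted by key; a value of None marks a deleted key (a tombstone).
  private val entries = mutable.TreeMap.empty[String, Option[String]]

  def put(key: String, value: String): Unit = entries.put(key, Some(value))

  // Deleting overwrites the value with a tombstone instead of removing the key.
  def delete(key: String): Unit = entries.put(key, None)

  // Returns the first live entry, plus the number of leading tombstones that
  // had to be skipped to reach it (a stand-in for the cost of the seek).
  def seekToFirst(): (Option[(String, String)], Int) = {
    val skipped = entries.valuesIterator.takeWhile(_.isEmpty).length
    val first = entries.iterator.drop(skipped).collectFirst {
      case (k, Some(v)) => (k, v)
    }
    (first, skipped)
  }

  // Compaction drops every tombstone, so later seeks start at a live key.
  def compact(): Unit = {
    val dead = entries.collect { case (k, None) => k }.toList
    dead.foreach(k => entries.remove(k))
  }
}
```

In this model, the skip count returned by seekToFirst grows with the number of deleted keys at the front of the key space and falls back to zero after compact() is called. That matches the shape of the timings above, though the real cost inside LevelDB depends on details this sketch leaves out.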



--
This message was sent by Atlassian JIRA
(v6.2#6252)
