Hi Jagdish,

I failed to mention an important detail: we had change-logging on
for the store. What is interesting is that most of our time is spent in the
“send” method. I see from the code that we only send changelogs when we
“putAllDirtyEntries()” from the object cache.

Is there a way to make that async or to buffer sends to Kafka?
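
As a possible starting point for the buffering question above, here is a sketch only. Samza passes any `systems.<system-name>.producer.*` property through to the underlying Kafka producer, so the producer's own batching settings can be set from the job config. The system name `kafka` and all values below are assumptions for illustration, not tested recommendations.

```properties
# Assumes the changelog system is named "kafka"; values are illustrative only.
# Wait up to 50 ms to fill a batch before sending it.
systems.kafka.producer.linger.ms=50
# Maximum size in bytes of a single per-partition batch.
systems.kafka.producer.batch.size=262144
# Total memory in bytes the producer may use to buffer unsent records.
systems.kafka.producer.buffer.memory=67108864
```

Larger batches trade a little latency for fewer requests; whether that reduces the time seen in “send” would have to be measured.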

Also, a big thank you for answering our questions!

Ankit

On 3/9/17, 3:12 PM, "Jagadish Venkatraman" <jagadish1...@gmail.com> wrote:

    1. What's the on-disk size of the store? (In one of our earlier experiments,
    when the state size was larger than 10G per partition, we observed writes
    slowing down.)
    
    2. Can you benchmark how long writing to RocksDB takes on your SSD? You can
    look at
    <https://github.com/apache/samza/blob/master/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala>.
    
    3. The increase in blockNs is not surprising. If the tasks are taking long
    themselves, it is expected that the runLoop blocks.
    
    4. You don't have to configure the thread-pool size larger than the # of
    tasks per container. So, I believe you can nuke the thread-pool size config.
    
    Thanks,
    Jagadish
    
    On Thu, Mar 9, 2017 at 10:38 AM, Ankit Malhotra <amalho...@appnexus.com>
    wrote:
    
    > Also, what baffles me a little is that the PUTs are taking 10x as long
    > as the GETs. We’re running on SSDs and here is our config:
    >
    > stores.stage- store.write.batch.size=100000
    > stores.stage- store.object.cache.size=200000
    > stores.stage- store.rocksdb.num.write.buffers=3
    > stores.stage- store.rocksdb.compaction.style=level
    > stores.stage- store.rocksdb.block.size.bytes=16384
    > stores.stage-auctions-store.container.write.buffer.size.bytes=268435456 #256MB
    > stores.stage-auctions-store.container.cache.size.bytes=268435456 #256MB
    >
    > This is for a setup with 1 task per container, since our earlier 2
    > tasks/container experiment did not look too promising. Each message is
    > ~ 1KB.
    >
    > We do not have a lot of cache hits (cache hit % ~ 10% or less) and most
    > GETs do end up going to the KeyValueStorageEngine, but even with that
    > get-ns is ~ 25,000ns while put-ns can be as high as 200,000ns.
    >
    > I can send other metrics, if they are useful.
    >
    > On 3/9/17, 11:45 AM, "Ankit Malhotra" <amalho...@appnexus.com> wrote:
    >
    >     Replies inline
    >
    >     On 3/9/17, 11:24 AM, "Jagadish Venkatraman" <jagadish1...@gmail.com>
    > wrote:
    >
    >         I understand you are receiving messages from *all* partitions (but
    > fewer
    >         messages from some partitions).
    >
    >         Some questions:
    >
    >         1. Is it possible that you may have saturated the capacity of the
    > entire
    >         container?
    >     >> Possible but what metric/metrics would tell me that?
    >
    >         2. What is the time you spend inside *process* and *window* for
    >         the affected container? (How does it compare with other
    >         containers?) The metrics of interest are *process-ns* and
    >         *window-ns*.
    >     >> process-ns is ~ 200,000ns. window-ns happens once every 60 secs
    >     >> and is < 1 ms.
    >
    >         3. What is the number of messages per second you process for the
    >         affected containers? (How does it compare with other containers?)
    >         The metric of interest is *process-calls*. You can also look at
    >         per-task process calls
    >         <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala>
    >         and per-partition messages read
    >         <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>.
    >
    >     >> I was quoting process-calls from the metrics registry. For task 2,
    >     >> it's lower than for task 1, as I stated earlier.
    >         4. How many partitions do you have on the input topics?
    >     >> 100 each, which we are trying to process in 50 containers (2 tasks
    >     >> each, with a thread pool of size 5).
    >
    >         4.1 Have you tried increasing the # of partitions, and
    > re-deploying with
    >         more containers? ( I take it that you are running on Yarn).
    >     >> No, since we don’t have the ability to change input partitions
    > right now.
    >
    >         *>> Another thing to add is our amount of time being blocked is
    > generally
    >         quite high which we believe is mainly because our processing is
    > not fast
    >         enough? *
    >
    >         5. What metric are you referring to here? Do you mean the time
    > spent in the
    >         *process* and *window* call? If you're not processing "fast
    > enough", it
    >         will be helpful to instrument where you are spending most of the
    > time.
    >     >> Referring to block-ns. From my understanding, since the stores are
    > synchronized, once the run loop asks the tasks to process, it then waits
    > (blocks) until at least one of them is ready to receive more. So multiple
    > threads may not really be helping us?
    >
    >         You can rely on Samza's MetricRegistry to configure / report your
    > custom
    >         metrics.
    >     >> Most of the time spent is in the get and put from RocksDB.
    > According to the samza metrics (KeyValueStorageEngine.store-put-ns), our
    > PUTs are much more expensive than GETs. Any idea on how one would go about
    > profiling that? We are using Level compaction with 3 buffers each 500M.
    > Average input message is ~ 1KB and we commit (thus, flush the store) every
    > 90 seconds. We are also using an object cache of 2 million objects with a
    > write cache of 1m objects.
    >
    >     Input messages are ~ 5k/sec/partition aggregate for both input streams
    > that we are joining.
    >
    >
    >         On Thu, Mar 9, 2017 at 5:20 AM, Ankit Malhotra <
    > amalho...@appnexus.com>
    >         wrote:
    >
    >         > Replies inline.
    >         >
    >         > --
    >         > Ankit
    >         >
    >         > > On Mar 9, 2017, at 12:34 AM, Jagadish Venkatraman <
    >         > jagadish1...@gmail.com> wrote:
    >         > >
    >         > > We can certainly help you debug this more. Some questions:
    >         > >
    >         > > 1. Are you processing messages (at all) from the "suffering"
    > containers?
    >         > > (You can verify that by observing metrics/ logging etc.)
    >         > Processing messages for sure. But mainly from one of the 2
    > partitions that
    >         > the container is reading from.
    >         >
    >         > The overall number of process calls for task 1 is much higher
    > than the
    >         > process calls for task 2 and the diff is approx the lag on the
    > container
    >         > which is all from task 2.
    >         > >
    >         > > 2. If you are indeed processing messages, is it possible the
    >         > > impacted containers are not able to keep up with the surge in
    >         > > the data? You can try re-partitioning your input topics (and
    >         > > increasing the number of containers).
    >         > We were trying the async loop by having 2 tasks be on the same
    > container
    >         > and multiple threads processing messages. The process is a
    > simple inner
    >         > join with a get and put into the RocksDB store.
    >         >
    >         > We saw that both get and put times for the suffering task were
    >         > higher than for the task that was chugging along.
    >         > >
    >         > > 3. If you are not processing messages, can you provide us
    >         > > with your stack trace? It will be super-helpful to find out if
    >         > > (or where) containers are stuck.
    >         > >
    >         >
    >         > Another thing to add is our amount of time being blocked is
    > generally
    >         > quite high which we believe is mainly because our processing is
    > not fast
    >         > enough? To add more color, our rocks store's average get across
    > all tasks
    >         > is around 20,000ns BUT average put is 5X or more. We have the
    > object cache
    >         > enabled.
    >         >
    >         > > Thanks,
    >         > > Jagadish
    >         > >
    >         > >
    >         > > On Wed, Mar 8, 2017 at 1:05 PM, Ankit Malhotra <
    > amalho...@appnexus.com>
    >         > > wrote:
    >         > >
    >         > >> Hi,
    >         > >>
    >         > >> While joining 2 streams (from 2 partitions), we see that some
    >         > >> containers start suffering, in that the lag (messages behind
    >         > >> the high watermark) for one of the tasks starts skyrocketing
    >         > >> while the other one is ~ 0.
    >         > >>
    >         > >> We are using default values for buffer sizes and fetch
    >         > >> threshold, 4 threads in the pool, and the default
    >         > >> RoundRobinMessageChooser.
    >         > >>
    >         > >> Happy to share more details/config if it can help to debug
    > this further.
    >         > >>
    >         > >> Thanks
    >         > >> Ankit
    >         > >>
    >         > >>
    >         > >>
    >         > >>
    >         > >>
    >         > >
    >         > >
    >         > > --
    >         > > Jagadish V,
    >         > > Graduate Student,
    >         > > Department of Computer Science,
    >         > > Stanford University
    >         >
    >
    >
    >
    >         --
    >         Jagadish V,
    >         Graduate Student,
    >         Department of Computer Science,
    >         Stanford University
    >
    >
    >
    >
    >
    
    
    -- 
    Jagadish V,
    Graduate Student,
    Department of Computer Science,
    Stanford University
    
