Also, what baffles me a little is that PUTs are taking 10x as long as GETs.
We’re running on SSDs, and here is our config:

stores.stage- store.write.batch.size=100000
stores.stage- store.object.cache.size=200000
stores.stage- store.rocksdb.num.write.buffers=3
stores.stage- store.rocksdb.compaction.style=level
stores.stage- store.rocksdb.block.size.bytes=16384
stores.stage-auctions-store.container.write.buffer.size.bytes=268435456 #256MB
stores.stage-auctions-store.container.cache.size.bytes=268435456 #256MB

This is for a container that has 1 task per container since our earlier 2 
tasks/container experiment did not look too promising. Each message is ~ 1KB.

We do not have a lot of cache hits (cache hit rate is ~10% or less), so most
GETs end up going to the KeyValueStorageEngine. Even so, get-ns is ~25,000ns,
while put-ns can be as high as 200,000ns.
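
As a rough sanity check on the get-ns and put-ns figures above, here is a
back-of-the-envelope sketch. It assumes each message costs one synchronous get
plus one put on a single thread and nothing else, which is a simplification and
not something confirmed from the Samza internals. The input rate is the
~5k/sec/partition figure from my earlier reply quoted below; the names are
illustrative only.

```python
# Back-of-the-envelope check using the latencies quoted in this thread.
# Assumption (a simplification, not taken from Samza internals): each message
# costs one synchronous get plus one put on a single thread, and nothing else.

NS_PER_SEC = 1_000_000_000

GET_NS = 25_000      # reported get-ns
PUT_NS = 200_000     # reported worst-case put-ns
INPUT_RATE = 5_000   # ~5k msgs/sec/partition, as quoted in the thread


def max_msgs_per_sec(ns_per_msg: int) -> float:
    """Upper bound on messages/sec for one thread spending ns_per_msg each."""
    return NS_PER_SEC / ns_per_msg


put_to_get_ratio = PUT_NS / GET_NS
ceiling = max_msgs_per_sec(GET_NS + PUT_NS)
headroom = ceiling - INPUT_RATE  # negative means the thread cannot keep up

print(f"put/get ratio:      {put_to_get_ratio:.1f}x")
print(f"per-thread ceiling: {ceiling:,.0f} msgs/sec")
print(f"headroom vs input:  {headroom:,.0f} msgs/sec")
```

Under these assumptions the worst-case put latency alone puts the per-thread
ceiling a little below the input rate, which would be consistent with lag
building up on one task. With average rather than worst-case latencies there
would be more headroom.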

I can send other metrics, if they are useful.

On 3/9/17, 11:45 AM, "Ankit Malhotra" <amalho...@appnexus.com> wrote:

    Replies inline
    
    On 3/9/17, 11:24 AM, "Jagadish Venkatraman" <jagadish1...@gmail.com> wrote:
    
        I understand you are receiving messages from *all* partitions (but fewer
        messages from some partitions).
        
        Some questions:
        
        1. Is it possible that you may have saturated the capacity of the entire
        container?
    >> Possible but what metric/metrics would tell me that?
    
        2. What is the time you spend inside *process* and *window* for the
        affected container? (How does it compare with other containers?). The
        metrics of interest are *process_ns* and *window_ns.*
    >> process-ns is ~200,000ns. window is called once every 60 secs, and
    window-ns is < 1 ms.
    
        3. What is the number of messages per-second you process for the
        affected containers? (How does it compare with other containers?). The
        metric of interest is *process-calls*. You can also look at per-task
        process calls
        <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala>,
        and per-partition messages read
        <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
    
    >> I was quoting process-calls from the metrics registry. For task 2, it is
    lower than for task 1, as I stated earlier.
        4. How many partitions do you have on the input topics?
    >> 100 each, which we are trying to process in 50 containers (2 tasks each,
    with a thread pool of size 5)
    
        4.1 Have you tried increasing the # of partitions, and re-deploying with
        more containers? ( I take it that you are running on Yarn).
    >> No, since we don’t have the ability to change input partitions right now.
        
        *>> Another thing to add is our amount of time being blocked is
        generally quite high which we believe is mainly because our processing
        is not fast enough? *
        
        5. What metric are you referring to here? Do you mean the time spent in
        the *process* and *window* call? If you're not processing "fast
        enough", it will be helpful to instrument where you are spending most
        of the time.
    >> Referring to block-ns. From my understanding, since the stores are
    synchronized, once the run loop asks the tasks to process, it then waits
    (blocks) until at least one of them is ready to receive more. So multiple
    threads may not really be helping us?
    
        You can rely on Samza's MetricRegistry to configure / report your custom
        metrics.
    >> Most of the time is spent in the get and put calls to RocksDB. According
    to the Samza metrics (KeyValueStorageEngine.store-put-ns), our PUTs are much
    more expensive than GETs. Any idea how one would go about profiling that? We
    are using level compaction with 3 write buffers of 500MB each. The average
    input message is ~1KB, and we commit (and thus flush the store) every 90
    seconds. We are also using an object cache of 2 million objects with a write
    cache of 1 million objects.
     
    Input messages are ~5k/sec/partition aggregate for both input streams that
    we are joining.
        
        
        On Thu, Mar 9, 2017 at 5:20 AM, Ankit Malhotra <amalho...@appnexus.com>
        wrote:
        
        > Replies inline.
        >
        > --
        > Ankit
        >
        > > On Mar 9, 2017, at 12:34 AM, Jagadish Venkatraman
        > > <jagadish1...@gmail.com> wrote:
        > >
        > > We can certainly help you debug this more. Some questions:
        > >
        > > 1. Are you processing messages (at all) from the "suffering"
        > > containers? (You can verify that by observing metrics/ logging etc.)
        > Processing messages for sure. But mainly from one of the 2 partitions
        > that the container is reading from.
        >
        > The overall number of process calls for task 1 is much higher than the
        > process calls for task 2 and the diff is approx the lag on the
        > container which is all from task 2.
        > >
        > > 2. If you are indeed processing messages, is it possible the
        > > impacted containers are not able to keep up with the surge in the
        > > data? You can try re-partitioning your input topics (and increasing
        > > the number of containers)
        > We were trying the async loop by having 2 tasks be on the same
        > container and multiple threads processing messages. The process is a
        > simple inner join with a get and put into the RocksDB store.
        >
        > We saw that both get and put latencies for the suffering task were
        > higher than for the task that was chugging along.
        > >
        > > 3. If you are not processing messages, maybe can you provide us
        > > with your stack trace? It will be super-helpful to find out if (or
        > > where) containers are stuck.
        > >
        >
        > Another thing to add is our amount of time being blocked is generally
        > quite high which we believe is mainly because our processing is not
        > fast enough? To add more color, our rocks store's average get across
        > all tasks is around 20,000ns BUT average put is 5X or more. We have
        > the object cache enabled.
        >
        > > Thanks,
        > > Jagadish
        > >
        > >
        > > On Wed, Mar 8, 2017 at 1:05 PM, Ankit Malhotra
        > > <amalho...@appnexus.com> wrote:
        > >
        > >> Hi,
        > >>
        > >> While joining 2 streams, with each container reading from 2
        > >> partitions, we see that some containers start suffering: lag
        > >> (messages behind high watermark) for one of the tasks starts
        > >> skyrocketing while the other one is ~0.
        > >>
        > >> We are using default values for buffer sizes and fetch threshold,
        > >> are using 4 threads as part of the pool, and are using the default
        > >> RoundRobinMessageChooser.
        > >>
        > >> Happy to share more details/config if it can help to debug this
        > >> further.
        > >>
        > >> Thanks
        > >> Ankit
        > >>
        > >>
        > >>
        > >>
        > >>
        > >
        > >
        > > --
        > > Jagadish V,
        > > Graduate Student,
        > > Department of Computer Science,
        > > Stanford University
        >
        
        
        
        -- 
        Jagadish V,
        Graduate Student,
        Department of Computer Science,
        Stanford University
        
    
    
