Thanks a lot, Jagadish, for bearing with us this long. We were able to locate
the configuration that ensures there is at most one in-flight request on the
wire per connection (and therefore in-order delivery per partition):

if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
    && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
         .asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
  warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
    format (ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
} else {
  producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
}
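For readers without the Samza source handy, here is a sketch of the same guard in plain Java. The config key string is the literal value of ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION in kafka-clients; the default of 1 is assumed to match MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT in the snippet above:

```java
import java.util.Properties;

public class MaxInFlightGuard {
    // Literal value of ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION in kafka-clients.
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";
    // Assumed to match MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT above: one un-acked request.
    static final int MAX_IN_FLIGHT_DEFAULT = 1;

    /** Applies the ordering guard: warn if the user raised the limit, else pin it to the default. */
    static Properties applyGuard(Properties producerProperties) {
        if (producerProperties.containsKey(MAX_IN_FLIGHT)
                && Integer.parseInt(producerProperties.getProperty(MAX_IN_FLIGHT)) > MAX_IN_FLIGHT_DEFAULT) {
            // User explicitly asked for pipelining: only warn, do not override.
            System.out.printf("warn: '%s' > %d does not guarantee message ordering%n",
                    MAX_IN_FLIGHT, MAX_IN_FLIGHT_DEFAULT);
        } else {
            producerProperties.setProperty(MAX_IN_FLIGHT, String.valueOf(MAX_IN_FLIGHT_DEFAULT));
        }
        return producerProperties;
    }

    public static void main(String[] args) {
        // No user override: the guard pins the limit to 1.
        System.out.println(applyGuard(new Properties()).getProperty(MAX_IN_FLIGHT)); // prints 1
    }
}
```

Note that, as in the Scala snippet, the guard only warns (rather than overrides) when the user explicitly configured a larger value.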


On 10/03/17, 11:19 AM, "Jagadish Venkatraman" <jagadish1...@gmail.com> wrote:

    *"If I make two send() calls (writing to the same Kafka topic/partition),
    Samza is going to call those two sends via KafkaProducer's send() method and
    wait for the last send()'s future to complete. Is it guaranteed that these
    two messages will be delivered to the Kafka broker in the order in which the
    send() calls were made? Even if the Kafka client is set to retry
    delivering a message K times?"*
    
    Yes, that understanding is right. Just because a send is async does not
    necessarily mean there are multiple produce requests in flight on the wire.
    We will have only one unacknowledged produce request on a single
    connection, ensuring that we are immune to potential re-orderings
    (occurring due to retries).
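    To make the re-ordering hazard concrete, here is a toy simulation (my own
    sketch, not the actual client internals; real retries happen inside the
    producer's Sender thread). A flaky record fails its first attempt and is
    re-queued behind whatever is already in flight; with one in-flight request
    the retry completes before the next record is sent, while a pipeline of
    two lets the later record overtake it:

```java
import java.util.*;

public class RetryReorderDemo {
    // Toy model: an attempt succeeds unless it is the FIRST try of the record marked flaky.
    static List<String> deliver(List<String> records, String flaky, int maxInFlight) {
        List<String> delivered = new ArrayList<>();
        Deque<String> inFlight = new ArrayDeque<>();
        Set<String> failedOnce = new HashSet<>();
        int next = 0;
        while (next < records.size() || !inFlight.isEmpty()) {
            // Pipeline up to maxInFlight un-acked requests on the connection.
            while (next < records.size() && inFlight.size() < maxInFlight) {
                inFlight.add(records.get(next++));
            }
            String head = inFlight.poll();
            if (head.equals(flaky) && failedOnce.add(head)) {
                // First attempt fails; the retry is queued BEHIND anything already in flight.
                inFlight.add(head);
            } else {
                delivered.add(head); // broker acks this record
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("A", "B");
        System.out.println(deliver(batch, "A", 1)); // prints [A, B] - order preserved
        System.out.println(deliver(batch, "A", 2)); // prints [B, A] - B overtakes A's retry
    }
}
```

    With maxInFlight = 1, B is never sent until A's retry is acknowledged, which
    is exactly the guarantee the config above pins down.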
    
    
    
    
    
    On Thu, Mar 9, 2017 at 4:49 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
    wrote:
    
    > Hi Jagadish, I must be grossly overlooking something here:
    >
    > Samza already guarantees that for you. Currently, two *send()* calls that
    > produce to the same topic partition are *always* delivered in-order.
    > (regardless of whether you called commit, batched up or otherwise)
    >
    > > If I make two send() calls (writing to the same Kafka topic/partition),
    > > Samza is going to call those two sends via KafkaProducer's send() method
    > > and wait for the last send()'s future to complete. Is it guaranteed that
    > > these two messages will be delivered to the Kafka broker in the order in
    > > which the send() calls were made? Even if the Kafka client is set to
    > > retry delivering a message K times?
    >
    > If this is so, then I have been needlessly worrying about the behavior.
    >
    > On Fri, Mar 10, 2017 at 3:06 AM, Jagadish Venkatraman <
    > jagadish1...@gmail.com> wrote:
    >
    > > >  we use "synchronous" guarantee to ensure that the messages we emit
    > from
    > > samza are delivered to Kafka linearly.
    > >
    > > Samza already guarantees that for you. Currently, two *send()* calls 
that
    > > produce to the same topic partition are *always* delivered in-order.
    > > (regardless of whether you called commit, batched up or otherwise)
    > >
    > > Do you want ordering of multiple send calls across partitions? (We have
    > > not had a scenario for that yet.)
    > >
    > > Thanks,
    > > Jagadish
    > >
    > >
    > >
    > >
    > >
    > > On Thu, Mar 9, 2017 at 10:19 AM, Gaurav Agarwal <
    > gauravagarw...@gmail.com>
    > > wrote:
    > >
    > > > There is one more case to consider - what if a single process call
    > sends
    > > > multiple messages? In this case would we need to call checkpoint after
    > > > every send() call inside same process() call.. that seems to be
    > > > problematic, as once checkpointed, there is no safety net against any
    > > > failures in subsequent sends() from same process call.
    > > >
    > > > Thanks for being patient with my questions!
    > > >
    > > >
    > > > ________________________________
    > > > From: Gaurav Agarwal <gauravagarw...@gmail.com>
    > > > Sent: Thursday, March 9, 2017 11:27:17 PM
    > > > To: dev@samza.apache.org
    > > > Cc: Mukul Gupta; Kshitij Gupta
    > > > Subject: Re: Samza 0.12.0 + synchronous KafkaProducer ?
    > > >
    > > > Hi Jagadish, please find reply inline:
    > > >
    > > > (it appears that there is no easy way today to guarantee ordered
    > delivery
    > > > of messages to Kafka from Samza without consuming the checkpointing
    > > > flexibility).
    > > >
    > > > On Thu, Mar 9, 2017 at 11:01 PM, Jagadish Venkatraman <
    > > > jagadish1...@gmail.com> wrote:
    > > > Hi Gaurav,
    > > >
    > > > >> process->process->....->doWork()->checkpoint->process..
    > > >
    > > > What does *doWork()* do? Does it actually iterate over accumulated
    > > > in-memory state, and send messages to Kafka?
    > > > >> In process calls, some "work" gets accumulated in memory (think of
    > > them
    > > > as strings representing some work). Frequently, many process() calls
    > > > produce identical work - so accumulating them in a set has effect of
    > > > "deduping" those. In window call, we really process the accumulated
    > work
    > > > and call Samza's checkpoint explicitly after that.
    > > >
    > > > *>> I found the configuration 'batch.size' which says that "a batch
    > > > size of zero will disable batching entirely". We can probably use
    > > > this property to force the Kafka client to send every message
    > > > inline.*
    > > >
    > > > Yes, but I'm not entirely sure it will help your use-case. Just
    > > > because "batch.size" is zero does not necessarily mean the "send" is
    > > > synchronous. For instance, you can totally have an async send with a
    > > > zero batch size. It's still possible (and likely) that the message
    > > > did not make it to the broker when the send returns.
    > > > >> Yes, you are correct. We would require stronger sync guarantees
    > > > if we need to ensure ordered delivery of messages.
    > > >
    > > > *>> This exception is only notified back to task on next invocation of
    > > > send(). This is a bit puzzling (a) The exception being thrown is for
    > send
    > > > of a message which did not do anything wrong.*
    > > >
    > > > Thanks for the diligent walkthrough. However, that's perfectly fine
    > > > as we will still preserve Kafka's guarantee of at-least-once,
    > > > in-order delivery.
    > > >
    > > > *>> What if a checkpoint was called after calling send() of the
    > > > message that caused the exception? Will we lose processing of that
    > > > message? Wouldn't it be too late by the time the exception is thrown
    > > > back to the client?*
    > > >
    > > > Great observation! We will never lose processing of that message.
    > > > The call to commit will wait on pending futures to complete, or fail
    > > > if there's an exception. In your specific case, the commit call will
    > > > fail. On a restart, you will replay from the previously checkpointed
    > > > offset (as of the last successful commit).
    > > > >> Thanks for clarifying this.
    > > >
    > > > Users have reported these exact issues when running Samza at scale and
    > > > fixed them. Please refer SAMZA-1069, SAMZA-960.
    > > >
    > > > *>> The point I am driving at is that should the send() method
    > > > in MessageCollector interface provide an optional mechanism to operate
    > it
    > > > in **strictly synchronous mode?*
    > > >
    > > > Even if you did, wouldn't you still need to tie it with the offset
    > > > commits, changelog commits, store-buffer flushes to disk, etc. to
    > > > achieve your guarantees across restarts? task.commit does that.
    > > > >> I think synchronous sends and checkpointing/commits are not
    > > > entirely connected - they are two distinct aspects of the system. In
    > > > the application use case that I've described, we use the
    > > > "synchronous" guarantee to ensure that the messages we emit from
    > > > Samza are delivered to Kafka linearly, whereas the checkpoint concept
    > > > is being used to "batch" up some amount of work and then increment
    > > > the offset once that batch is processed (which I believe is the
    > > > original intent of checkpointing).
    > > >
    > > > If we used up "checkpointing" for guaranteeing ordered delivery, we
    > > > would lose the capability of batching.
    > > >
    > > > Do let us know if you have more follow-ups.
    > > >
    > > > Thanks,
    > > > Jagadish
    > > >
    > > > On Wed, Mar 8, 2017 at 10:16 PM, Gaurav Agarwal <
    > > > gauravagarw...@gmail.com> wrote:
    > > >
    > > > > Hi Jagadish,
    > > > >
    > > > > Thank you for very quick and detailed response.
    > > > >
    > > > > We have already set task.commit.ms = -1 and are using the
    > > > > checkpointing mechanism to accumulate some work in memory in order
    > > > > to do it more efficiently in batches. So the flow is
    > > > > process->process->....->doWork()->checkpoint->process..
    > > > > Doing checkpointing after every process call will defeat the above
    > > > strategy
    > > > > that we have been following in our application.
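    The accumulate-and-dedupe flow described above can be sketched with plain
    collections (a toy model of the task loop, not the Samza API; class and
    method names are my own):

```java
import java.util.*;

public class DedupeBatchDemo {
    // LinkedHashSet dedupes identical "work" strings while keeping arrival order.
    private final Set<String> pendingWork = new LinkedHashSet<>();
    final List<String> completed = new ArrayList<>();

    // Models process(): accumulate work in memory; identical items collapse into one.
    void process(String work) {
        pendingWork.add(work);
    }

    // Models doWork() + checkpoint: drain the deduped batch, after which the
    // offset would be committed.
    void doWorkAndCheckpoint() {
        completed.addAll(pendingWork);
        pendingWork.clear();
    }

    public static void main(String[] args) {
        DedupeBatchDemo task = new DedupeBatchDemo();
        task.process("a");
        task.process("b");
        task.process("a"); // duplicate work, deduped
        task.doWorkAndCheckpoint();
        System.out.println(task.completed); // prints [a, b]
    }
}
```

    Checkpointing after every process() call would shrink each batch to a
    single item and lose exactly this dedupe benefit.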
    > > > >
    > > > > However, looking through the Kafka docs, I found the configuration
    > > > > 'batch.size', which says that "a batch size of zero will disable
    > > > > batching entirely". We can probably use this property to force the
    > > > > Kafka client to send every message inline.
    > > > > Does that sound reasonable to you?
    > > > >
    > > > > There is another related question here (and please excuse me if this
    > is
    > > > > stupid one!) -
    > > > >
    > > > > If there is any exception in Samza's KafkaSystemProducer.send()
    > > > > method, that exception is stored in the SourceData object. This
    > > > > exception is only notified back to the task on the next invocation
    > > > > of send(). This is a bit puzzling: (a) the exception being thrown
    > > > > is for the send of a message which did not do anything wrong - the
    > > > > previous guy broke it! - and (b) what if a checkpoint was called
    > > > > after calling send() of the message that caused the exception?
    > > > > Will we lose processing of that message? Wouldn't it be too late
    > > > > by the time the exception is thrown back to the client?
    > > > >
    > > > > The point I am driving at is: should the send() method in the
    > > > > MessageCollector interface provide an optional mechanism to
    > > > > operate it in a strictly synchronous mode?
    > > > >
    > > > > On Thu, Mar 9, 2017 at 11:25 AM, Jagadish Venkatraman <
    > > > > jagadish1...@gmail.com> wrote:
    > > > >
    > > > > > Gaurav,
    > > > > >
    > > > > > I really appreciate your diligent walkthrough of the code base.
    > > Please
    > > > > find
    > > > > > my replies inline.
    > > > > >
    > > > > > *>> I am trying to figure out, how to make our Samza task
    > processing
    > > > > > strictly ordered *
    > > > > >
    > > > > > By default, Samza offers you guaranteed in-order, at-least-once
    > > > > > processing out of the box (the same semantics as Kafka). To
    > > > > > ensure that each send is "acknowledged" by the broker, you can
    > > > > > choose to invoke Samza's *commit* at the end of processing every
    > > > > > message.
    > > > > >
    > > > > > *>> We do not want to start processing of next message till it is
    > > > > > guaranteed that our previously emitted messages from samza tasks
    > have
    > > > > been
    > > > > > accepted by Kafka broker. Is there any samza configuration that
    > will
    > > > make
    > > > > > this happen? *
    > > > > >
    > > > > > You can do the following:
    > > > > > A. Set task.commit.ms = -1 (this will disable auto-commit and
    > > > > > allow you to call commit manually).
    > > > > > B. At the end of every *process* or *window* call, invoke
    > > > > > *taskCoordinator.commit(RequestScope.CURRENT_TASK);*
    > > > > >
    > > > > >
    > > > > > *>>The `MessageCollector` interface does not expose a 'flush()'
    > > method
    > > > > that
    > > > > > we could have called after doing a send() to ensure the delivery 
of
    > > > > message
    > > > > > to Kafka Broker.*
    > > > > >
    > > > > > This is intentional (to provide a single commit/flush API via
    > > > > > the *taskCoordinator* abstraction). Invoking
    > > > > > *taskCoordinator.commit* will wait on pending futures, flush
    > > > > > buffers, flush state stores, and checkpoint offsets.
    > > > > >
    > > > > > Please let us know if we can be of more help!
    > > > > >
    > > > > > Thanks,
    > > > > > Jagadish
    > > > > >
    > > > > >
    > > > > >
    > > > > >
    > > > > >
    > > > > > On Wed, Mar 8, 2017 at 9:12 PM, Gaurav Agarwal <
    > > > > > gauravagarw...@gmail.com>
    > > > > >
    > > > > > wrote:
    > > > > >
    > > > > > > (correcting recipient address)
    > > > > > >
    > > > > > > On Thu, Mar 9, 2017 at 10:39 AM, Gaurav Agarwal <
    > > > > > > gauravagarw...@gmail.com>
    > > > > > > wrote:
    > > > > > >
    > > > > > > > Hi All,
    > > > > > > >
    > > > > > > > We are trying to upgrade to Samza 0.12.0. In the process, we
    > > > > > > > noticed that the Kafka 0.10.0 KafkaProducer client API does
    > > > > > > > not provide any configuration to send() the messages
    > > > > > > > synchronously. One needs to wait on the returned Future for
    > > > > > > > synchronous guarantees.
    > > > > > > >
    > > > > > > > I am trying to figure out, how to make our Samza task
    > processing
    > > > > > strictly
    > > > > > > > ordered - i.e. we want to process an incoming message and
    > > > optionally
    > > > > > > write
    > > > > > > > back some messages to kafka. We do not want to start 
processing
    > > of
    > > > > next
    > > > > > > > message till it is guaranteed that our previously emitted
    > > messages
    > > > > from
    > > > > > > > samza tasks have been accepted by Kafka broker.
    > > > > > > >
    > > > > > > > Is there any samza configuration that will make this happen?
    > The
    > > `
    > > > > > > > MessageCollector` interface does not expose a 'flush()' method
    > > that
    > > > > we
    > > > > > > > could have called after doing a send() to ensure the delivery
    > of
    > > > > > message
    > > > > > > > to Kafka Broker. (note that `TaskInstanceCollector` -  
specific
    > > > > > > > implementation of `MessageCollector` interface does provide 
the
    > > > > > required
    > > > > > > > flush() method)
    > > > > > > >
    > > > > > > > --
    > > > > > > > cheers,
    > > > > > > > gaurav
    > > > > > > >
    > > > > > >
    > > > > >
    > > > > >
    > > > > >
    > > > > > --
    > > > > > Jagadish V,
    > > > > > Graduate Student,
    > > > > > Department of Computer Science,
    > > > > > Stanford University
    > > > > >
    > > > >
    > > >
    > > >
    > > >
    > > > --
    > > > Jagadish V,
    > > > Graduate Student,
    > > > Department of Computer Science,
    > > > Stanford University
    > > >
    > > >
    > >
    > >
    > > --
    > > Jagadish V,
    > > Graduate Student,
    > > Department of Computer Science,
    > > Stanford University
    > >
    >
    
    
    
    -- 
    Jagadish V,
    Graduate Student,
    Department of Computer Science,
    Stanford University
    

