Hi Tom,

It looks like we won't be able to include SAMZA-2116 in the upcoming 1.1
release due to time constraints. It'll have to go into the 1.2 release,
which will tentatively be in June. Does that still work for you?

Thanks,
Prateek

On Thu, Feb 28, 2019 at 2:16 PM Tom Davis <t...@recursivedream.com> wrote:

> Thanks, Prateek! Yes, the workaround will be fine for the time being.
> Thank you again!
>
> Prateek Maheshwari <prateek...@gmail.com> writes:
>
> > Hi Tom,
> >
> > Thanks for reporting this. I created a ticket (SAMZA-2116
> > <https://issues.apache.org/jira/browse/SAMZA-2116>) to make the required
> > API changes. We'll include this in the next Samza release, which should be
> > mid to late next month.
> >
> > In the meantime, the workaround would be to keep all of this functionality
> > in a sink function. Does this work for you?
> >
> > Thanks,
> > Prateek
> >
> > On Wed, Feb 27, 2019 at 2:54 PM Tom Davis <t...@recursivedream.com> wrote:
> >
> >>
> >> Prateek Maheshwari <prateek...@gmail.com> writes:
> >>
> >> > Hi Tom,
> >> >
> >> > I'm assuming that the two sub-DAGs you're talking about are the two
> >> > Map -> Send To chains acting on the "audit-report-requests" input and
> >> > sending their results to the "audit-report-status" output.
> >> >
> >>
> >> Yes, that's correct.
> >>
> >> >
> >> > Although processing within each Task is in-order, the framework does not
> >> > guarantee the order in which the multiple chained operators for an
> >> > operator are evaluated. Specifically, in the current implementation
> >> > <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java#L106>,
> >> > an Operator's registeredOperators are maintained as a HashSet of
> >> > OperatorImpls. This would explain the out-of-order appearance of the two
> >> > messages. I'm not sure what's changed in 1.0 that makes this trigger now.
> >> >
> >>
> >> Ah! I thought this was the case but I couldn't find the part of the code
> >> to prove it. This makes far more sense than Kafka routinely not
> >> committing messages in order (though it is still technically a
> >> possibility).
> >>
> >> Upon further investigation, I'm not convinced it's a 1.0 issue; I think
> >> we just started using multiple chained operators more heavily.
> >>
> >> >
> >> > Since both sendTo and sink are terminal operators (void return type), I
> >> > don't think you'll be able to easily get around this. Let me discuss this
> >> > with the team and get back to you with a workaround / fix.
> >> >
> >>
> >> Thanks a lot! <3
> >>
> >> >
> >> > Thanks,
> >> > Prateek
> >> >
> >> >
> >> > On Tue, Feb 26, 2019 at 7:08 PM Tom Davis <t...@recursivedream.com> wrote:
> >> >
> >> >> Hey folks!
> >> >>
> >> >> We have noticed some inconsistencies in message ordering when running a
> >> >> StreamApplication that calls two separate `map` functions over an input
> >> >> and sends results to the same output. I have attached my Execution Plan,
> >> >> but the gist is that the first `map` function marks a thing as "pending"
> >> >> by sending a message to a status topic and the second `map` function
> >> >> does some work then sends its own status with "done".
> >> >>
> >> >> We have a test set up to read the resulting status topic with a normal
> >> >> Kafka consumer to ensure that two status messages were produced by Samza
> >> >> and consumed in the proper order (first "pending", then "done", per the
> >> >> order of the MessageStream call chains). This test flaps pretty
> >> >> routinely since upgrading to Samza 1.0; we never noticed this in the
> >> >> past. Sometimes, it times out waiting for any messages, though that's
> >> >> considerably rarer than the ordering issue. My understanding is, for
> >> >> a given Task, by default, all processing should be done serially. Is
> >> >> that no longer true? Is the guarantee *only* for the order in which
> >> >> messages are consumed, not produced?
> >> >>
> >> >> For test simplicity, there's a single Kafka partition for each topic and
> >> >> I attempted to create a configuration file that would eliminate as many
> >> >> coordination and concurrency sources as I knew how:
> >> >>
> >> >>   processor.id=0
> >> >>   job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
> >> >>   job.container.single.thread.mode=true
> >> >>
> >> >> (We use the ZkJobCoordinatorFactory normally but both produce the bug)
> >> >>
> >> >> I realize the KafkaProducer does not *technically* guarantee delivery
> >> >> order except when using transactions, which KafkaSystemProducer doesn't
> >> >> appear to do by default. I have checked the actual message envelope and
> >> >> when the ordering is wrong, the offset order is correct -- so, "done"
> >> >> was recorded by Kafka prior to "pending". This seems to rule out Samza
> >> >> but I'm not entirely confident in that conclusion. Any thoughts?
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Tom
> >> >>
> >>
>
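
For anyone reading this thread later, here is a minimal sketch of the ordering behaviour described above. It is plain Java and does not use any Samza classes. `Op` and `evaluationOrder` are hypothetical names made up for illustration, and the sketch assumes the registered operators rely on `Object`'s default identity-based `hashCode`. A `HashSet` makes no promise about iteration order, whereas a `LinkedHashSet` iterates in insertion order. This only illustrates the general point and is not necessarily how SAMZA-2116 is resolved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class OperatorOrderDemo {
    /** Hypothetical stand-in for an operator; it inherits Object's identity-based hashCode. */
    static final class Op {
        final String name;

        Op(String name) {
            this.name = name;
        }
    }

    /** Registers the operators in the given set, then returns their names in iteration order. */
    static List<String> evaluationOrder(Set<Op> registered, List<Op> ops) {
        registered.addAll(ops);
        List<String> names = new ArrayList<>();
        for (Op op : registered) {
            names.add(op.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Op> ops = Arrays.asList(new Op("map-pending"), new Op("map-done"));
        // HashSet: iteration order is unspecified and may differ between runs.
        System.out.println("HashSet:       " + evaluationOrder(new HashSet<>(), ops));
        // LinkedHashSet: iteration always follows insertion order.
        System.out.println("LinkedHashSet: " + evaluationOrder(new LinkedHashSet<>(), ops));
    }
}
```

Running this several times may or may not show the `HashSet` line flipping, which matches the intermittent nature of the test failure reported in the thread.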

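The workaround suggested in the thread (keeping all of the functionality in a single sink function) can be sketched roughly as follows. This is plain Java rather than Samza code: `StatusSender`, `handleRequest`, and `doWork` are hypothetical names, and in a real job the body of `handleRequest` would live inside the function passed to `sink`. The point is only that when one function sends both statuses, their order is fixed by ordinary sequential execution instead of by operator evaluation order.

```java
import java.util.ArrayList;
import java.util.List;

public class SingleSinkSketch {
    /** Hypothetical stand-in for whatever produces to the status topic. */
    interface StatusSender {
        void send(String requestId, String status);
    }

    /** Sends "pending", does the work, then sends "done", all in one call. */
    static void handleRequest(String requestId, StatusSender sender) {
        sender.send(requestId, "pending");
        doWork(requestId);
        sender.send(requestId, "done");
    }

    /** Placeholder for the actual processing of the request. */
    static void doWork(String requestId) {
        // Intentionally empty in this sketch.
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // Record each status in the order it is sent.
        handleRequest("req-1", (id, status) -> sent.add(id + ":" + status));
        System.out.println(sent); // prints [req-1:pending, req-1:done]
    }
}
```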