Hi Tom,

I'm assuming that the two sub-DAGs you're talking about are the two Map ->
Send To chains acting on the "audit-report-requests" input and sending
their results to the "audit-report-status" output.

Although processing within each Task is in-order, the framework does not
guarantee the order in which multiple operators chained to the same upstream
operator are evaluated. Specifically, in the current implementation
<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java#L106>,
an Operator's registeredOperators are maintained as a HashSet of
OperatorImpls, and a HashSet does not guarantee any iteration order. This
would explain the out-of-order appearance of the two messages. I'm not sure
what changed in 1.0 to make this trigger now.
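The HashSet explanation can be illustrated with a small, self-contained Java
model. `FanOutOrderDemo` and `Downstream` are hypothetical names, and this is
not Samza's OperatorImpl. It is only a sketch, assuming one upstream operator
that fans a message out to downstream operators registered in a `Set`, in the
same order as the two map -> sendTo call chains.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class FanOutOrderDemo {

    // Hypothetical downstream operator. It does not override hashCode/equals,
    // so a HashSet places it by identity hash code, like any plain object.
    static final class Downstream {
        private final Function<String, String> fn;

        Downstream(Function<String, String> fn) {
            this.fn = fn;
        }

        String apply(String message) {
            return fn.apply(message);
        }
    }

    // Registers the two branches in call-chain order, then delivers one
    // message to every registered operator in the set's iteration order.
    static List<String> run(Set<Downstream> registered) {
        registered.add(new Downstream(m -> "pending:" + m)); // first map -> sendTo chain
        registered.add(new Downstream(m -> "done:" + m));    // second map -> sendTo chain
        List<String> emitted = new ArrayList<>();
        for (Downstream op : registered) {
            emitted.add(op.apply("req-1"));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // HashSet iteration order is unspecified; it may differ from run to run.
        System.out.println("HashSet:       " + run(new HashSet<>()));
        // LinkedHashSet iterates in insertion (registration) order.
        System.out.println("LinkedHashSet: " + run(new LinkedHashSet<>()));
    }
}
```

With the LinkedHashSet, the output always follows registration order
("pending" then "done"); with the HashSet, the two entries may come out in
either order, which matches the flapping Tom describes.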

Since both sendTo and sink are terminal operators (void return type), I
don't think you'll be able to easily get around this. Let me discuss this
with the team and get back to you with a workaround / fix.

Thanks,
Prateek


On Tue, Feb 26, 2019 at 7:08 PM Tom Davis <t...@recursivedream.com> wrote:

> Hey folks!
>
> We have noticed some inconsistencies in message ordering when running a
> StreamApplication that calls two separate `map` functions over an input
> and sends results to the same output. I have attached my Execution Plan,
> but the gist is that the first `map` function marks a thing as "pending"
> by sending a message to a status topic and the second `map` function
> does some work then sends its own status with "done".
>
> We have a test set up to read the resulting status topic with a normal
> Kafka consumer to ensure that two status messages were produced by Samza
> and consumed in the proper order (first "pending", then "done", per the
> order of the MessageStream call chains). This test has been flapping
> routinely since we upgraded to Samza 1.0; we never noticed this in the
> past. Sometimes it times out waiting for any messages at all, though that
> is considerably rarer than the ordering issue. My understanding is that,
> for a given Task, all processing is done serially by default. Is that no
> longer true? Is the guarantee *only* for the order in which messages are
> consumed, not produced?
>
> For test simplicity, there's a single Kafka partition for each topic, and
> I attempted to create a configuration file that would eliminate as many
> sources of coordination and concurrency as I knew how:
>
>   processor.id=0
>   job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
>   job.container.single.thread.mode=true
>
> (We normally use the ZkJobCoordinatorFactory, but both produce the bug.)
>
> I realize the KafkaProducer does not *technically* guarantee delivery
> order except when using transactions, which KafkaSystemProducer doesn't
> appear to do by default. I have checked the actual message envelopes, and
> when the ordering is wrong, the offsets agree with it: "done" was recorded
> by Kafka before "pending". This seems to rule out Samza, but I'm not
> entirely confident in that conclusion. Any thoughts?
>
> Thanks,
>
> Tom
>
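The offset comparison Tom describes can be sketched in Java as follows.
`StatusOrderCheck` and `StatusRecord` are hypothetical names: `StatusRecord`
stands in for a consumed Kafka record (a real test would read offsets from the
records returned by a Kafka consumer). The sketch assumes a single partition,
where a lower offset means the broker appended that record first.

```java
import java.util.List;

public class StatusOrderCheck {

    // Hypothetical stand-in for a consumed Kafka record: just its offset and payload.
    static final class StatusRecord {
        final long offset;
        final String status;

        StatusRecord(long offset, String status) {
            this.offset = offset;
            this.status = status;
        }
    }

    // Offset of the first record carrying the given status, or -1 if none was seen.
    static long offsetOf(List<StatusRecord> records, String status) {
        for (StatusRecord r : records) {
            if (r.status.equals(status)) {
                return r.offset;
            }
        }
        return -1L;
    }

    // Passes only if both statuses were written and "pending" has the lower
    // offset, i.e. it was appended to the single partition before "done".
    static boolean pendingBeforeDone(List<StatusRecord> records) {
        long pending = offsetOf(records, "pending");
        long done = offsetOf(records, "done");
        return pending >= 0 && done >= 0 && pending < done;
    }

    public static void main(String[] args) {
        List<StatusRecord> expected = List.of(new StatusRecord(0, "pending"), new StatusRecord(1, "done"));
        List<StatusRecord> flapped = List.of(new StatusRecord(0, "done"), new StatusRecord(1, "pending"));
        System.out.println(pendingBeforeDone(expected)); // true
        System.out.println(pendingBeforeDone(flapped));  // false
    }
}
```

When the check fails on offsets alone, as in the `flapped` case, the wrong
order was already fixed at write time, before any consumer read the topic.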
