Just FYI, Tom, this fix is available in the Samza 1.2 release: https://samza.apache.org/releases/1.2.0
Thanks,
Prateek

On Tue, Mar 12, 2019 at 9:11 AM Tom Davis <t...@recursivedream.com> wrote:

>
> Bummer! Yes, that works. The phase from "pending" to "running" is a
> nice-to-have at the moment (operations don't take long enough yet to
> warrant the extra state), so we just removed it for the time being.
>
> Prateek Maheshwari <prateek...@gmail.com> writes:
>
> > Hi Tom,
> >
> > It looks like we won't be able to include SAMZA-2116 in the upcoming 1.1
> > release due to time constraints. It'll have to go into the 1.2 release,
> > which will tentatively be in June. Does that still work for you?
> >
> > Thanks,
> > Prateek
> >
> > On Thu, Feb 28, 2019 at 2:16 PM Tom Davis <t...@recursivedream.com> wrote:
> >
> >> Thanks, Prateek! Yes, the workaround will be fine for the time being.
> >> Thank you again!
> >>
> >> Prateek Maheshwari <prateek...@gmail.com> writes:
> >>
> >> > Hi Tom,
> >> >
> >> > Thanks for reporting this. I created a ticket (SAMZA-2116
> >> > <https://issues.apache.org/jira/browse/SAMZA-2116>) to make the required
> >> > API changes. We'll include this in the next Samza release, which should be
> >> > mid to late next month.
> >> >
> >> > In the meantime, the workaround would be to keep all of this functionality
> >> > in a sink function. Does this work for you?
> >> >
> >> > Thanks,
> >> > Prateek
> >> >
> >> > On Wed, Feb 27, 2019 at 2:54 PM Tom Davis <t...@recursivedream.com> wrote:
> >> >
> >> >>
> >> >> Prateek Maheshwari <prateek...@gmail.com> writes:
> >> >>
> >> >> > Hi Tom,
> >> >> >
> >> >> > I'm assuming that the two sub-DAGs you're talking about are the two Map ->
> >> >> > Send To chains acting on the "audit-report-requests" input and sending
> >> >> > their results to the "audit-report-status" output.
> >> >> >
> >> >>
> >> >> Yes, that's correct.
> >> >>
> >> >> >
> >> >> > Although processing within each Task is in order, the framework does not
> >> >> > guarantee the order in which the multiple chained operators for an operator
> >> >> > are evaluated. Specifically, in the current implementation
> >> >> > <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java#L106>,
> >> >> > an Operator's registeredOperators are maintained as a HashSet of
> >> >> > OperatorImpls. This would explain the out-of-order appearance of the two
> >> >> > messages. I'm not sure what's changed in 1.0 that makes this trigger now.
> >> >> >
> >> >>
> >> >> Ah! I thought this was the case, but I couldn't find the part of the code
> >> >> to prove it. This makes far more sense than Kafka routinely not
> >> >> committing messages in order (though that is still technically a
> >> >> possibility).
> >> >>
> >> >> Upon further investigation, I'm not convinced it's a 1.0 issue; I think
> >> >> we just started using multiple chained operators more heavily.
> >> >>
> >> >> >
> >> >> > Since both sendTo and sink are terminal operators (void return type), I
> >> >> > don't think you'll be able to easily get around this. Let me discuss this
> >> >> > with the team and get back to you with a workaround / fix.
> >> >> >
> >> >>
> >> >> Thanks a lot! <3
> >> >>
> >> >> >
> >> >> > Thanks,
> >> >> > Prateek
> >> >> >
> >> >> >
> >> >> > On Tue, Feb 26, 2019 at 7:08 PM Tom Davis <t...@recursivedream.com> wrote:
> >> >> >
> >> >> >> Hey folks!
> >> >> >>
> >> >> >> We have noticed some inconsistencies in message ordering when running a
> >> >> >> StreamApplication that calls two separate `map` functions over an input
> >> >> >> and sends results to the same output.
> >> >> >> I have attached my Execution Plan, but the gist is that the first `map`
> >> >> >> function marks a thing as "pending" by sending a message to a status
> >> >> >> topic, and the second `map` function does some work and then sends its
> >> >> >> own status with "done".
> >> >> >>
> >> >> >> We have a test set up to read the resulting status topic with a normal
> >> >> >> Kafka consumer to ensure that two status messages were produced by Samza
> >> >> >> and consumed in the proper order (first "pending", then "done", per the
> >> >> >> order of the MessageStream call chains). This test flaps pretty
> >> >> >> routinely since upgrading to Samza 1.0; we never noticed this in the
> >> >> >> past. Sometimes, it times out waiting for any messages, though that's
> >> >> >> considerably rarer than the ordering issue. My understanding is that, for
> >> >> >> a given Task, by default, all processing should be done serially. Is
> >> >> >> that no longer true? Is the guarantee *only* for the order in which
> >> >> >> messages are consumed, not produced?
> >> >> >>
> >> >> >> For test simplicity, there's a single Kafka partition for each topic, and
> >> >> >> I attempted to create a configuration file that would eliminate as many
> >> >> >> sources of coordination and concurrency as I knew how:
> >> >> >>
> >> >> >> processor.id=0
> >> >> >> job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
> >> >> >> job.container.single.thread.mode=true
> >> >> >>
> >> >> >> (We use the ZkJobCoordinatorFactory normally, but both produce the bug.)
> >> >> >>
> >> >> >> I realize the KafkaProducer does not *technically* guarantee delivery
> >> >> >> order except when using transactions, which KafkaSystemProducer doesn't
> >> >> >> appear to do by default.
> >> >> >> I have checked the actual message envelope, and when the ordering is
> >> >> >> wrong, the offsets are consistent with that wrong order -- so "done"
> >> >> >> was recorded by Kafka prior to "pending". This seems to rule out Samza,
> >> >> >> but I'm not entirely confident in that conclusion. Any thoughts?
> >> >> >>
> >> >> >> Thanks,
> >> >> >>
> >> >> >> Tom
> >> >> >>
> >> >>
> >>
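Prateek's explanation in the thread is that an operator's downstream operators are kept in a HashSet, so the two chained `map` operators are not guaranteed to run in the order they were declared. The following plain-Java sketch illustrates that effect only; it is not Samza code, and `DownstreamOp` and `RegistrationOrderDemo` are hypothetical names. It shows that `HashSet` iteration follows hash codes (identity hash codes here, so the order may differ from run to run), while `LinkedHashSet` iteration follows insertion order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a registered downstream operator (NOT Samza's OperatorImpl).
// No equals/hashCode override, so HashSet places instances by identity hash code.
final class DownstreamOp {
    final String name;

    DownstreamOp(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}

final class RegistrationOrderDemo {
    // Registers one op per name, in list order, then returns the order in which
    // a loop over the set would visit (i.e. "invoke") them.
    static List<String> invocationOrder(Set<DownstreamOp> registered, List<String> names) {
        for (String n : names) {
            registered.add(new DownstreamOp(n));
        }
        List<String> order = new ArrayList<>();
        for (DownstreamOp op : registered) {
            order.add(op.name);
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> names = List.of("mapPending", "mapDone");
        // HashSet: visit order depends on hash codes, not on registration order.
        System.out.println("HashSet:       " + invocationOrder(new HashSet<>(), names));
        // LinkedHashSet: visit order is always registration (insertion) order.
        System.out.println("LinkedHashSet: " + invocationOrder(new LinkedHashSet<>(), names));
    }
}
```

Running `main` a few times may print the `HashSet` line in either order, while the `LinkedHashSet` line stays `[mapPending, mapDone]`.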
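The interim workaround suggested in the thread was to keep all of this functionality in a single sink function, so that one piece of code emits "pending", does the work, and then emits "done" in sequence. The following sketch models that idea without Samza's types. `StatusSender` and `AuditReportHandler` are hypothetical. In a real job, the two sends would go through whatever collector the sink function is handed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output interface; stands in for the collector a real sink would use.
interface StatusSender {
    void send(String requestId, String status);
}

// Hypothetical handler: both status updates come from one function, so the
// "pending" send always happens before the "done" send.
final class AuditReportHandler {
    private final StatusSender sender;

    AuditReportHandler(StatusSender sender) {
        this.sender = sender;
    }

    void handle(String requestId) {
        sender.send(requestId, "pending"); // 1. mark the request as pending
        doWork(requestId);                 // 2. do the actual work
        sender.send(requestId, "done");    // 3. report completion
    }

    private void doWork(String requestId) {
        // Placeholder for the real audit-report processing.
    }
}

final class SingleSinkDemo {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        AuditReportHandler handler =
                new AuditReportHandler((id, status) -> sent.add(id + ":" + status));
        handler.handle("req-1");
        System.out.println(sent); // [req-1:pending, req-1:done]
    }
}
```

This only fixes the order in which the job hands the two messages to its producer. Whether they land in the topic in that order may still depend on the Kafka producer's settings, which the original report raises.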
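The test described in the original report reads the status topic with a plain Kafka consumer and checks that "pending" comes before "done". With a single partition, the offsets record the order in which the broker appended the messages. The following is a minimal sketch of that check. It assumes the consumed records have already been collected into a list. `StatusRecord` and `StatusOrderCheck` are hypothetical names; Kafka's `ConsumerRecord` exposes `offset()` and `value()` in a similar way.

```java
import java.util.List;

// Hypothetical stand-in for a consumed status message.
final class StatusRecord {
    final long offset;
    final String value;

    StatusRecord(long offset, String value) {
        this.offset = offset;
        this.value = value;
    }
}

final class StatusOrderCheck {
    // True only if both statuses were seen and the first "pending" has a lower
    // offset than the first "done", i.e. Kafka appended them in the intended order.
    static boolean pendingBeforeDone(List<StatusRecord> records) {
        long pendingOffset = -1; // -1 means "not seen yet"; real offsets are >= 0
        long doneOffset = -1;
        for (StatusRecord r : records) {
            if (pendingOffset < 0 && "pending".equals(r.value)) pendingOffset = r.offset;
            if (doneOffset < 0 && "done".equals(r.value)) doneOffset = r.offset;
        }
        return pendingOffset >= 0 && doneOffset >= 0 && pendingOffset < doneOffset;
    }

    public static void main(String[] args) {
        List<StatusRecord> expected =
                List.of(new StatusRecord(0, "pending"), new StatusRecord(1, "done"));
        List<StatusRecord> flapped =
                List.of(new StatusRecord(0, "done"), new StatusRecord(1, "pending"));
        System.out.println(pendingBeforeDone(expected)); // true
        System.out.println(pendingBeforeDone(flapped));  // false
    }
}
```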