Hi Reuven,

Do you mean we should explicitly define a coder for every input and output type in chained DoFns? Do we also need compareTo and equals defined?
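On the equals question: in his follow-up further down the thread, Reuven clarifies that .equals() (not compareTo) is what matters. It is applied to whatever the output coder's structuralValue returns. One Scala-specific trap is that case classes compare Array fields by reference, so two outputs holding identical bytes are not equal. Below is a minimal sketch that assumes a hypothetical output type with a byte-array field. The names RawOutput, SafeOutput and EqualityDemo are made up for illustration and do not come from the thread. It shows the problem and one way to give such a type content-based equals/hashCode. Beam's ByteArrayCoder already returns a content-comparing StructuralByteArray from structuralValue, so this mainly concerns custom types and coders.

```scala
import java.util.Arrays

// Hypothetical output type: the generated equals compares the Array field by reference.
final case class RawOutput(header: String, body: Array[Byte])

// Hypothetical fix: compare and hash the byte content instead of the array reference.
final case class SafeOutput(header: String, body: Array[Byte]) {
  override def equals(other: Any): Boolean = other match {
    case that: SafeOutput => header == that.header && Arrays.equals(body, that.body)
    case _                => false
  }
  override def hashCode(): Int = 31 * header.hashCode + Arrays.hashCode(body)
}

object EqualityDemo {
  val a = RawOutput("h", Array[Byte](1, 2, 3))
  val b = RawOutput("h", Array[Byte](1, 2, 3))
  // false: the two arrays are distinct objects, even though their bytes match
  val rawEqual: Boolean = a == b

  val c = SafeOutput("h", Array[Byte](1, 2, 3))
  val d = SafeOutput("h", Array[Byte](1, 2, 3))
  // true: equality is decided by the array contents
  val safeEqual: Boolean = c == d
}
```

If equals and hashCode are overridden, keep them consistent with each other, so equal values always hash the same.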
thanks again!
Siyu

> On May 6, 2022, at 12:23 PM, Reuven Lax <re...@google.com> wrote:
>
> Could be - I would check the implementation of inputAdaptor.
>
>> On Fri, May 6, 2022 at 11:59 AM Yuri Jin <yuri....@unity3d.com> wrote:
>> Thanks, I'll check it out.
>>
>> I split inputAdaptor.adapt() into a separate DoFn for testing, and it threw
>> the same exception for the new DoFn. So I suspect the cause is
>> inputAdaptor.adapt().
>>
>>> On Fri, May 6, 2022 at 11:45 AM Reuven Lax <re...@google.com> wrote:
>>> I meant to say .equals() not compareTo.
>>>
>>>> On Fri, May 6, 2022 at 11:44 AM Reuven Lax <re...@google.com> wrote:
>>>> Unfortunately I'm not very familiar with Scio. However, this could also be
>>>> caused by an object that either doesn't properly implement the compareTo
>>>> method or the coder doesn't return such an object in structuralValue.
>>>>
>>>>> On Fri, May 6, 2022 at 11:26 AM Yuri Jin <yuri....@unity3d.com> wrote:
>>>>> Reuven, thanks for the reply.
>>>>>
>>>>> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
>>>>> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of), ByteArrayCoder.of)" coder.
>>>>> I can't paste the code for the DoFn due to company policy, but here's the
>>>>> structure:
>>>>>
>>>>> //////////////////////////////////////
>>>>> Pipeline.scala
>>>>> -------------------------------
>>>>> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>>>>>
>>>>> //////////////////////////////////////
>>>>> ParsePayloadDoFn.scala
>>>>> -------------------------------
>>>>> class ParsePayloadDoFn[InputType](
>>>>>     inputAdaptor: RowAdaptor[InputType],
>>>>>     ...
>>>>>     deadLetterTag: TupleTag[KV[KeyType, ValueType]]) extends
>>>>>     DoFn[InputType, OutputType] {
>>>>>
>>>>>   @Setup
>>>>>   def setup(): Unit =
>>>>>     inputAdaptor.setup()
>>>>>
>>>>>   @ProcessElement
>>>>>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
>>>>>     Try {
>>>>>       val result: Result = inputAdaptor.adapt(c.element()) // parse payload
>>>>>       ...
>>>>>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
>>>>>         c.output(deadLetterTag, _)
>>>>>       val outputPayloadFn: OutputType => Unit = c.output
>>>>>
>>>>>       result.protocol match {
>>>>>         case error: ErrorType =>
>>>>>           deadLetterMessageFn(KV.of(..., ...))
>>>>>         case payload: Payload =>
>>>>>           payload.events.zipWithIndex.foreach {
>>>>>             case (failure: ParsingFailure, _) =>
>>>>>               deadLetterMessageFn(KV.of(..., ...))
>>>>>             case (message: Message, index: Int) =>
>>>>>               // extract body from Message
>>>>>               val body = ...
>>>>>               // make a GET http call and compose output
>>>>>               val output = ...
>>>>>
>>>>>               outputPayloadFn(
>>>>>                 OutputType(
>>>>>                   output,
>>>>>                   ...
>>>>>                   payload.header,
>>>>>                   body,
>>>>>                   index
>>>>>                 )
>>>>>               )
>>>>>           }
>>>>>       }
>>>>>     } match {
>>>>>       case Failure(exception) =>
>>>>>         error(
>>>>>           s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
>>>>>         )
>>>>>       case Success(_) => ()
>>>>>     }
>>>>> }
>>>>> //////////////////////////////////////
>>>>>
>>>>> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
>>>>>
>>>>> Thank you,
>>>>> Yuri Jin
>>>>>
>>>>>
>>>>>> On Thu, May 5, 2022 at 10:36 PM Reuven Lax <re...@google.com> wrote:
>>>>>> What is the type of the input - do you have a custom coder? Are you able
>>>>>> to paste the code for your DoFn?
>>>>>>
>>>>>> In answer to your question - the direct runner tests for this because it
>>>>>> is a testing runner. This error scenario can cause random unexpected
>>>>>> behavior in production runners, which is why the testing runner tries to
>>>>>> explicitly detect it.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>>> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <yuri....@unity3d.com> wrote:
>>>>>>> Hi Beam users,
>>>>>>>
>>>>>>> We have a DoFn that reads data from Kafka and parses a byte-array
>>>>>>> payload. It works fine with the Dataflow runner, but throws
>>>>>>> IllegalMutationException with the direct runner. It does not directly
>>>>>>> modify the input value. Therefore, I am guessing that the output is
>>>>>>> different when there are multiple input values.
>>>>>>>
>>>>>>> The detailed error is as follows.
>>>>>>> Exception in thread "main"
>>>>>>> org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse
>>>>>>> Payload mutated value "DoFnOutputA" after it was output (new value was
>>>>>>> "DoFnOutputB"). Values must not be mutated in any way after being
>>>>>>> output.
>>>>>>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>>>>>>   at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>>>>>>   at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>>>>>>   at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>>>>>>   at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>>>>>>   at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>>>>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
>>>>>>> "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding
>>>>>>> was "Base64EncodedA", now "Base64EncodedB".
>>>>>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>>>>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>>>>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>>>>>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>>>>>>   ... 10 more
>>>>>>> * "DoFnOutputA" and "DoFnOutputB" are the same, but "Base64EncodedA" is
>>>>>>> different from "Base64EncodedB".
>>>>>>>
>>>>>>> I was wondering if you could give me some advice on the following
>>>>>>> questions.
>>>>>>> 1. How can we find the problematic part? I wrote some unit tests, but I
>>>>>>> couldn't reproduce the error.
>>>>>>> 2. Have you experienced the same error and solved it?
>>>>>>> 3. Only the direct runner enforces immutability for DoFns. Is it safe to
>>>>>>> use the "enforceImmutability=false" option?
>>>>>>>
>>>>>>> Any comments would be appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Yuri Jin
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Yuri Jin
>>>>> Senior Software Developer, Data Platform
>>>>> yuri....@unity3d.com
>>>>> (+1) 778-858-3585
>>>>> unity.com
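Reuven pointed at inputAdaptor, and Yuri found that the exception follows adapt() into its own DoFn. One failure mode consistent with both is an emitted element that still shares mutable state with the DoFn, such as a reused byte buffer. It is not the only possible cause. Below is a simplified, Beam-free model of the direct runner's check that assumes a toy SimpleCoder in place of Beam's Coder. All names (SimpleCoder, Out, OutCoder, MutationCheck, BufferReuseDemo) are hypothetical.

The model records an element's encoding when the element is output and re-encodes it later. That is the idea suggested by the CodedValueMutationDetector frames and the "Encoding was ..., now ..." message in the stack trace. Beam's real detector also compares the coder's structuralValue results before failing, and this sketch omits that step.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

// Toy stand-in for a Beam Coder (hypothetical, not Beam's API): value -> bytes.
trait SimpleCoder[T] {
  def encode(value: T): Array[Byte]
}

// Hypothetical output element carrying a byte-array field.
final case class Out(index: Int, body: Array[Byte])

// Deterministic encoding: an index prefix followed by the raw body bytes.
object OutCoder extends SimpleCoder[Out] {
  def encode(value: Out): Array[Byte] =
    s"${value.index}:".getBytes(StandardCharsets.UTF_8) ++ value.body
}

// Simplified model of the check: encode when the element is output,
// re-encode at commit time, and flag any difference in the bytes.
// (Beam's real detector also compares structural values before failing.)
final class MutationCheck[T](value: T, coder: SimpleCoder[T]) {
  private val encodedAtOutput: Array[Byte] = coder.encode(value)
  def mutated: Boolean = !Arrays.equals(encodedAtOutput, coder.encode(value))
}

object BufferReuseDemo {
  // Anti-pattern: one buffer shared by every element this "DoFn" emits.
  private val shared = new Array[Byte](4)

  def parseReusingBuffer(payloads: Seq[String]): Seq[(Out, MutationCheck[Out])] =
    payloads.zipWithIndex.map { case (payload, i) =>
      val bytes = payload.getBytes(StandardCharsets.UTF_8)
      Arrays.fill(shared, 0.toByte)
      System.arraycopy(bytes, 0, shared, 0, math.min(bytes.length, shared.length))
      val out = Out(i, shared) // the emitted element aliases the shared buffer
      (out, new MutationCheck(out, OutCoder)) // "output" happens here
    }

  // Fix: give every emitted element its own array.
  def parseWithFreshArrays(payloads: Seq[String]): Seq[(Out, MutationCheck[Out])] =
    payloads.zipWithIndex.map { case (payload, i) =>
      // getBytes allocates a new array on every call
      val out = Out(i, payload.getBytes(StandardCharsets.UTF_8))
      (out, new MutationCheck(out, OutCoder))
    }
}
```

With the shared buffer, the first element's check fails once the second payload overwrites the buffer. With a fresh array per element, neither check fails. A coder whose encoding is not deterministic can fail the same byte comparison with no mutation at all. That is where Reuven's point about equals and structuralValue comes in.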