[jira] [Commented] (BEAM-8029) Using BigQueryIO.read with DIRECT_READ causes Illegal Mutation
[ https://issues.apache.org/jira/browse/BEAM-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17122702#comment-17122702 ]

Beam JIRA Bot commented on BEAM-8029:

This issue is P2 but has been unassigned without any comment for 60 days, so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.

> Using BigQueryIO.read with DIRECT_READ causes Illegal Mutation
> ---
>
> Key: BEAM-8029
> URL: https://issues.apache.org/jira/browse/BEAM-8029
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.14.0
> Reporter: Chris Larsen
> Priority: P2
> Labels: stale-P2
>
> Code to read from BigQuery that is causing the issue:
> {code:java}
> pipeline
>     .apply(BigQueryIO
>         .read(SchemaAndRecord::getRecord)
>         .from(options.getTableRef())
>         .withMethod(Method.DIRECT_READ)
>         .withCoder(AvroCoder.of(schema)))
> {code}
> If we remove .withMethod(Method.DIRECT_READ) then there is no issue.
>
> The error is:
> {code:java}
> org.apache.beam.sdk.util.IllegalMutationException: PTransform BigQueryIO.TypedRead/Read(BigQueryStorageTableSource) mutated value {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412307969368, "humidity": 74.3} after it was output (new value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}). Values must not be mutated in any way after being output.
> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit (ImmutabilityCheckingBundleFactory.java:134)
> at org.apache.beam.runners.direct.EvaluationContext.commitBundles (EvaluationContext.java:210)
> at org.apache.beam.runners.direct.EvaluationContext.handleResult (EvaluationContext.java:151)
> at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult (QuiescenceDriver.java:262)
> at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:189)
> at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
> at java.lang.Thread.run (Thread.java:748)
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412307969368, "humidity": 74.3} mutated illegally, new value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}.
> Encoding was AiZycGktcnBpMC10aGVybW9zdGF0AgAAADRAAgAAAEpAArDVsP7jtMcFAjMzMzMzk1JA, now AiZycGktcnBpMC10aGVybW9zdGF0AgAAADRAAgAAAEpAAu6FuLDktMcFAs3MzMzMrFJA.
> at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation (MutationDetectors.java:153)
> at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions (MutationDetectors.java:148)
> at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified (MutationDetectors.java:123)
> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit (ImmutabilityCheckingBundleFactory.java:124)
> at org.apache.beam.runners.direct.EvaluationContext.commitBundles (EvaluationContext.java:210)
> at org.apache.beam.runners.direct.EvaluationContext.handleResult (EvaluationContext.java:151)
> at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult (QuiescenceDriver.java:262)
> at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:189)
> at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
> at java.lang.Thread.run (Thread.java:748)
> {code}
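The exception text shows the shape of the check: the DirectRunner kept an encoding of the value taken when it was output ("Encoding was ...") and found a different encoding when the bundle was committed ("now ..."). The stdlib-only Java sketch that follows imitates that idea to show why a reader that refills one shared object fails such a check while per-row objects pass. Every name in it (`Reading`, `SnapshotCheck`, `encode`, `countMutatedAtCommit`) is hypothetical, and the `DataOutputStream` encoding merely stands in for the element coder; it is not Beam's `MutationDetectors` implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mutable row, standing in for a reused Avro GenericRecord.
final class Reading {
  String deviceId;
  long sampleTime;
  double humidity;
}

final class SnapshotCheck {
  // Deterministic encoding that plays the role of the element coder.
  static byte[] encode(Reading r) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(r.deviceId);
      out.writeLong(r.sampleTime);
      out.writeDouble(r.humidity);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Outputs one element per (time, humidity) pair, either by refilling a single
  // shared object (reuse == true) or by allocating a new object per element.
  // Each element's encoding is captured at output time and compared with a
  // fresh encoding at "commit" time; returns how many elements changed.
  static int countMutatedAtCommit(long[] sampleTimes, double[] humidities, boolean reuse) {
    List<Reading> outputs = new ArrayList<>();
    List<byte[]> snapshots = new ArrayList<>();
    Reading shared = new Reading();
    for (int i = 0; i < sampleTimes.length; i++) {
      Reading row = reuse ? shared : new Reading();
      row.deviceId = "rpi-rpi0-thermostat";
      row.sampleTime = sampleTimes[i];
      row.humidity = humidities[i];
      outputs.add(row);
      snapshots.add(encode(row)); // snapshot taken when the value is output
    }
    int mutated = 0;
    for (int i = 0; i < outputs.size(); i++) {
      if (!Arrays.equals(snapshots.get(i), encode(outputs.get(i)))) {
        mutated++;
      }
    }
    return mutated;
  }

  public static void main(String[] args) {
    long[] times = {1564412307969368L, 1564412360458615L};
    double[] humidity = {74.3, 74.7};
    System.out.println("reused object: " + countMutatedAtCommit(times, humidity, true));
    System.out.println("fresh objects: " + countMutatedAtCommit(times, humidity, false));
  }
}
```

Running `main` prints `reused object: 1` and `fresh objects: 0`. With reuse, the first element's snapshot no longer matches because the shared object now holds the second row, which mirrors the `sample_time` and `humidity` change in the reported exception.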
[jira] [Commented] (BEAM-8029) Using BigQueryIO.read with DIRECT_READ causes Illegal Mutation
[ https://issues.apache.org/jira/browse/BEAM-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937108#comment-16937108 ]

Jason Bowman commented on BEAM-8029:

Serializing the GenericRecord to binary Avro, passing it along with the ByteArray coder, and deserializing it again in the next pipeline stage prevents the mutation.

The initial report shows fields being overwritten. We see the same, and we also see byte array fields being partially overwritten, with the remainder of the previous value left in the array. For example, a JSON field turns into: {"a": 1}b": 2}

This seems to point to a reader corruption/reuse issue. Digging a bit deeper, I found this: [https://github.com/apache/beam/blob/ac45af909923e6d5e43f83087943ad71513b37e8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java#L251]

It seems the GenericRecord is reused explicitly: it is a member variable of the stream source rather than a new object created for each row. The implementation appears to assume that users never take the GenericRecord itself as the read result.
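The serialize/deserialize workaround amounts to giving every element its own copy of the data before the reader overwrites the shared record. A minimal stdlib-only sketch of that idea follows, assuming a reader that refills one record per row. `MutableRow`, `ReusingReader` and `RoundTripWorkaround` are hypothetical stand-ins for the GenericRecord, the stream reader, and the pipeline stages, and the `DataOutputStream` encoding stands in for binary Avro carried by the ByteArray coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a GenericRecord that the reader refills for every row.
final class MutableRow {
  String deviceId;
  long sampleTime;
}

// Hypothetical reader that decodes every row into the same object.
final class ReusingReader {
  private final MutableRow record = new MutableRow();

  MutableRow read(String deviceId, long sampleTime) {
    record.deviceId = deviceId;
    record.sampleTime = sampleTime;
    return record;
  }
}

final class RoundTripWorkaround {
  // Stage 1: serialize the current row into a brand-new byte array.
  static byte[] toBytes(MutableRow row) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(row.deviceId);
      out.writeLong(row.sampleTime);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Stage 2: deserialize into a fresh object owned by the downstream stage.
  static MutableRow fromBytes(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      MutableRow row = new MutableRow();
      row.deviceId = in.readUTF();
      row.sampleTime = in.readLong();
      return row;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Emits the reader's shared object directly; returns the sample times seen downstream.
  static List<Long> emitReferences(long[] times) {
    ReusingReader reader = new ReusingReader();
    List<MutableRow> emitted = new ArrayList<>();
    for (long t : times) {
      emitted.add(reader.read("rpi-rpi0-thermostat", t));
    }
    List<Long> seen = new ArrayList<>();
    for (MutableRow row : emitted) {
      seen.add(row.sampleTime);
    }
    return seen;
  }

  // Emits one independent byte[] per row and decodes it downstream.
  static List<Long> emitBytes(long[] times) {
    ReusingReader reader = new ReusingReader();
    List<byte[]> emitted = new ArrayList<>();
    for (long t : times) {
      emitted.add(toBytes(reader.read("rpi-rpi0-thermostat", t)));
    }
    List<Long> seen = new ArrayList<>();
    for (byte[] encoded : emitted) {
      seen.add(fromBytes(encoded).sampleTime);
    }
    return seen;
  }

  public static void main(String[] args) {
    long[] times = {1564412307969368L, 1564412360458615L};
    System.out.println("references: " + emitReferences(times));
    System.out.println("bytes:      " + emitBytes(times));
  }
}
```

With the two sample times from the report, `emitReferences` returns the last time twice, while `emitBytes` returns both original times. The price is an extra encode/decode per element; an in-memory deep copy of each record would give the same isolation without the byte round trip.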
[jira] [Commented] (BEAM-8029) Using BigQueryIO.read with DIRECT_READ causes Illegal Mutation
[ https://issues.apache.org/jira/browse/BEAM-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16936139#comment-16936139 ]

Jason Bowman commented on BEAM-8029:

I'm seeing field mutation/corruption in the GenericRecord results when using Method.DIRECT_READ with the DataflowRunner on Beam 2.15.0, and the DirectRunner throws this same runtime exception, so the issue seems to be real.
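One plausible way to get the partially overwritten byte fields described in the earlier comment (new bytes followed by the tail of the previous value) is a decoder that reuses a ByteBuffer whenever the new value fits, combined with a consumer that reads the whole backing array instead of only the valid range. The stdlib-only sketch that follows assumes that reuse pattern; `ReusedBufferDemo`, `readInto`, `wholeArray` and `validBytes` are hypothetical names, not Avro or Beam APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ReusedBufferDemo {
  // Decodes `payload` into `old` when it is large enough, otherwise allocates.
  // Afterwards only the bytes in [position, limit) are valid; anything past the
  // limit in the backing array is left over from the previous value.
  static ByteBuffer readInto(ByteBuffer old, byte[] payload) {
    ByteBuffer result;
    if (old != null && payload.length <= old.capacity()) {
      result = old;
      result.clear();
    } else {
      result = ByteBuffer.allocate(payload.length);
    }
    result.put(payload);
    result.flip(); // position = 0, limit = payload.length
    return result;
  }

  // Buggy view: decodes the entire backing array, ignoring limit().
  static String wholeArray(ByteBuffer buf) {
    return new String(buf.array(), StandardCharsets.UTF_8);
  }

  // Correct view: copies only the valid bytes into a new array.
  static String validBytes(ByteBuffer buf) {
    byte[] copy = new byte[buf.remaining()];
    buf.duplicate().get(copy);
    return new String(copy, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    ByteBuffer buf = readInto(null, "{\"a\": 1, \"b\": 2}".getBytes(StandardCharsets.UTF_8));
    buf = readInto(buf, "{\"a\": 1}".getBytes(StandardCharsets.UTF_8));
    System.out.println(wholeArray(buf));
    System.out.println(validBytes(buf));
  }
}
```

`main` prints `{"a": 1} "b": 2}` for the whole-array view and `{"a": 1}` for the valid-range view. The exact leftover bytes depend on the previous value, so this only illustrates the shape of the corruption, not the specific record from the report.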