Quick update on this. We have fixed an issue with AwsCredentialsProvider serialisation [1] for AWS v2 IOs (well, why it's not serialisable by default is a different question) in 2.27.0. Since it's not yet released, feel free to test it with snapshot artifacts.

[1] https://issues.apache.org/jira/browse/BEAM-11016
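For context, the root of the problem is that the AWS SDK v2 credentials providers do not implement java.io.Serializable, so a client provider that holds one cannot be serialised and shipped to the workers as-is. A minimal sketch of how to see this outside of Beam and Spark (plain Java serialisation only; the class name and the choice of DefaultCredentialsProvider are just illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;

    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    public class ProviderSerializationCheck {
      public static void main(String[] args) throws Exception {
        AwsCredentialsProvider provider = DefaultCredentialsProvider.create();
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
          // Throws java.io.NotSerializableException: the SDK v2 providers
          // are not Serializable out of the box.
          out.writeObject(provider);
        }
      }
    }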
> On 6 Oct 2020, at 23:25, tclem...@tutanota.com wrote:
>
> Yep, same stacktrace. NPE originating from line 41 of SqsUnboundedSource.java.
>
> I'll see about getting a remote debugger attached to the process.
>
> Oct 6, 2020, 14:06 by aromanenko....@gmail.com:
> Hmm, do you have the same stack trace in this case?
>
> Can you debug it at runtime and check whether read.sqsClientProvider() returns null in SqsUnboundedSource(Read)? I'm curious if all calls to SqsUnboundedSource(Read) were done in the same JVM.
>
>> On 6 Oct 2020, at 22:14, tclem...@tutanota.com wrote:
>>
>> To test this, I tried a workaround: an implementation of AwsCredentialsProvider that also implements Serializable. The resolveCredentials method of this class calls the static create method of DefaultCredentialsProvider and delegates to it. There are no fields in the class that should actually need to be serialized.
>>
>> However, this approach is failing in exactly the same manner. I suspect something else might be the culprit here.
>>
>> Oct 5, 2020, 08:56 by aromanenko....@gmail.com:
>> Seems like there is an issue with a non-serialisable AwsCredentialsProvider as a member of BasicSqsClientProvider (which is Serializable).
>>
>>> On 2 Oct 2020, at 20:11, tclem...@tutanota.com wrote:
>>>
>>> The app itself is developed in Clojure, but here's the gist of how it's getting configured:
>>>
>>> AwsCredentialsProvider credProvider =
>>>     EnvironmentVariableCredentialsProvider.create();
>>>
>>> pipeline.apply(
>>>     SqsIO.read()
>>>         .withQueueUrl(url)
>>>         .withSqsClientProvider(credProvider, region, endpoint));
>>>
>>> Oct 1, 2020, 08:48 by aromanenko....@gmail.com:
>>> Could you send a code snippet of your pipeline with SqsIO v2 Read transform configuration?
>>>
>>> On 30 Sep 2020, at 22:56, tclem...@tutanota.com wrote:
>>>
>>> I've been attempting to migrate to the AWS2 SDK (version 2.24.0) on Apache Spark 2.4.7.
>>> However, when switching over to the new API and running it, I keep getting the following exceptions:
>>>
>>> 2020-09-30 20:38:32,428 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 6, 10.42.180.18, executor 0): java.lang.NullPointerException
>>>     at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.lambda$new$e39e7721$1(SqsUnboundedSource.java:41)
>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$MemoizingSupplier.get(Suppliers.java:129)
>>>     at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.getSqs(SqsUnboundedSource.java:74)
>>>     at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.pull(SqsUnboundedReader.java:165)
>>>     at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.advance(SqsUnboundedReader.java:108)
>>>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:246)
>>>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:224)
>>>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
>>>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>>
>>> Examining the source of SqsUnboundedSource reveals a lambda where it's trying to chain a few references:
>>>
>>> read.sqsClientProvider().getSqsClient()
>>>
>>> Which is odd, as I explicitly set the client provider on the read transform. This was working well enough with the old SqsIO API to connect and process messages off the queue.
>>>
>>> Any thoughts on why this might be happening? Or avenues to pursue in debugging this?
>>>
>>> Thanks.
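For completeness, the workaround described upthread (an AwsCredentialsProvider that also implements Serializable and resolves credentials through DefaultCredentialsProvider) would look roughly like the sketch below. The class name is illustrative, not the reporter's actual code, and, as noted above, this on its own did not make the NPE go away:

    import java.io.Serializable;

    import software.amazon.awssdk.auth.credentials.AwsCredentials;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    // Serializable provider with no fields; it defers to the default
    // provider chain each time credentials are resolved.
    public class SerializableDefaultCredentialsProvider
        implements AwsCredentialsProvider, Serializable {

      @Override
      public AwsCredentials resolveCredentials() {
        return DefaultCredentialsProvider.create().resolveCredentials();
      }
    }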