Quick update on this.

We have fixed an issue with AwsCredentialsProvider serialisation [1] in the AWS v2
IOs (well, why it is not serialisable by default is a different question); the fix
will be part of 2.27.0.
Since it’s not yet released, feel free to test it with snapshot artifacts. 

[1] https://issues.apache.org/jira/browse/BEAM-11016
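
In case it helps, here is a minimal, untested sketch of pulling the snapshot build
with Maven (the repository URL and artifact coordinates follow the usual Apache
conventions; adjust to your setup):

    <repositories>
      <repository>
        <id>apache-snapshots</id>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
      <version>2.27.0-SNAPSHOT</version>
    </dependency>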

> On 6 Oct 2020, at 23:25, tclem...@tutanota.com wrote:
> 
> Yep, same stack trace: an NPE originating from line 41 of 
> SqsUnboundedSource.java.
> 
> I'll see about getting a remote debugger attached to the process.
> 
> Oct 6, 2020, 14:06 by aromanenko....@gmail.com:
> Hmm, do you have the same stack trace in this case?
> 
> Can you debug it at runtime and confirm whether read.sqsClientProvider() 
> really returns null in SqsUnboundedSource(Read)? I'm curious whether all calls 
> to SqsUnboundedSource(Read) happen in the same JVM.
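> 
> One way to check without the cluster is to round-trip the configured transform
> through Java serialization, which is roughly what the Spark runner does between
> the driver and the executors. A minimal sketch (queue URL, region and endpoint
> are placeholders, substitute your own):
> 
>     import java.io.ByteArrayInputStream;
>     import java.io.ByteArrayOutputStream;
>     import java.io.ObjectInputStream;
>     import java.io.ObjectOutputStream;
>     import java.net.URI;
>     import org.apache.beam.sdk.io.aws2.sqs.SqsIO;
>     import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
>     import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
> 
>     public class SerializationCheck {
>       public static void main(String[] args) throws Exception {
>         AwsCredentialsProvider credProvider = EnvironmentVariableCredentialsProvider.create();
>         SqsIO.Read read =
>             SqsIO.read()
>                 .withQueueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
>                 .withSqsClientProvider(credProvider, "us-east-1",
>                     URI.create("https://sqs.us-east-1.amazonaws.com"));
>         // Serialize and deserialize, as the runner does when shipping the source.
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         new ObjectOutputStream(bos).writeObject(read); // a NotSerializableException here already points at the provider
>         Object copy =
>             new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();
>         // Inspect `copy` in a debugger: if its client provider field is null,
>         // the provider did not survive (de)serialization.
>         System.out.println(copy);
>       }
>     }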
> 
>> On 6 Oct 2020, at 22:14, tclem...@tutanota.com wrote:
>> 
>> To test this, I tried a workaround: an implementation of 
>> AwsCredentialsProvider that also implements Serializable. Its 
>> resolveCredentials method calls the static create function of 
>> DefaultCredentialsProvider and delegates the work to the result. There are 
>> no fields in the class that should actually need to be serialized.
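>> 
>> Roughly, the workaround looks like this (a minimal sketch; the class name is 
>> mine):
>> 
>>     import java.io.Serializable;
>>     import software.amazon.awssdk.auth.credentials.AwsCredentials;
>>     import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
>>     import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
>> 
>>     public class SerializableCredentialsProvider
>>         implements AwsCredentialsProvider, Serializable {
>>       @Override
>>       public AwsCredentials resolveCredentials() {
>>         // Stateless: resolution is delegated on every call, so nothing here
>>         // needs to survive serialization.
>>         return DefaultCredentialsProvider.create().resolveCredentials();
>>       }
>>     }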
>> 
>> However, this approach is failing in the exact same manner.  I suspect 
>> something else might be the culprit here.
>> 
>> Oct 5, 2020, 08:56 by aromanenko....@gmail.com:
>> It seems like the issue is a non-serialisable AwsCredentialsProvider held as 
>> a member of BasicSqsClientProvider (which is Serializable).
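>> 
>> To illustrate the general rule with hypothetical names: Java serialization 
>> walks every non-transient field, so a Serializable holder with a 
>> non-Serializable member fails at write time, while a transient member 
>> silently comes back null after deserialization, which is how NPEs like the 
>> one above arise. A toy demo of the first case:
>> 
>>     import java.io.ByteArrayOutputStream;
>>     import java.io.IOException;
>>     import java.io.ObjectOutputStream;
>>     import java.io.Serializable;
>> 
>>     public class PitfallDemo {
>>       static class Holder implements Serializable {
>>         final Object member = new Object(); // not Serializable
>>       }
>> 
>>       public static void main(String[] args) throws IOException {
>>         new ObjectOutputStream(new ByteArrayOutputStream())
>>             .writeObject(new Holder()); // throws java.io.NotSerializableException
>>       }
>>     }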
>> 
>>> On 2 Oct 2020, at 20:11, tclem...@tutanota.com wrote:
>>> 
>>> The app itself is developed in Clojure, but here's the gist of how it's 
>>> getting configured:
>>> 
>>>     AwsCredentialsProvider credProvider =
>>>         EnvironmentVariableCredentialsProvider.create();
>>>     
>>>     pipeline.apply(
>>>       SqsIO.read()
>>>         .withQueueUrl(url)
>>>         .withSqsClientProvider(credProvider, region, endpoint));
>>> 
>>> Oct 1, 2020, 08:48 by aromanenko....@gmail.com:
>>> Could you send a code snippet of your pipeline with the SqsIO v2 Read 
>>> transform configuration?
>>> On 30 Sep 2020, at 22:56, tclem...@tutanota.com wrote:
>>> 
>>> I've been attempting to migrate to the AWS v2 IOs (Beam 2.24.0) on Apache 
>>> Spark 2.4.7. However, when switching over to the new API and running it, I 
>>> keep getting the following exception:
>>> 
>>> 2020-09-30 20:38:32,428 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 6, 10.42.180.18, executor 0): java.lang.NullPointerException
>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.lambda$new$e39e7721$1(SqsUnboundedSource.java:41)
>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$MemoizingSupplier.get(Suppliers.java:129)
>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.getSqs(SqsUnboundedSource.java:74)
>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.pull(SqsUnboundedReader.java:165)
>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.advance(SqsUnboundedReader.java:108)
>>>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:246)
>>>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:224)
>>>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
>>>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>> 
>>> Examining the source of SqsUnboundedSource reveals a lambda that chains a 
>>> few references:
>>> 
>>>     read.sqsClientProvider().getSqsClient()
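>>> 
>>> From the trace, the client is built lazily through a memoizing supplier, so 
>>> a null provider only surfaces on first use on the executor, well after 
>>> construction on the driver. A toy demonstration of that deferral (plain 
>>> Guava, hypothetical names):
>>> 
>>>     import com.google.common.base.Supplier;
>>>     import com.google.common.base.Suppliers;
>>> 
>>>     public class MemoizeNpeDemo {
>>>       static String provider = null; // stands in for sqsClientProvider() after a bad round-trip
>>> 
>>>       public static void main(String[] args) {
>>>         // Construction succeeds; the provider is not touched yet.
>>>         Supplier<Integer> sqs = Suppliers.memoize(() -> provider.length());
>>>         sqs.get(); // the NullPointerException fires here, on first use
>>>       }
>>>     }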
>>> 
>>> This is odd, as I explicitly set the client provider on the read transform. 
>>> With the old SqsIO API this was working well enough to connect and process 
>>> messages off the queue.
>>> 
>>> Any thoughts on why this might be happening? Or avenues to pursue in 
>>> debugging this?
>>> 
>>> Thanks.
>>> 
