It looks like the issue is the non-serialisable AwsCredentialsProvider held as a 
member of BasicSqsClientProvider (which is itself Serializable).
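
To illustrate the general failure mode, here is a minimal, self-contained sketch 
using plain Java serialization. It is not the Beam code: FakeCredentials, 
EagerProvider, TransientProvider, LazyProvider and roundTrip are hypothetical 
names made up for the example, and I have not confirmed which of these paths 
produces the NullPointerException in SqsUnboundedSource. It only shows how a 
Serializable holder with a non-serializable member can either fail to serialize 
or come back with a null member, and how building the member lazily avoids both.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableProviderSketch {

    // Hypothetical stand-in for a credentials provider that is NOT Serializable.
    static class FakeCredentials {
        String accessKey() { return "dummy-key"; }
    }

    // Holds the non-serializable member directly: writeObject rejects it.
    static class EagerProvider implements Serializable {
        private final FakeCredentials credentials = new FakeCredentials();
        String accessKey() { return credentials.accessKey(); }
    }

    // Marks the member transient: serialization succeeds, but the field
    // is null after deserialization because initializers are not re-run.
    static class TransientProvider implements Serializable {
        private final transient FakeCredentials credentials = new FakeCredentials();
        String accessKey() { return credentials.accessKey(); }
    }

    // Keeps only serializable state and rebuilds the member on first use.
    static class LazyProvider implements Serializable {
        private transient FakeCredentials credentials;
        String accessKey() {
            if (credentials == null) {
                credentials = new FakeCredentials();
            }
            return credentials.accessKey();
        }
    }

    // Serializes and deserializes a value, roughly what happens when an
    // object is shipped from the driver to an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            roundTrip(new EagerProvider());
        } catch (NotSerializableException e) {
            System.out.println("eager: not serializable");
        }
        try {
            roundTrip(new TransientProvider()).accessKey();
        } catch (NullPointerException e) {
            System.out.println("transient: NullPointerException");
        }
        System.out.println("lazy: " + roundTrip(new LazyProvider()).accessKey());
    }
}
```

If the same pattern applies here, a possible workaround would be to pass 
SqsIO.read() your own serializable client provider that creates the credentials 
provider inside getSqsClient() instead of holding it as a field.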

> On 2 Oct 2020, at 20:11, tclem...@tutanota.com wrote:
> 
> The app itself is developed in Clojure, but here's the gist of how it's 
> getting configured:
> 
>     AwsCredentialsProvider credProvider = EnvironmentVariableCredentialsProvider.create();
>     
>     pipeline.apply(
>       SqsIO.read()
>         .withQueueUrl(url)
>         .withSqsClientProvider(credProvider, region, endpoint));
> 
> Oct 1, 2020, 08:48 by aromanenko....@gmail.com:
> 
> Could you send a code snippet of your pipeline with SqsIO v2 Read transform 
> configuration?
> 
> On 30 Sep 2020, at 22:56, tclem...@tutanota.com wrote:
> 
> I've been attempting to migrate to the AWS2 SDK (version 2.24.0) on Apache 
> Spark 2.4.7. However, when switching over to the new API and running it, I keep 
> getting the following exception:
> 
> 2020-09-30 20:38:32,428 ERROR streaming.StreamingContext: Error starting the 
> context, marking it as stopped
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 
> (TID 6, 10.42.180.18, executor 0): java.lang.NullPointerException
> at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.lambda$new$e39e7721$1(SqsUnboundedSource.java:41)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$MemoizingSupplier.get(Suppliers.java:129)
> at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.getSqs(SqsUnboundedSource.java:74)
> at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.pull(SqsUnboundedReader.java:165)
> at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.advance(SqsUnboundedReader.java:108)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:246)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:224)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
> 
> Examining the source of SqsUnboundedSource reveals a lambda where it's trying 
> to chain a few references:
> read.sqsClientProvider().getSqsClient()
> 
> This is odd, as I explicitly set the client provider on the read transform. 
> The same setup worked well enough with the old SqsIO API to connect and process 
> messages off the queue.
> 
> Any thoughts on why this might be happening? Or avenues to pursue in 
> debugging this?
> 
> Thanks.
> 
