To test this, I tried a workaround: an implementation of AwsCredentialsProvider 
that also implements Serializable.  The resolveCredentials method of this class 
calls the static create method of DefaultCredentialsProvider and forwards the 
work to the provider it returns.  There are no fields in the class, so nothing 
in it should actually need to be serialized.
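
For reference, the wrapper looks roughly like this (a minimal sketch; the class 
name is mine, the AWS SDK v2 types are the standard ones):

    import java.io.Serializable;
    import software.amazon.awssdk.auth.credentials.AwsCredentials;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    // Field-free wrapper: there is nothing to serialize, and credentials are
    // resolved lazily on the worker through the default provider chain.
    public class SerializableDefaultCredentialsProvider
        implements AwsCredentialsProvider, Serializable {

      @Override
      public AwsCredentials resolveCredentials() {
        // Delegate to the static factory on each call rather than holding a
        // non-serializable provider instance as a field.
        return DefaultCredentialsProvider.create().resolveCredentials();
      }
    }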

However, this approach is failing in the exact same manner.  I suspect 
something else might be the culprit here.
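
As a sanity check outside of Spark, the wrapper can be round-tripped through 
plain Java serialization to see whether the provider itself is what fails.  A 
minimal sketch, assuming the hypothetical SerializableDefaultCredentialsProvider 
class above:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;

    // Serializing the provider here throws java.io.NotSerializableException
    // if it (or anything it references) cannot be serialized.
    public class ProviderSerializationCheck {
      public static void main(String[] args) throws IOException {
        Object provider = new SerializableDefaultCredentialsProvider();
        try (ObjectOutputStream out =
            new ObjectOutputStream(new ByteArrayOutputStream())) {
          out.writeObject(provider);
        }
        System.out.println("Provider serialized without error");
      }
    }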

Oct 5, 2020, 08:56 by aromanenko....@gmail.com:

> Seems like there is an issue with non-serialisable AwsCredentialsProvider as 
> a member of BasicSqsClientProvider (which is Serializable).
>
>
>> On 2 Oct 2020, at 20:11, tclem...@tutanota.com wrote:
>>
>> The app itself is developed in Clojure, but here's the gist of how it's 
>> getting configured:
>>
>>     AwsCredentialsProvider credProvider =
>>         EnvironmentVariableCredentialsProvider.create();
>>     
>>     pipeline.apply(
>>       SqsIO.read()
>>         .withQueueUrl(url)
>>         .withSqsClientProvider(credProvider, region, endpoint));
>>
>> Oct 1, 2020, 08:48 by aromanenko....@gmail.com:
>>
>>> Could you send a code snippet of your pipeline with SqsIO v2 Read transform 
>>> configuration?
>>>
>>>> On 30 Sep 2020, at 22:56, tclem...@tutanota.com wrote:
>>>>
>>>> I've been attempting to migrate to the AWS2 SDK (version 2.24.0) on Apache 
>>>> Spark 2.4.7.  However,
>>>> when switching over to the new API and running it I keep getting the 
>>>> following exceptions:
>>>>
>>>> 2020-09-30 20:38:32,428 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 6, 10.42.180.18, executor 0): java.lang.NullPointerException
>>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.lambda$new$e39e7721$1(SqsUnboundedSource.java:41)
>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$MemoizingSupplier.get(Suppliers.java:129)
>>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.getSqs(SqsUnboundedSource.java:74)
>>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.pull(SqsUnboundedReader.java:165)
>>>>   at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.advance(SqsUnboundedReader.java:108)
>>>>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:246)
>>>>   at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:224)
>>>>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
>>>>   at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>>>>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>>>>   at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>>>>
>>>> Examining the source of SqsUnboundedSource reveals a lambda where it's 
>>>> trying to chain a few references:
>>>> read.sqsClientProvider().getSqsClient()
>>>>
>>>> This is odd, as I explicitly set the client provider on the read 
>>>> transform.  The same setup was working well enough with the old SqsIO API 
>>>> to connect and process messages off the queue.
>>>>
>>>> Any thoughts on why this might be happening?  Or avenues to pursue in 
>>>> debugging this?
>>>>
>>>> Thanks.
>>>>
