Hello-

I am attempting to use SparkR to read a Parquet file from S3.
The exact same operation succeeds with PySpark, but with SparkR I get a 503
Service Unavailable error.

In fact, I get the 503 even if I deliberately use a bad endpoint or bad
credentials - as if Spark isn't actually making the HTTP request. Yet it's the
same machine and the same cluster on which PySpark works flawlessly.

After about two minutes I get this warning:

WARN streaming.FileStreamSink: Error while looking for metadata directory.

The 503 follows a minute or so later. No executors are started, so there are no
executor logs to check. Judging by the stack trace below, the failure happens
while the S3A filesystem is being initialized (in doesBucketExist/headBucket),
before any Parquet data is touched. I am connecting to a custom S3 endpoint, if
that's relevant.
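
One check I plan to run, in case the settings never reach the filesystem layer
at all: read the values back from the driver's Hadoop configuration just before
the read. This is only a sketch via SparkR's internal JVM bridge - the
sparkContext/hadoopConfiguration call chain is my assumption about where S3A
actually looks, and sc is the session created in the code below.

jsc <- SparkR:::callJMethod(sc, "sparkContext")
hadoopConf <- SparkR:::callJMethod(jsc, "hadoopConfiguration")
# Should echo back the values set in the code below; NULL would suggest the
# session conf never propagated to the Hadoop configuration that S3A reads.
SparkR:::callJMethod(hadoopConf, "get", "fs.s3a.endpoint")
SparkR:::callJMethod(hadoopConf, "get", "fs.s3a.access.key")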

Any ideas what could be going wrong?
The code and the full output are below.

Thanks in advance.
Faysal

CODE-
library(SparkR)

spark_config_default <- list(spark.dynamicAllocation.enabled='true',
                             spark.shuffle.service.enabled='true',
                             spark.sql.parquet.binaryAsString='true',
                             spark.dynamicAllocation.initialExecutors='3',
                             spark.dynamicAllocation.maxExecutors='30',
                             spark.driver.memory='2g',
                             spark.executor.memory='4g',
                             spark.executor.cores='3')


jars_list <- c("aws-java-sdk-1.7.4.2.jar",
               "hadoop-aws-2.6.0.jar",
               "aws-s3-1.7.1.jar",
               "hadoop-common-2.6.0.jar")

sc <- sparkR.session(master = "yarn", deployMode = "client",
                     sparkConfig = spark_config_default, sparkJars = jars_list)

hConf <- SparkR:::callJMethod(sc, "conf")  # SparkSession runtime conf
SparkR:::callJMethod(hConf, "set", "fs.s3a.access.key", [access_key])
SparkR:::callJMethod(hConf, "set", "fs.s3a.secret.key", [secret_key])
SparkR:::callJMethod(hConf, "set", "fs.s3a.endpoint", [custom_endpoint])
SparkR:::callJMethod(hConf, "set", "com.amazonaws.services.s3a.enableV4", "true")
SparkR:::callJMethod(hConf, "set", "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

sdf <- read.parquet("s3a://bucket/file.parquet")

sparkR.stop()

OUTPUT-
20/10/05 17:54:17 WARN streaming.FileStreamSink: Error while looking for metadata directory.
20/10/05 17:55:58 ERROR r.RBackendHandler: parquet on 10 failed
java.lang.reflect.InvocationTargetException
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167)
     at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108)
     at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40)
     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
     at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
     at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
     at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 503, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Service Unavailable, S3 Extended Request ID: null
     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3556)
     at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1036)
     at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:999)
     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
     at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
     at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
     at scala.collection.immutable.List.foreach(List.scala:392)
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
     at scala.collection.immutable.List.flatMap(List.scala:355)
     at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
     at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:645)
     at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:644)
     ... 37 more
Error: Error in parquet : com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 503, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Service Unavailable, S3 Extended Request ID: null
     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3556)
     at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1036)
     at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:999)
     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(File
