I keep getting the following exception when I try to read a Parquet
file from an S3 path in Spark/Scala. Note: I am running this on EMR.

java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)

Interestingly, I can read the same path from the Spark shell:

scala> val df = spark.read.parquet("s3://my-path/").count
df: Long = 47

I've created the SparkSession as follows:

val sparkConf = new SparkConf().setAppName("My spark app")
val spark = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.hadoopConfiguration.set("java.library.path", "/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native")
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
spark.conf.set("spark.speculation", "false")
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
spark.sparkContext.hadoopConfiguration.setBoolean("mapreduce.fileoutputcommitter.cleanup.skipped", true)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", System.getenv("AWS_ACCESS_KEY_ID"))
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", System.getenv("AWS_SECRET_ACCESS_KEY"))
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

Here's the line where I am getting this exception:

val df1 = spark.read.parquet(pathToRead)

What am I doing wrong? I have also tried without setting the access key
and secret key, with no luck.
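
For reference, here is a condensed, self-contained sketch of what I am submitting via spark-submit, pieced together from the snippets above. The object name ReadParquetApp and taking the path from args(0) are just placeholders for illustration; the session setup and the read call are the same as in my real job.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ReadParquetApp {
  def main(args: Array[String]): Unit = {
    // Same session construction as above (extra conf settings omitted for brevity).
    val sparkConf = new SparkConf().setAppName("My spark app")
    val spark = SparkSession.builder
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")

    // Placeholder: my actual S3 path arrives differently in the real job.
    val pathToRead = args(0)

    // This is the line that throws the NullPointerException.
    val df1 = spark.read.parquet(pathToRead)
    println(df1.count())

    spark.stop()
  }
}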
