Hello Deng, Thank you for your email. The issue was with the Spark–Hadoop/HDFS configuration settings.
Thanks On Mon, Jun 10, 2019 at 5:28 AM Deng Ching-Mallete <och...@apache.org> wrote: > Hi Chetan, > > Best to check if the user account that you're using to run the job has > permission to write to the path in HDFS. I would suggest to write the > parquet files to a different path, perhaps to a project space or user home, > rather than at the root directory. > > HTH, > Deng > > On Sat, Jun 8, 2019 at 8:00 AM Chetan Khatri <chetan.opensou...@gmail.com> > wrote: > >> Hello Dear Spark Users, >> >> I am trying to write data from Kafka Topic to Parquet HDFS with >> Structured Streaming but Getting failures. Please do help. >> >> val spark: SparkSession = >> SparkSession.builder().appName("DemoSparkKafka").getOrCreate() >> import spark.implicits._ >> val dataFromTopicDF = spark >> .readStream >> .format("kafka") >> .option("kafka.bootstrap.servers", "localhost:9092") >> .option("subscribe", "test") >> .option("startingOffsets", "earliest") >> .load() >> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") >> >> logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.") >> val topicQuery = dataFromTopicDF.writeStream >> .format("console") >> .option("truncate", false) >> .option("checkpointLocation", "/tmp/checkpoint") >> .trigger(Trigger.ProcessingTime(10.seconds)) >> .start() >> >> topicQuery.awaitTermination() >> topicQuery.stop() >> >> >> Above code is working well but when I am trying to write to Parquet at HDFS >> getting exceptions. 
>> >> >> logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS") >> >> val parquetQuery = dataFromTopicDF.writeStream >> .format("parquet") >> .option("startingOffsets", "earliest") >> .option("checkpointLocation", "/tmp/checkpoint") >> .option("path", "/sample-topic") >> .start() >> >> parquetQuery.awaitTermination() >> parquetQuery.stop() >> >> >> *Exception Details:* >> >> >> Exception in thread "main" java.io.IOException: mkdir of >> /sample-topic/_spark_metadata failed >> at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067) >> at >> org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176) >> at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197) >> at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730) >> at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726) >> at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) >> at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733) >> at >> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378) >> at >> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:66) >> at >> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46) >> at >> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85) >> at >> org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98) >> at >> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317) >> at >> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293) >> at >> com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35) >> at >> com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7) >> at scala.Function0$class.apply$mcV$sp(Function0.scala:34) >> at >> 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) >> at scala.App$$anonfun$main$1.apply(App.scala:76) >> at scala.App$$anonfun$main$1.apply(App.scala:76) >> at scala.collection.immutable.List.foreach(List.scala:381) >> at >> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) >> at scala.App$class.main(App.scala:76) >> at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7) >> at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) >> at >> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894) >> at >> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198) >> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228) >> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) >> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) >> >> >> Thanks >> >> >