Hi folks, I am writing a pipeline that reads from Kafka, applies some transformations, and then persists the results to HDFS.
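Writing the output to a single file is apparently not supported for a DStream, because the `DStream.save(Path)` method treats the path as a directory, not as a file. Calling `repartition(1)` and writing with `.mode(SaveMode.Append)` before persisting did not work either. Any thoughts on how to solve this? Below is a code snippet:

```scala
import org.apache.spark.sql.SaveMode

{
  // Consume the topic, keep only records whose value matches youtubeRegex, and extract the value.
  val inputStream = kafkaStreamingUtil
    .streamConsume(streamingContext, Set(srcTopic), consumerGroupId)
    .filter(_.value().matches(youtubeRegex))
    .map(_.value())

  inputStream.foreachRDD { rdd =>
    // Get (or lazily create) one SQLContext shared across batches.
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    val rddAsDataFrame = rdd.toDF()
    // coalesce(1) gives one part file per batch, but csv(dstPath) still treats dstPath
    // as a directory, so every batch adds another part-*.csv file inside it.
    rddAsDataFrame.coalesce(1).write.mode(SaveMode.Append).csv(dstPath)
  }
}
```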
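One possible workaround, sketched under assumptions rather than as a confirmed solution: since Spark's DataFrame writers always produce a directory of part files, the batch could bypass the writer and be appended to one file through the Hadoop `FileSystem` API. The names `SingleFileSink`, `appendLines` and `saveToSingleFile` are hypothetical. The sketch assumes the target file system supports append (HDFS does when append is enabled; Hadoop's checksummed `LocalFileSystem` does not), that each batch is small enough to stream through the driver, and that plain lines are acceptable in place of the CSV writer's quoting.

```scala
// Hypothetical sink: appends every micro-batch to ONE file instead of a directory of part files.
// Assumes the target file system supports append() and that batches fit through the driver.
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.dstream.DStream

object SingleFileSink {

  /** Appends `lines` (one record per line) to `target`, creating the file on first use. */
  def appendLines(fs: FileSystem, target: Path, lines: Iterator[String]): Unit = {
    // append() fails on a missing file, so create it (without overwriting) the first time.
    val out = if (fs.exists(target)) fs.append(target) else fs.create(target, false)
    try {
      lines.foreach { line =>
        out.write((line + "\n").getBytes(StandardCharsets.UTF_8))
      }
    } finally {
      // Closing flushes the batch and releases the file for the next append.
      out.close()
    }
  }

  /** Writes every batch of `stream` into the single file `targetPath`. */
  def saveToSingleFile(stream: DStream[String], targetPath: String): Unit =
    stream.foreachRDD { rdd =>
      // The foreachRDD body runs on the driver; toLocalIterator fetches one partition at a time.
      val target = new Path(targetPath)
      val fs = target.getFileSystem(rdd.sparkContext.hadoopConfiguration)
      appendLines(fs, target, rdd.toLocalIterator)
    }
}
```

In the snippet above this would replace the `foreachRDD` block with something like `SingleFileSink.saveToSingleFile(inputStream, dstPath)`. With the default `spark.streaming.concurrentJobs=1`, batches run one after another, so appends do not interleave. The trade-offs are that all data passes through the driver and that a batch re-run after a failure may append its records twice.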