Hi Folks,

I am writing a pipeline that reads from Kafka, applies some
transformations, and then persists the result to HDFS.

Apparently writing the output to a single file is not supported by DStream,
since the *DStream.save(Path)* method treats the Path as a directory, not a
file. Using *repartition(1)* with *mode(SaveMode.Append)* before persisting
did not work either.

Any thoughts on how to solve this issue? Below is a code snippet.


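import org.apache.spark.sql.SaveMode

// Consume the source topic and keep only the values matching youtubeRegex
val inputStream = kafkaStreamingUtil
  .streamConsume(streamingContext, Set(srcTopic), consumerGroupId)
  .filter(_.value().matches(youtubeRegex))
  .map(_.value())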

inputStream.foreachRDD(rdd => {

  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  val rddAsDataFrame = rdd.toDF()

  // Collapse the batch to one partition and write it as CSV in append mode
  rddAsDataFrame.coalesce(1).write.mode(SaveMode.Append).csv(dstPath)
})
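
For illustration, one possible workaround is sketched below. It is not a
confirmed solution, only a sketch under these assumptions: the target
filesystem supports append (HDFS normally does, the checksummed local
filesystem does not), the temporary directory and the target file live on the
same filesystem, and batches are processed one at a time so there is a single
writer. The helper name appendBatchToSingleFile and the parameters tmpDir and
targetFile are hypothetical and not part of Spark or Hadoop.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.IOUtils
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper (not a Spark or Hadoop API): writes one batch to a
// temporary directory, then appends the resulting part file(s) to one file.
def appendBatchToSingleFile(
    df: DataFrame,
    tmpDir: String,      // assumed scratch directory, overwritten on every batch
    targetFile: String,  // assumed path of the single output file
    conf: Configuration): Unit = {

  // Spark always writes a directory; Overwrite clears the previous batch's output.
  df.coalesce(1).write.mode(SaveMode.Overwrite).csv(tmpDir)

  val target = new Path(targetFile)
  val fs = target.getFileSystem(conf)

  // Create the target on the first batch, append to it afterwards.
  // Assumption: the filesystem supports append.
  val out = if (fs.exists(target)) fs.append(target) else fs.create(target)
  try {
    // globStatus may return null, so wrap it before iterating.
    val parts = Option(fs.globStatus(new Path(tmpDir, "part-*")))
      .getOrElse(Array.empty[FileStatus])
    parts.sortBy(_.getPath.getName).foreach { status =>
      val in = fs.open(status.getPath)
      // Copy the part file's bytes to the end of the target; keep `out` open.
      try IOUtils.copyBytes(in, out, 4096, false)
      finally in.close()
    }
  } finally {
    out.close()
  }
}
```

Inside foreachRDD this would replace the direct csv(dstPath) call, for example
appendBatchToSingleFile(rddAsDataFrame, tmpDir, dstFile,
rdd.sparkContext.hadoopConfiguration), where tmpDir and dstFile are
placeholder values. The copy runs on the driver, so it only suits batches
small enough to stream through a single process.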
