I have a question about how to use textFileStream; I have included a code snippet below. I am trying to read .gz files that are being put into my bucket. I do not want to specify the schema. I have a similar feature that just does spark.read.json(inputBucket) (roughly sketched after the streaming code below); that works great, and if I can get the DStream input into a DataFrame the same way I am golden. Can anyone point me to what I am doing wrong? The files I am reading are continually added to that folder as .gz files, and inside them are JSON objects. If I uncompress the .gz files, dStream.print() shows the content, but it is not being turned into the DataFrame as expected.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def processStream(dStream: DStream[String], outputBucket: String, spark: SparkSession): Unit = {
  var jsonDf: DataFrame = null
  dStream.foreachRDD { rdd =>
    import spark.implicits._
    if (jsonDf != null) {
      // If I do
      //   println("RDD: " + rdd.toDF().printSchema())
      // I get nothing when reading from a .gz, just an empty DataFrame.
      // If I uncompress the .gz I get some content, but everything ends up in one column.
      jsonDf = jsonDf.union(rdd.toDF())
    } else {
      jsonDf = rdd.toDF()
    }
  }
  if (jsonDf != null) {
    // Some processing
  }
}

val spark = SparkSession.builder()
  .appName("Some Name")
  .config("spark.sql.hive.thriftServer.singleSession", "true")
  .getOrCreate()
val context = spark.sparkContext
val ssc = new StreamingContext(context, Seconds(10))

val dStream = ssc.textFileStream("/Users/userName/Documents/SparkStreamingTestfolder/" + "*/*/*/*")
processStream(dStream, "/Users/userName/destination/folder /streamTest", spark)

ssc.start()
ssc.awaitTermination()
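For reference, this is roughly what the working batch feature looks like: the schema is inferred from the JSON files themselves (the inputBucket and outputBucket values here are just placeholder paths, not my real buckets):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Batch JSON read").getOrCreate()

// Placeholder paths, for illustration only
val inputBucket = "/path/to/input/bucket"
val outputBucket = "/path/to/output/bucket"

// spark.read.json infers the schema and reads .gz-compressed files transparently
val df = spark.read.json(inputBucket)
df.printSchema()

// ... same kind of processing I want to do on the stream, then write out
df.write.json(outputBucket)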