Hi — Spark version: spark-3.0.0-preview2-bin-hadoop2.7. As you can see from the code below:
// Original question (kept from the post):
// STEP 1: a static DataFrame is created over the CSV files.
// STEP 2: `staticSchema` captures that DataFrame's schema.
// STEP 3: a streaming DataFrame is built via spark.readStream with
//   .schema(staticSchema). Why must .format("csv"), .option("header","true")
//   and .load(path) be repeated — what is the point of .schema(staticSchema)?
//
// NOTE(review): a schema is ONLY the column names and types. It carries no
// source format, reader options, or input path, so those must always be given
// to the reader. Passing .schema(...) skips schema inference, which streaming
// sources require to be explicit — that is its entire point.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc, window}

object RetailData {

  def main(args: Array[String]): Unit = {
    // Create (or reuse) the Spark session against the standalone master.
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate()

    // Runtime configuration. Boolean confs take "true"/"false"; the original
    // passed "True", normalized here.
    spark.conf.set("spark.sql.shuffle.partitions", "5")
    spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

    // STEP 1: static DataFrame over the CSV files; the schema is inferred
    // once here (option name is "inferSchema" per the Spark CSV docs).
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")
    staticDataFrame.createOrReplaceTempView("retail_data")

    // STEP 2: capture the inferred schema for reuse by the streaming reader.
    val staticSchema = staticDataFrame.schema

    // Batch aggregation: total spend per customer per day (top 2 shown).
    staticDataFrame
      .selectExpr("CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    // STEP 3: streaming reader. format/options/path must be repeated because
    // .schema(...) supplies only column names/types, nothing about the source.
    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("maxFilesPerTrigger", 1)
      .option("header", "true")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

    // Lazy transformation — nothing executes until a streaming sink starts.
    val purchaseByCustomerPerHour = streamingDataFrame
      .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // Start the query and BLOCK until it terminates. The original code only
    // called .start(), so main() returned and the driver exited before any
    // micro-batch could run — awaitTermination() is the functional fix.
    purchaseByCustomerPerHour.writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .start()
      .awaitTermination()
  } // main
} // object