Hi,
version: spark-3.0.0-preview2-bin-hadoop2.7

As you can see from the code :

STEP 1: I create an object of type static DataFrame which holds all the
information about the data source (csv files).

STEP 2: Then I create a variable called staticSchema, assigning it the
schema information from the original static DataFrame.

STEP 3: Then I create another variable called streamingDataFrame via
spark.readStream,
and into the .schema function parameters I pass the object staticSchema,
which is meant to hold the information about the csv files, including the
.load(path) function etc.

So then, when I am creating val streamingDataFrame and passing it
.schema(staticSchema),
the variable streamingDataFrame should have all the information.
I should only have to call .option("maxFilesPerTrigger", 1) and not .format
("csv") .option("header","true").load("/data/retail-data/by-day/*.csv").
Otherwise, what is the point of passing .schema(staticSchema) to
streamingDataFrame?

You can replicate it using the complete code below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

/**
 * Reads retail CSV data both as a static (batch) DataFrame and as a
 * streaming DataFrame, computing total cost per customer per 1-day window.
 *
 * Note on the schema question: a StructType captures only column names and
 * types. It carries no format, path, or reader options, so the streaming
 * reader must still be given .format("csv"), .option("header", ...) and
 * .load(path) explicitly even when .schema(staticSchema) is supplied.
 */
object RetailData {

  def main(args: Array[String]): Unit = {

    // Create the Spark session (single entry point for batch and streaming).
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate()

    // Runtime configuration: few shuffle partitions for a small dataset;
    // allow temp checkpoint dirs to be force-deleted on shutdown.
    // Fixed: boolean conf values are conventionally lowercase "true".
    spark.conf.set("spark.sql.shuffle.partitions", "5")
    spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

    // STEP 1: batch-read the CSVs into a static DataFrame, inferring the
    // schema (an extra pass over the data -- which is exactly why the
    // streaming read below reuses the inferred schema instead).
    // Fixed: documented option casing is "inferSchema" (keys are
    // case-insensitive, but use the canonical spelling).
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")

    // STEP 2: capture the inferred schema for reuse by the streaming reader.
    val staticSchema = staticDataFrame.schema

    // Static aggregation: total spend per customer per 1-day window,
    // top 2 rows shown.
    staticDataFrame
      .selectExpr(
        "CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"),
        window(col("InvoiceDate"),
        "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    // STEP 3: streaming read of the same files, one file per micro-batch.
    // .schema() supplies column metadata only; format/options/path are still
    // required because they are not part of a StructType.
    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("maxFilesPerTrigger", 1)
      .option("header", "true")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

    // Lazy transformation: nothing executes until a streaming action starts.
    val purchaseByCustomerPerHour = streamingDataFrame
      .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
      .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // Streaming action: write the complete aggregated result to the console.
    // BUG FIX: start() returns immediately with a StreamingQuery handle;
    // without awaitTermination(), main exits and the driver shuts down
    // before any micro-batch is processed.
    purchaseByCustomerPerHour.writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .start()
      .awaitTermination()

  } // main

} // object




















val staticSchema = staticDataFrame.schema













Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>

Reply via email to