Hi,
version: spark-3.0.0-preview2-bin-hadoop2.7
As you can see from the code:
STEP 1: I create a static DataFrame that reads the whole data source (the CSV files).
STEP 2: I create a variable called staticSchema and assign it the schema of that
static DataFrame.
STEP 3: I create another value, streamingDataFrame, with spark.readStream, and pass
staticSchema to its .schema() method.
I expected staticSchema to carry all the information about the CSV files, including
the format, the header option and the .load(path) call.
So when I create streamingDataFrame with .schema(staticSchema), I assumed it would
already have all of that information, and that I would only need to call
.option("maxFilesPerTrigger", 1), not also
.format("csv").option("header", "true").load("/data/retail-data/by-day/*.csv").
Otherwise, what is the point of passing .schema(staticSchema) to
streamingDataFrame?
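For context, a schema in Spark is a StructType: an ordered list of column names, data types and nullability flags. The sketch below builds one by hand to show what a value like staticSchema contains. The object name RetailSchema is hypothetical, and the eight column names, their order and their types are assumptions about the retail CSV files (the code itself only references CustomerId, UnitPrice, Quantity and InvoiceDate). It assumes spark-sql 3.x on the classpath.

```scala
import org.apache.spark.sql.types._

object RetailSchema {
  // Hypothetical hand-written equivalent of staticDataFrame.schema.
  // Column names, order and types are assumed, not read from the files.
  val retailSchema: StructType = StructType(Seq(
    StructField("InvoiceNo", StringType, nullable = true),
    StructField("StockCode", StringType, nullable = true),
    StructField("Description", StringType, nullable = true),
    StructField("Quantity", IntegerType, nullable = true),
    StructField("InvoiceDate", TimestampType, nullable = true),
    StructField("UnitPrice", DoubleType, nullable = true),
    StructField("CustomerID", DoubleType, nullable = true),
    StructField("Country", StringType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    // Prints only column names, types and nullability;
    // a StructType holds no format, reader option or path.
    println(retailSchema.treeString)
    println(retailSchema.fieldNames.mkString(", "))
  }
}
```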
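Because .schema only fixes the columns, one way to avoid writing the CSV settings twice is to keep them in one place and hand them to both readers. This is a sketch, not a Spark feature: the object CsvSource and its members are hypothetical names. It relies on DataFrameReader.options and DataStreamReader.options, which both accept a Map[String, String], and assumes spark-sql 3.x.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

object CsvSource {
  // Hypothetical shared settings: everything a CSV reader needs apart from the schema.
  val path: String = "/data/retail-data/by-day/*.csv"
  val csvOptions: Map[String, String] = Map("header" -> "true")

  // Batch read: infers the schema from the files.
  def readStatic(spark: SparkSession): DataFrame =
    spark.read
      .format("csv")
      .options(csvOptions)
      .option("inferSchema", "true")
      .load(path)

  // Streaming read: reuses the batch schema plus the same format, options and path.
  def readStreaming(spark: SparkSession, schema: StructType): DataFrame =
    spark.readStream
      .schema(schema)
      .format("csv")
      .options(csvOptions)
      .option("maxFilesPerTrigger", 1)
      .load(path)
}
```

Usage would then be `val staticDataFrame = CsvSource.readStatic(spark)` followed by `CsvSource.readStreaming(spark, staticDataFrame.schema)`, so the format, header option and path are written once.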
You can replicate it using the complete code below.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window, column, desc, col}

object RetailData {
  def main(args: Array[String]): Unit = {
    // create spark session
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate()

    // set spark runtime configuration
    spark.conf.set("spark.sql.shuffle.partitions", "5")
    spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

    // create a static DataFrame; its schema is inferred from the files
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticSchema = staticDataFrame.schema

    staticDataFrame
      .selectExpr("CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("maxFilesPerTrigger", 1)
      .option("header", "true")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

    // lazy operation, so we need to call a streaming action to start it
    val purchaseByCustomerPerHour = streamingDataFrame
      .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // stream action to write to the console; awaitTermination() keeps main
    // from returning (and the query from stopping) right after start()
    purchaseByCustomerPerHour.writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .start()
      .awaitTermination()
  } // main
} // object
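For reference, the Structured Streaming programming guide describes schema inference for file-based streaming sources as off by default and opt-in through the spark.sql.streaming.schemaInference configuration. The sketch below, using a hypothetical InferredStream object, shows a streaming read that skips .schema(staticSchema) under that setting. The format, header option and path are still needed, because they tell Spark where and how to read; it also assumes the directory already contains files to infer from.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object InferredStream {
  // Spark SQL configuration that enables schema inference for file-based streaming sources.
  val SchemaInferenceConf: String = "spark.sql.streaming.schemaInference"

  def readRetailStream(spark: SparkSession): DataFrame = {
    // Opt in to inference; without it, a file stream with no .schema(...) is rejected.
    spark.conf.set(SchemaInferenceConf, "true")
    spark.readStream
      .format("csv")
      .option("header", "true")
      // As in the static read; without it, CSV inference types every column as string.
      .option("inferSchema", "true")
      .option("maxFilesPerTrigger", 1)
      .load("/data/retail-data/by-day/*.csv")
  }
}
```

Passing an explicit schema remains the default because inferring it from whatever files happen to be present when the query starts can break if later files differ.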
Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>