Hi,
I managed to make mine work using the *foreachBatch* function in
writeStream.
"foreach" performs custom write logic on each row, while "foreachBatch"
performs custom write logic on each micro-batch, here through the
SendToBigQuery function.
foreachBatch(SendToBigQuery) expects the function to take 2 parameters:
first, the micro-batch DataFrame; and second, the batch id.
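A minimal PySpark sketch of that wiring might look like the following. The body of SendToBigQuery and the connector options are assumptions (the table name is a placeholder, and a real job would also need credentials and a temporary GCS bucket), not the poster's actual code:

```python
# Hedged sketch: the function handed to foreachBatch receives two parameters:
#   batch_df - the micro-batch as an ordinary bounded DataFrame
#   batch_id - a monotonically increasing micro-batch id
def SendToBigQuery(batch_df, batch_id):
    # Assumed sink: the spark-bigquery connector's "bigquery" format.
    # Table name is a placeholder; credentials and the temporary GCS
    # bucket a real job needs are omitted here.
    (batch_df.write
        .format("bigquery")
        .option("table", "my_dataset.sms_history")  # placeholder table
        .mode("append")
        .save())

# Wiring it into writeStream on an existing streaming DataFrame:
# query = stream_df.writeStream.foreachBatch(SendToBigQuery).start()
```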
Thanks Jungtaek.
I am stuck on how to add rows to BigQuery. The Spark batch API in PySpark
does it fine; however, here we are talking about Structured Streaming with
PySpark. This is my code, which reads and displays data on the console fine:
class MDStreaming:
    def __init__(self, spark_session, spark_context):
If your code doesn't require "end to end exactly-once", then you could
leverage foreachBatch, which enables you to use a batch sink.
If your code requires "end to end exactly-once", then that's a different
story. I'm not familiar with BigQuery and have no idea how its sink is
implemented.
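To make the concern concrete: foreachBatch on its own gives at-least-once delivery, so a micro-batch can be replayed after a failure. One common mitigation (an assumption here, nothing BigQuery-specific) is to key the write on the batch id so replays become no-ops:

```python
# Hedged sketch: skip micro-batches whose id has already been committed,
# so a replayed batch is not written twice.
committed_batches = set()  # in practice: durable storage, not memory

def write_batch_idempotently(batch_df, batch_id, do_write):
    """Write the batch only if its id has not been committed yet."""
    if batch_id in committed_batches:
        return  # replayed micro-batch: already written, skip
    do_write(batch_df)               # e.g. append to the external sink
    committed_batches.add(batch_id)  # record the commit after success
```

The commit record must live in durable storage and be updated atomically with the write for this to approach exactly-once; the in-memory set above only illustrates the shape of the idea.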
With the old Spark Streaming (example in Scala), this would have been
easier through RDDs. You could read the data:

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](streamingContext, kafkaParams, topicsValue)
dstream.foreachRDD { pricesRDD =>
  if ...
Hi Pankaj,
What version of Spark are you using?
If you are using 2.4+, then there is a built-in function "to_json" which
converts the columns of your dataset to JSON format.
https://spark.apache.org/docs/2.4.0/api/sql/#to_json
Akshay Bhardwaj
+91-97111-33849
On Wed, Mar 6, 2019 at 10:29 PM
Hi,
I am using structured streaming for ETL.
val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of