Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-25 Thread Mich Talebzadeh
Hi, I managed to make mine work using the *foreachBatch* function in writeStream. "foreach" performs custom write logic on each row, while "foreachBatch" performs custom write logic on each micro-batch; here that logic lives in the SendToBigQuery function. foreachBatch(SendToBigQuery) expects 2 parameters, first:
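[A minimal PySpark sketch of the wiring described above. The body of SendToBigQuery is not shown in the snippet, so the table name, temporary GCS bucket, and use of the spark-bigquery-connector are assumptions; the two parameters foreachBatch passes are the micro-batch DataFrame and the batch id.]

    def SendToBigQuery(df, batchId):
        # Called once per micro-batch; df is an ordinary (batch) DataFrame.
        # Assumes the spark-bigquery-connector is on the classpath;
        # "mydataset.mytable" and the GCS bucket are placeholders.
        df.write \
          .format("bigquery") \
          .option("table", "mydataset.mytable") \
          .option("temporaryGcsBucket", "my-temp-bucket") \
          .mode("append") \
          .save()

    result.writeStream \
          .foreachBatch(SendToBigQuery) \
          .option("checkpointLocation", "/tmp/checkpoints/bq") \
          .start()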

Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-24 Thread Mich Talebzadeh
Thanks Jungtaek. I am stuck on how to add rows to BigQuery. The Spark API in PySpark does it fine. However, we are talking about structured streaming with PySpark. This is my code that reads and displays data on the console fine: class MDStreaming: def __init__(self, spark_session, spark_context):
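[For context, a self-contained PySpark sketch of the read-and-display part the message refers to; the broker address and topic name are placeholders, not taken from the original MDStreaming class.]

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("MDStreaming").getOrCreate()

    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder broker
          .option("subscribe", "md")                              # placeholder topic
          .option("startingOffsets", "latest")
          .load())

    # Kafka delivers key/value as binary, so cast to string before displaying
    (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .writeStream
       .format("console")
       .option("truncate", "false")
       .outputMode("append")
       .start()
       .awaitTermination())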

Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-23 Thread Jungtaek Lim
If your code doesn't require "end to end exactly-once", then you could leverage foreachBatch, which enables you to use a batch sink. If your code requires "end to end exactly-once", then, well, that's a different story. I'm not familiar with BigQuery and have no idea how the sink is implemented,
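[A generic sketch of the foreachBatch pattern being suggested; the Parquet sink and paths are illustrative only. foreachBatch on its own gives at-least-once semantics, since a micro-batch can be replayed after a failure, so exactly-once needs the write to be idempotent or transactional with respect to the batch id.]

    def write_batch(batch_df, batch_id):
        # Any batch writer can be used here; this one appends Parquet files.
        # For exactly-once, dedupe or overwrite keyed on batch_id instead of blindly appending.
        batch_df.write.mode("append").parquet("/tmp/stream_out")   # placeholder path

    (stream_df.writeStream
       .foreachBatch(write_batch)
       .option("checkpointLocation", "/tmp/checkpoints/out")       # placeholder path
       .start())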

Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-23 Thread Mich Talebzadeh
With the old Spark Streaming (example in Scala), this would have been easier through RDDs. You could read data: val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsValue) dstream.foreachRDD { pricesRDD => if

Re: Structured Streaming to Kafka Topic

2019-03-06 Thread Akshay Bhardwaj
Hi Pankaj, What version of Spark are you using? If you are using 2.4+ then there is a built-in function "to_json" which converts the columns of your dataset to JSON format. https://spark.apache.org/docs/2.4.0/api/sql/#to_json
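[A short PySpark illustration of the suggestion (the original thread's code is Scala; the output topic and checkpoint path are placeholders): to_json(struct(...)) packs all columns into a single JSON string column named "value", which the Kafka sink expects.]

    from pyspark.sql.functions import to_json, struct

    kafka_ready = data_stream.select(
        to_json(struct(*data_stream.columns)).alias("value"))

    (kafka_ready.writeStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("topic", "sms_history_json")                  # placeholder output topic
       .option("checkpointLocation", "/tmp/checkpoints/kafka_sink")
       .start())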

Structured Streaming to Kafka Topic

2019-03-06 Thread Pankaj Wahane
Hi, I am using structured streaming for ETL. val data_stream = spark.readStream // constantly expanding dataframe .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "sms_history") .option("startingOffsets", "earliest") // begin from start of