If your code doesn't require "end-to-end exactly-once", then you can leverage foreachBatch, which enables you to use a batch sink: Spark hands each micro-batch to your function as a regular (non-streaming) DataFrame, so any batch writer API works on it.
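For example, a minimal sketch of the pattern (streaming_df stands for a streaming DataFrame like the ds2 built in the quoted message below; the JDBC sink details are placeholders, not a real endpoint):

    # Each micro-batch arrives here as an ordinary (non-streaming)
    # DataFrame, so the normal batch writer API is available.
    def write_batch(batch_df, batch_id):
        (batch_df.write
            .format("jdbc")
            .option("url", "jdbc:postgresql://host:5432/db")  # placeholder URL
            .option("dbtable", "prices")                      # placeholder table
            .mode("append")
            .save())

    query = (streaming_df.writeStream
        .foreachBatch(write_batch)
        .start())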
If your code requires "end-to-end exactly-once", well, that's a different story. I'm not familiar with BigQuery and have no idea how the sink is implemented, but quick googling tells me a transaction with multiple DML statements isn't supported, so end-to-end exactly-once cannot be implemented in any way. If you ensure the write in the query is idempotent, then it doesn't matter at all.

On Tue, Feb 23, 2021 at 10:35 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> With the old Spark Streaming (example in Scala), this would have been
> easier through RDDs. You could read the data:
>
>     val dstream = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsValue)
>
>     dstream.foreachRDD { pricesRDD =>
>       if (!pricesRDD.isEmpty) { // data exists in RDD
>         // write to DB
>       }
>     }
>
> Now with Structured Streaming in Python, you read data into a DataFrame
> with readStream and load:
>
>     schema = StructType() \
>         .add("rowkey", StringType()) \
>         .add("ticker", StringType()) \
>         .add("timeissued", TimestampType()) \
>         .add("price", FloatType())
>
>     ds = self.spark \
>         .readStream \
>         .format("kafka") \
>         ....... \
>         .load() \
>         .select(from_json(col("value").cast("string"),
>                           schema).alias("parsed_value"))
>
>     ds2 = ds \
>         .select(col("parsed_value.rowkey").alias("rowkey"),
>                 col("parsed_value.ticker").alias("ticker"),
>                 col("parsed_value.timeissued").alias("timeissued"),
>                 col("parsed_value.price").alias("price")) \
>         .withColumn("currency", lit(config['MDVariables']['currency'])) \
>         .withColumn("op_type", lit(config['MDVariables']['op_type'])) \
>         .withColumn("op_time", current_timestamp())
>
>     # write to console
>     query = ds2 \
>         .writeStream \
>         .outputMode("append") \
>         .format("console") \
>         .start()
>     ds2.printSchema()
>
> But writing to BigQuery through the BigQuery API does not work:
>
>     s.writeTableToBQ(ds2, "overwrite",
>                      config['MDVariables']['targetDataset'],
>                      config['MDVariables']['targetTable'])
>
>     query.awaitTermination()
>
> So this is the run result and the error:
>
>     root
>      |-- rowkey: string (nullable = true)
>      |-- ticker: string (nullable = true)
>      |-- timeissued: timestamp (nullable = true)
>      |-- price: float (nullable = true)
>      |-- currency: string (nullable = false)
>      |-- op_type: string (nullable = false)
>      |-- op_time: timestamp (nullable = false)
>
>     'write' can not be called on streaming Dataset/DataFrame;, quitting
>
> I gather I need to create an RDD from the DataFrame, or maybe there is
> another way to write streaming data to the DB directly from the DataFrame?
>
> Thanks
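For reference, applying foreachBatch to the quoted code would look roughly like this (a sketch reusing the ds2, s.writeTableToBQ, and config names from the message above; I'm assuming the helper's signature matches the call shown there):

    def write_to_bq(batch_df, batch_id):
        # batch_df is a static DataFrame here, so the batch-only helper
        # no longer hits "'write' can not be called on streaming
        # Dataset/DataFrame". Using "append" rather than the original
        # "overwrite", which would replace the table on every micro-batch.
        s.writeTableToBQ(batch_df, "append",
                         config['MDVariables']['targetDataset'],
                         config['MDVariables']['targetTable'])

    query = (ds2.writeStream
        .foreachBatch(write_to_bq)
        .start())

    query.awaitTermination()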