If your code doesn't require "end to end exactly-once", then you could
leverage foreachBatch, which enables you to use a batch sink: each
micro-batch is handed to your function as a normal DataFrame.
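
Something like this (an untested sketch; it assumes the writeTableToBQ
helper from your code below works on a plain batch DataFrame, and the
"append" mode is just an example):

    def write_to_bq(batch_df, batch_id):
        # each micro-batch arrives here as a normal (non-streaming)
        # DataFrame, so any batch sink or API can be used
        s.writeTableToBQ(batch_df, "append",
                         config['MDVariables']['targetDataset'],
                         config['MDVariables']['targetTable'])

    query = ds2.writeStream \
        .foreachBatch(write_to_bq) \
        .outputMode("append") \
        .start()

    query.awaitTermination()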

If your code requires "end to end exactly-once", then that's a different
story. I'm not familiar with BigQuery and have no idea how the sink is
implemented, but a quick search tells me that a transaction spanning
multiple DML statements isn't supported, so end-to-end exactly-once cannot
be implemented in any way.

If you ensure the write in the query is idempotent, then it doesn't matter at all.
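
For instance, foreachBatch also hands you the batch id, so one rough
(untested) way to make re-writes idempotent is to record the last
committed batch id somewhere you trust and skip batches that were already
written; last_committed_batch_id() and mark_committed() below are
hypothetical helpers, not an existing API:

    def write_idempotent(batch_df, batch_id):
        # skip micro-batches that were already written in a previous attempt
        if batch_id <= last_committed_batch_id():
            return
        s.writeTableToBQ(batch_df, "append",
                         config['MDVariables']['targetDataset'],
                         config['MDVariables']['targetTable'])
        mark_committed(batch_id)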

On Tue, Feb 23, 2021 at 10:35 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> With the old Spark Streaming (example in Scala), this would have been
> easier through RDDs. You could read the data:
>
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](streamingContext, kafkaParams, topicsValue)
>
> dstream.foreachRDD { pricesRDD =>
>   if (!pricesRDD.isEmpty) {  // data exists in the RDD
>     // write to DB
>   }
> }
>
>
> Now with Structured Streaming in Python, you read data into a dataframe
> with readStream and load:
>
>
>        schema = StructType().add("rowkey", StringType()).add("ticker",
>            StringType()).add("timeissued", TimestampType()).add("price", FloatType())
>
>        ds = self.spark \
>                .readStream \
>                .format("kafka") \
>                .......
>                .load() \
>                .select(from_json(col("value").cast("string"),
>                    schema).alias("parsed_value"))
>
>        ds2 = ds \
>            .select( \
>                     col("parsed_value.rowkey").alias("rowkey") \
>                   , col("parsed_value.ticker").alias("ticker") \
>                   , col("parsed_value.timeissued").alias("timeissued") \
>                   , col("parsed_value.price").alias("price")). \
>             withColumn("currency", lit(config['MDVariables']['currency'])). \
>             withColumn("op_type", lit(config['MDVariables']['op_type'])). \
>             withColumn("op_time", current_timestamp())
>
>       # write to console
>       query = ds2. \
>                   writeStream. \
>                   outputMode("append"). \
>                   format("console"). \
>                   start()
>
>       ds2.printSchema()
>
>
> But writing to BigQuery through the BigQuery API does not work:
>
>
>  s.writeTableToBQ(ds2, "overwrite",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>
>
>  query.awaitTermination()
>
>
> So this is the run result and the error
>
>
> root
>  |-- rowkey: string (nullable = true)
>  |-- ticker: string (nullable = true)
>  |-- timeissued: timestamp (nullable = true)
>  |-- price: float (nullable = true)
>  |-- currency: string (nullable = false)
>  |-- op_type: string (nullable = false)
>  |-- op_time: timestamp (nullable = false)
>
>
> *'write' can not be called on streaming Dataset/DataFrame;, quitting*
>
> I gather I need to create an RDD from the dataframe, or maybe there is
> another way to write streaming data to the DB directly from the dataframe?
>
> Thanks
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
