Hi Sachit,

I managed to make mine work using the *foreachBatch* function in writeStream.

"foreach" applies custom write logic to each row, whereas "foreachBatch" applies it to each micro-batch, here through the SendToBigQuery function. The function passed to foreachBatch, SendToBigQuery in this case, takes two parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique id for each batch. Using foreachBatch, we write each micro-batch to the storage defined in our custom logic. In this case, we store the output of our streaming application in a Google BigQuery table.

Note that we are appending data, and the column "rowkey" is defined as a UUID so it can be used as the primary key. batchId is just a counter (a monotonically increasing number).

This is my code:

from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql.functions import col, current_timestamp, from_json, lit
from pyspark.sql.types import StructType, StringType, FloatType, TimestampType
from google.cloud import bigquery

def SendToBigQuery(df, batchId):
    """
    Below uses standard Spark-BigQuery API to write to the table
    Additional transformation logic will be performed here
    """
    s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])

class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        # Sample message:
        # {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()) \
                             .add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            #streamingDataFrame.printSchema()

            """
               "foreach" performs custom write logic on each row and "foreachBatch" performs custom write logic on each micro-batch through SendToBigQuery function
                foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as DataFrame or Dataset and second: unique id for each batch
               Using foreachBatch, we write each micro batch to storage defined in our custom logic. In this case, we store the output of our streaming application to Google BigQuery table.
               Note that we are appending data and column "rowkey" is defined as UUID so it can be used as the primary key
            """
            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     foreachBatch(SendToBigQuery). \
                     outputMode("update"). \
                     start()
        except Exception as e:
            print(f"{e}, quitting")
            sys.exit(1)

        result.awaitTermination()

if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mdstreaming.fetch_data()

My batch interval is 2 seconds, and in this case I am sending 10 rows for each ticker (security).

HTH

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Thu, 25 Feb 2021 at 06:27, Sachit Murarka <connectsac...@gmail.com> wrote:

> Hello Users,
>
> I am using Spark 3.0.1 Structuring streaming with Pyspark.
>
> My use case::
> I get so many records in kafka(essentially some metadata with the location
> of actual data). I have to take that metadata from kafka and apply some
> processing.
> Processing includes : Reading the actual data location from metadata and
> fetching the actual data and applying some operation on actual data.
>
> What I have tried::
>
> def process_events(event):
>     fetch_actual_data()
>     #many more steps
>
> def fetch_actual_data():
>     #applying operation on actual data
>
> df = spark.readStream.format("kafka") \
>             .option("kafka.bootstrap.servers", KAFKA_URL) \
>             .option("subscribe", KAFKA_TOPICS) \
>             .option("startingOffsets",
> START_OFFSET).load() .selectExpr("CAST(value AS STRING)")
>
>
> query =
> df.writeStream.foreach(process_events).option("checkpointLocation",
> "/opt/checkpoint").trigger(processingTime="30 seconds").start()
>
>
> My Queries:
>
> 1. Will this foreach run across different executor processes? Generally in
> spark , foreach means it runs on a single executor.
>
> 2. I receive too many records in kafka and above code will run multiple
> times for each single message. If I change it for foreachbatch, will it
> optimize it?
>
>
> Kind Regards,
> Sachit Murarka
>
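
On query 2 above, a rough illustration of why switching from foreach to foreachBatch tends to help: a foreach function is invoked once per record, whereas a foreachBatch function is invoked once per micro-batch, so any per-call overhead is paid far less often. The following is only a minimal sketch in plain Python, with lists standing in for Spark micro-batches. The names micro_batches, per_row, per_batch and calls are hypothetical and are not part of the code in this thread, and the loop only mimics the invocation pattern, not how Spark actually schedules the work.

```python
from typing import Dict, List

# Hypothetical micro-batches: each inner list stands in for the records
# that one trigger interval delivers from Kafka.
micro_batches: List[List[Dict[str, str]]] = [
    [{"location": "path-1"}, {"location": "path-2"}],
    [{"location": "path-3"}],
    [{"location": "path-4"}, {"location": "path-5"}, {"location": "path-6"}],
]

# Counts how often each style of handler is invoked.
calls = {"per_row": 0, "per_batch": 0}


def per_row(row: Dict[str, str]) -> None:
    """Stand-in for a function passed to writeStream.foreach()."""
    calls["per_row"] += 1


def per_batch(rows: List[Dict[str, str]], batch_id: int) -> None:
    """Stand-in for a function passed to writeStream.foreachBatch()."""
    calls["per_batch"] += 1


for batch_id, rows in enumerate(micro_batches):
    for row in rows:
        per_row(row)           # foreach style: one call per record
    per_batch(rows, batch_id)  # foreachBatch style: one call per micro-batch

print(calls)  # {'per_row': 6, 'per_batch': 3}
```

In a real job the per-batch handler receives a DataFrame rather than a list, so the work inside it can still be expressed as distributed DataFrame operations. As far as I understand it, the foreachBatch function itself is called on the driver, while a foreach function is run on the executors for the rows of each partition.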