Hi Mich,

Thanks for the reply. I will check this out.
Kind Regards,
Sachit Murarka

On Fri, Feb 26, 2021 at 2:14 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Sachit,

I managed to make mine work using the foreachBatch function in writeStream.

"foreach" performs custom write logic on each row, whereas "foreachBatch" performs custom write logic on each micro-batch, here through the SendToBigQuery function.
foreachBatch(SendToBigQuery) expects two parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique id for each batch.
Using foreachBatch, we write each micro-batch to the storage defined in our custom logic. In this case, we store the output of our streaming application in a Google BigQuery table.
Note that we are appending data and the column "rowkey" is defined as a UUID, so it can be used as the primary key. batchId is just a counter (a monotonically increasing number).

This is my code:

from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
from google.cloud import bigquery


def SendToBigQuery(df, batchId):
    """
    Uses the standard Spark-BigQuery API to write to the table.
    Additional transformation logic can be performed here.
    """
    s.writeTableToBQ(df, "append",
                     config['MDVariables']['targetDataset'],
                     config['MDVariables']['targetTable'])


class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        # sample message:
        # {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType() \
            .add("rowkey", StringType()) \
            .add("ticker", StringType()) \
            .add("timeissued", TimestampType()) \
            .add("price", FloatType())
        try:
            # construct a streaming DataFrame that subscribes to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            # streamingDataFrame.printSchema()

            """
            "foreach" performs custom write logic on each row and "foreachBatch" performs
            custom write logic on each micro-batch through the SendToBigQuery function.
            foreachBatch(SendToBigQuery) expects two parameters: first, the micro-batch as a
            DataFrame or Dataset, and second, a unique id for each batch.
            Using foreachBatch, we write each micro-batch to the storage defined in our custom
            logic. In this case, we store the output of our streaming application in a Google
            BigQuery table. Note that we are appending data and the column "rowkey" is defined
            as a UUID so it can be used as the primary key.
            """
            result = streamingDataFrame.select(
                         col("parsed_value.rowkey").alias("rowkey"),
                         col("parsed_value.ticker").alias("ticker"),
                         col("parsed_value.timeissued").alias("timeissued"),
                         col("parsed_value.price").alias("price")) \
                     .withColumn("currency", lit(config['MDVariables']['currency'])) \
                     .withColumn("op_type", lit(config['MDVariables']['op_type'])) \
                     .withColumn("op_time", current_timestamp()) \
                     .writeStream \
                     .foreachBatch(SendToBigQuery) \
                     .outputMode("update") \
                     .start()
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

        result.awaitTermination()


if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mdstreaming.fetch_data()

My batch interval is 2 seconds and in this case I am sending 10 rows for each ticker (security).

HTH
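The writeTableToBQ call in SendToBigQuery above comes from Mich's own sparkutils helper module, whose implementation is not shown in the thread. Purely as an illustration, a minimal sketch of what such a helper might look like with the spark-bigquery connector follows; the function body, type hints and staging bucket name are assumptions, not Mich's actual code:

from pyspark.sql import DataFrame

def writeTableToBQ(df: DataFrame, mode: str, dataset: str, table: str):
    # Hypothetical sketch: write the micro-batch DataFrame to <dataset>.<table> in
    # BigQuery via the spark-bigquery connector. "tmp-staging-bucket" is a placeholder
    # GCS bucket used by the connector's indirect write path.
    df.write \
      .format("bigquery") \
      .option("table", f"{dataset}.{table}") \
      .option("temporaryGcsBucket", "tmp-staging-bucket") \
      .mode(mode) \
      .save()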
On Thu, 25 Feb 2021 at 06:27, Sachit Murarka <connectsac...@gmail.com> wrote:

Hello Users,

I am using Spark 3.0.1 Structured Streaming with PySpark.

My use case:
I get a large number of records in Kafka (essentially metadata with the location of the actual data). I have to take that metadata from Kafka and apply some processing.
Processing includes: reading the actual data location from the metadata, fetching the actual data and applying some operations on it.

What I have tried:

def process_events(event):
    fetch_actual_data()
    # many more steps


def fetch_actual_data():
    pass  # applying operation on actual data


df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

query = df.writeStream.foreach(process_events) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()

My queries:

1. Will this foreach run across different executor processes? Generally in Spark, foreach means it runs on a single executor.

2. I receive a very large number of records in Kafka, and the above code will run once for every single message. If I change it to foreachBatch, will that optimize it?

Kind Regards,
Sachit Murarka
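For question 2, a rough, untested sketch of how the same pipeline might look if foreach is swapped for foreachBatch, reusing the df and process_events definitions above (process_batch is just a hypothetical name, not part of the original code):

def process_batch(batch_df, batch_id):
    # batch_df is an ordinary DataFrame holding one micro-batch and batch_id is a
    # monotonically increasing counter. This function runs once per micro-batch on
    # the driver, but operations on batch_df still execute on the executors.
    batch_df.foreach(process_events)  # row-level logic stays distributed
    # ...or do batch-level work here, e.g. a single write per micro-batch

query = df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

The main gain over plain foreach is that any per-batch setup (connections, writes, lookups) happens once per micro-batch rather than once per row.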