Hi Mich,

Thanks for the reply. I will check this out.

Kind Regards,
Sachit Murarka


On Fri, Feb 26, 2021 at 2:14 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Sachit,
>
> I managed to make mine work using the foreachBatch function in
> writeStream.
>
> "foreach" performs custom write logic on each row, while "foreachBatch"
> performs custom write logic on each micro-batch, here through the
> SendToBigQuery function.
>
> foreachBatch(SendToBigQuery) expects two parameters: first, the
> micro-batch as a DataFrame or Dataset, and second, a unique id for each
> batch.
>
> Using foreachBatch, we write each micro-batch to the storage defined in
> our custom logic. In this case, we store the output of our streaming
> application in a Google BigQuery table.
>
> Note that we are appending data, and the column "rowkey" is defined as a
> UUID so it can be used as the primary key. batchId is just a counter (a
> monotonically increasing number).
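>
> As a rough illustration of the difference between the two handler
> signatures (the bodies below are just placeholders, not the BigQuery
> logic in the code that follows):
>
> # "foreach": the function is called once per input row, on the executors
> def process_row(row):
>     print(row)                   # placeholder per-row sink
>
> # "foreachBatch": the function is called once per micro-batch, receiving
> # the micro-batch as an ordinary DataFrame plus a batch id
> def process_batch(df, batchId):
>     df.show(truncate=False)      # placeholder per-batch sink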
>
> This is my code:
>
>
> from __future__ import print_function
> from config import config
> import sys
> from sparkutils import sparkstuff as s
> from pyspark.sql import *
> from pyspark.sql.functions import *
> from pyspark.sql.types import StructType, StringType,IntegerType,
> FloatType, TimestampType
> from google.cloud import bigquery
>
>
> def SendToBigQuery(df, batchId):
>
>     """
>         Below uses standard Spark-BigQuery API to write to the table
>         Additional transformation logic will be performed here
>     """
>     s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'],
>                      config['MDVariables']['targetTable'])
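>
> # A possible shape for s.writeTableToBQ, for readers without the sparkutils
> # module. This is only a sketch: it assumes the spark-bigquery connector is
> # on the classpath and that a staging GCS bucket is held in a hypothetical
> # config key config['MDVariables']['tmp_bucket'].
> def writeTableToBQ(df, mode, dataset, table):
>     df.write \
>       .format("bigquery") \
>       .option("temporaryGcsBucket", config['MDVariables']['tmp_bucket']) \
>       .mode(mode) \
>       .save(f"{dataset}.{table}")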
>
> class MDStreaming:
>     def __init__(self, spark_session,spark_context):
>         self.spark = spark_session
>         self.sc = spark_context
>         self.config = config
>
>     def fetch_data(self):
>         self.sc.setLogLevel("ERROR")
>         #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT",
>         # "timeissued":"2021-02-23T08:42:23", "price":31.12}
>         schema = StructType().add("rowkey", StringType()).add("ticker", StringType()) \
>                              .add("timeissued", TimestampType()).add("price", FloatType())
>         try:
>             # construct a streaming DataFrame (streamingDataFrame) that subscribes
>             # to topic config['MDVariables']['topic'] -> md (market data)
>             streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['topic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>             #streamingDataFrame.printSchema()
>
>             """
>                "foreach" performs custom write logic on each row and
>                "foreachBatch" performs custom write logic on each
>                micro-batch through the SendToBigQuery function.
>                foreachBatch(SendToBigQuery) expects 2 parameters, first:
>                the micro-batch as a DataFrame or Dataset and second: a
>                unique id for each batch.
>                Using foreachBatch, we write each micro-batch to the
>                storage defined in our custom logic. In this case, we store
>                the output of our streaming application in a Google
>                BigQuery table.
>                Note that we are appending data and column "rowkey" is
>                defined as a UUID so it can be used as the primary key.
>             """
>             result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.ticker").alias("ticker") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.price").alias("price")). \
>                      withColumn("currency", lit(config['MDVariables']['currency'])). \
>                      withColumn("op_type", lit(config['MDVariables']['op_type'])). \
>                      withColumn("op_time", current_timestamp()). \
>                      writeStream. \
>                      foreachBatch(SendToBigQuery). \
>                      outputMode("update"). \
>                      start()
>         except Exception as e:
>             print(f"""{e}, quitting""")
>             sys.exit(1)
>
>
>         result.awaitTermination()
>
> if __name__ == "__main__":
>     appName = config['common']['appName']
>     spark_session = s.spark_session(appName)
>     spark_session = s.setSparkConfBQ(spark_session)
>     spark_context = s.sparkcontext()
>     mdstreaming = MDStreaming(spark_session, spark_context)
>     streamingDataFrame = mdstreaming.fetch_data()
>
>
> My batch interval is 2 seconds and in this case I am sending 10 rows for
> each ticker (security).
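>
> The 2 second interval would typically be set with a processing-time
> trigger added to the writeStream chain above; roughly like this (it is
> not shown in the code above, so treat it as a sketch rather than the
> exact setup):
>
>                      writeStream. \
>                      trigger(processingTime='2 seconds'). \
>                      foreachBatch(SendToBigQuery). \
>                      outputMode("update"). \
>                      start()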
>
> HTH
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 25 Feb 2021 at 06:27, Sachit Murarka <connectsac...@gmail.com>
> wrote:
>
>> Hello Users,
>>
>> I am using Spark 3.0.1 Structured Streaming with PySpark.
>>
>> My use case:
>> I get a large number of records in Kafka (essentially some metadata with
>> the location of the actual data). I have to take that metadata from Kafka
>> and apply some processing.
>> Processing includes: reading the actual data location from the metadata,
>> fetching the actual data, and applying some operation on it.
>>
>> What I have tried:
>>
>> def process_events(event):
>>     fetch_actual_data()
>>     # many more steps
>>
>> def fetch_actual_data():
>>     # applying operation on actual data
>>     ...
>>
>> df = spark.readStream.format("kafka") \
>>             .option("kafka.bootstrap.servers", KAFKA_URL) \
>>             .option("subscribe", KAFKA_TOPICS) \
>>             .option("startingOffsets", START_OFFSET) \
>>             .load() \
>>             .selectExpr("CAST(value AS STRING)")
>>
>> query = df.writeStream.foreach(process_events) \
>>             .option("checkpointLocation", "/opt/checkpoint") \
>>             .trigger(processingTime="30 seconds").start()
>>
>>
>> My Queries:
>>
>> 1. Will this foreach run across different executor processes? Generally
>> in Spark, foreach means it runs on a single executor.
>>
>> 2. I receive a very large number of records in Kafka, and the above code
>> will run for each single message. If I change it to foreachBatch, will
>> that optimize it?
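>>
>> If it helps, the foreachBatch variant I have in mind would look roughly
>> like this (process_batch is just a placeholder name):
>>
>> def process_batch(batch_df, batch_id):
>>     # batch_df is an ordinary DataFrame holding one micro-batch, so any
>>     # batch operation (or a foreach over its rows) can be applied here
>>     batch_df.foreach(process_events)
>>
>> query = df.writeStream.foreachBatch(process_batch) \
>>             .option("checkpointLocation", "/opt/checkpoint") \
>>             .trigger(processingTime="30 seconds").start()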
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>
