BTW, do you intend to process these every 30 seconds?

processingTime="30 seconds"

So how many rows of data are sent in each microbatch, and at what interval do
you receive the batches from the producer?
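
One quick way to check that, assuming the running StreamingQuery handle returned
by start() is called query (a rough sketch, not your actual code):

    import time

    # Poll the progress of the last completed microbatch; each entry reports
    # how many rows the 30-second trigger actually pulled from Kafka.
    while query.isActive:
        progress = query.lastProgress
        if progress:
            print(progress["numInputRows"], progress["inputRowsPerSecond"])
        time.sleep(30)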




LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 25 Feb 2021 at 12:21, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> If you are receiving data from Kafka, wouldn't that be better in JSON
> format?
>
>        try:
>            # construct a streaming dataframe streamingDataFrame that subscribes
>            # to topic config['MDVariables']['topic'] -> md (market data)
>            streamingDataFrame = self.spark \
>                .readStream \
>                .format("kafka") \
>                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                .option("group.id", config['common']['appName']) \
>                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                .option("subscribe", config['MDVariables']['topic']) \
>                .option("failOnDataLoss", "false") \
>                .option("includeHeaders", "true") \
>                .option("startingOffsets", "earliest") \
>                .load() \
>                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>            return streamingDataFrame
>        except Exception as e:
>            print(f"""{e}, quitting""")
>            sys.exit(1)
>
> and pass a class to the writer
>
>        result = streamingDataFrame. \
>                      writeStream. \
>                      foreach(ForeachWriter()). \
>                      start()
>
> You don't want to use a row-by-row (cursor) approach, as it would leave a
> lot of messages unprocessed (as you correctly stated, it runs on a single
> JVM).
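>
> A minimal sketch of what such a writer class could look like in PySpark (the
> class name and the sink calls inside it are placeholders; open/process/close
> is the contract foreach expects):
>
>         class ForeachWriter:
>             def open(self, partition_id, epoch_id):
>                 # set up e.g. a connection for this partition; return True to process it
>                 return True
>
>             def process(self, row):
>                 # write one parsed row to the target sink
>                 pass
>
>             def close(self, error):
>                 # release the connection; error holds the exception, if any
>                 pass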
>
>
> I am doing the same, trying to process and write messages to BigQuery.
>
>
> HTH
>
>
>
>
>
>
>
> On Thu, 25 Feb 2021 at 06:27, Sachit Murarka <connectsac...@gmail.com>
> wrote:
>
>> Hello Users,
>>
>> I am using Spark 3.0.1 Structured Streaming with PySpark.
>>
>> My use case:
>> I get a large number of records in Kafka (essentially some metadata with the
>> location of the actual data). I have to take that metadata from Kafka and
>> apply some processing.
>> Processing includes: reading the actual data location from the metadata,
>> fetching the actual data, and applying some operations on it.
>>
>> What I have tried:
>>
>> def process_events(event):
>>     fetch_actual_data()
>>     # many more steps
>>
>> def fetch_actual_data():
>>     # applying operation on actual data
>>     pass
>>
>> df = spark.readStream.format("kafka") \
>>     .option("kafka.bootstrap.servers", KAFKA_URL) \
>>     .option("subscribe", KAFKA_TOPICS) \
>>     .option("startingOffsets", START_OFFSET) \
>>     .load() \
>>     .selectExpr("CAST(value AS STRING)")
>>
>>
>> query = df.writeStream.foreach(process_events) \
>>     .option("checkpointLocation", "/opt/checkpoint") \
>>     .trigger(processingTime="30 seconds") \
>>     .start()
>>
>>
>> My queries:
>>
>> 1. Will this foreach run across different executor processes? Generally,
>> in Spark, foreach means it runs on a single executor.
>>
>> 2. I receive a very large number of records in Kafka, and the above code
>> will run for every single message. If I change it to foreachBatch, will
>> that optimize it?
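>>
>> Roughly, the foreachBatch variant I have in mind would look like this (just
>> a sketch; process_events_batch is a placeholder name):
>>
>> def process_events_batch(batch_df, batch_id):
>>     # batch_df is a regular DataFrame holding one microbatch, so the work
>>     # could be organised per batch instead of per message
>>     batch_df.foreach(process_events)
>>
>> query = df.writeStream.foreachBatch(process_events_batch) \
>>     .option("checkpointLocation", "/opt/checkpoint") \
>>     .trigger(processingTime="30 seconds") \
>>     .start()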
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>
