Hi Mich,
the issue was related to an incorrect import, which is resolved.
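For reference, in case anyone else hits the same thing - the error in the
thread below, TypeError: 'module' object is not callable, typically means a
module was imported where the window() function was expected, e.g. importing
the pyspark.sql.window module instead of pyspark.sql.functions.window. A
minimal sketch of the fix, assuming that was the cause (the sample DataFrame
is made up for illustration):

# correct import: window() lives in pyspark.sql.functions;
# "from pyspark.sql import window" would instead bind the name to the
# pyspark.sql.window *module*, and calling it raises
# TypeError: 'module' object is not callable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("window-import-check").getOrCreate()

df = (spark.createDataFrame([("2022-02-16 00:00:00", 1.0)],
                            ["timestamp", "temperature"])
      .withColumn("timestamp", col("timestamp").cast("timestamp")))

# 10-minute tumbling window, as in the failing computeCount() call
df.groupBy(window(col("timestamp"), "10 minutes")).count().show(truncate=False)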

However, regarding your comment - 'OK sounds like your watermark is done
outside of your processing':

My use case primarily deals with syslogs. Each syslog is a string that
needs to be parsed (with defensive coding built in, to ensure records are
in the correct format) before it is fed to three different classes
(AlarmProc being one of them), where additional parsing + aggregation is
done for specific types of logs.
The way I'm handling this is by calling foreachBatch(convertToDict) on the
writeStream, so the parsing + aggregation happens per micro-batch.
foreachBatch waits for the parsing and aggregation to complete for the
current micro-batch before proceeding to the next one.
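A minimal sketch of this pattern - convertToDict and AlarmProc here are
simplified stand-ins for my actual code, and the broker address, topic and
parsing logic are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

spark = SparkSession.builder.appName("syslog-stream").getOrCreate()

def AlarmProc(df, spark):
    # stand-in for the real class: per-type parsing + aggregation
    df.groupBy("host").count().show(truncate=False)

def convertToDict(batch_df, batch_id):
    # runs once per micro-batch; the next micro-batch is not started
    # until this function returns
    raw = batch_df.selectExpr("CAST(value AS STRING) AS syslog")
    # defensive parsing: keep only records in the expected "host: msg" shape
    valid = (raw.filter(col("syslog").contains(":"))
                .withColumn("host", split(col("syslog"), ":")[0]))
    AlarmProc(valid, spark)

stream = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
          .option("subscribe", "syslog")                      # placeholder
          .load())

stream.writeStream.foreachBatch(convertToDict).start().awaitTermination()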

Since this involves a lot of parsing + aggregation, it requires more than
a df.select() - hence the approach above.
From what I understand, the watermark is done within the processing, since
it is applied per micro-batch pulled with each trigger.
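To illustrate, a minimal sketch of a watermark applied on the streaming
DataFrame itself, where the engine advances it with each trigger based on
the event time seen in that micro-batch (broker/topic are placeholders,
simplified from my actual code):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("watermark-per-trigger").getOrCreate()

events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
          .option("subscribe", "syslog")                      # placeholder
          .load()
          .selectExpr("CAST(value AS STRING) AS raw", "timestamp"))

# after each micro-batch the watermark advances to
# (max event time seen) - 5 minutes, and older state is dropped
counts = (events
          .withWatermark("timestamp", "5 minutes")
          .groupBy(window(col("timestamp"), "10 minutes"))
          .count())

(counts.writeStream
       .outputMode("update")
       .format("console")
       .start()
       .awaitTermination())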

Pls let me know if you have comments/suggestions on this approach.

thanks,
Karan Alang


On Wed, Feb 16, 2022 at 12:52 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> OK sounds like your watermark is done outside of your processing.
>
> Check this
>
>             # construct a streaming dataframe streamingDataFrame that
>             # subscribes to topic temperature
>             streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", "temperature") \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>
>             resultM = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.timestamp").alias("timestamp") \
>                    , col("parsed_value.temperature").alias("temperature"))
>
>             result = resultM. \
>                      withWatermark("timestamp", "5 minutes"). \
>                      groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")). \
>                      avg('temperature'). \
>                      writeStream. \
>                      outputMode('complete'). \
>                      option("numRows", 1000). \
>                      option("truncate", "false"). \
>                      format('console'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName("temperature"). \
>                      start()
>
> HTH
>
> On Wed, 16 Feb 2022 at 06:37, karan alang <karan.al...@gmail.com> wrote:
>
>>
>> Hello All,
>>
>> I have a Structured Streaming pyspark program running on GCP Dataproc,
>> which reads data from Kafka and does some data massaging and aggregation.
>> I'm trying to use withWatermark(), and it is giving the error below.
>>
>> py4j.Py4JException: An exception was raised by the Python Proxy. Return
>> Message: Traceback (most recent call last):
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
>>     return_value = getattr(self.pool[obj_id], method)(*params)
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
>>     raise e
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
>>     self.func(DataFrame(jdf, self.sql_ctx), batch_id)
>>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
>>     ap = Alarm(tdict, spark)
>>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
>>     computeCount(l_alarm_df, l_alarm1_df)
>>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
>>     window(col("timestamp"), "10 minutes").alias("window")
>> TypeError: 'module' object is not callable
>>
>> Details are in the stackoverflow post below:
>>
>> https://stackoverflow.com/questions/71137296/structuredstreaming-withwatermark-typeerror-module-object-is-not-callable
>>
>> Any ideas on how to debug/fix this?
>> tia!
>>
>
