OK

This is the equivalent Python code

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when
from pyspark.sql.types import StructType, StructField, LongType
from datetime import datetime

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("StreamingSparkPartitioned") \
    .getOrCreate()

expression = when(expr("value % 3 = 1"), "stupid_event") \
    .otherwise(when(expr("value % 3 = 2"),
"smart_event").otherwise("neutral_event"))

# Define the schema to match the rate-micro-batch data source
schema = StructType([StructField("timestamp", LongType()),
StructField("value", LongType())])
checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# Convert human-readable timestamp to Unix timestamp in milliseconds
start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)

streamingDF = spark.readStream \
    .format("rate-micro-batch") \
    .option("rowsPerBatch", "100") \
    .option("startTimestamp", start_timestamp) \
    .option("numPartitions", 1) \
    .load() \
    .withColumn("event_type", expression)

query = (
    streamingDF.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

query.awaitTermination()

This is the error I am getting
  File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py",
line 38, in <module>
    query.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py",
line 201, in awaitTermination
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
line 1322, in __call__
  File
"/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py",
line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED]
Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId =
f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable
value for offset
Did not find value which can be converted into long

Seems like there might be an issue with the *rate-micro-batch* source when
using the *startTimestamp* option.

You can try using socket source for testing purposes

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 28 Jan 2024 at 22:00, Perfect Stranger <paulpaul1...@gmail.com>
wrote:

> I described the issue here:
>
>
> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>
> Could someone please respond?
>
> The rate-micro-batch format doesn't seem to respect the startTimestamp
> option.
>
> Thanks.
>

Reply via email to