Yes, there's definitely an issue. Can someone fix it? I'm not familiar with
Apache JIRA; do I need to file a bug report?

On Mon, Jan 29, 2024 at 2:57 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> OK
>
> This is the equivalent Python code
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import expr, when
> from pyspark.sql.types import StructType, StructField, LongType
> from datetime import datetime
>
> spark = SparkSession.builder \
>     .master("local[*]") \
>     .appName("StreamingSparkPartitioned") \
>     .getOrCreate()
>
> expression = when(expr("value % 3 = 1"), "stupid_event") \
>     .when(expr("value % 3 = 2"), "smart_event") \
>     .otherwise("neutral_event")
>
> # Define the schema to match the rate-micro-batch data source
> schema = StructType([
>     StructField("timestamp", LongType()),
>     StructField("value", LongType()),
> ])
> checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"
>
> # Convert human-readable timestamp to Unix timestamp in milliseconds
> start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)
>
> streamingDF = spark.readStream \
>     .format("rate-micro-batch") \
>     .option("rowsPerBatch", "100") \
>     .option("startTimestamp", start_timestamp) \
>     .option("numPartitions", 1) \
>     .load() \
>     .withColumn("event_type", expression)
>
> query = (
>     streamingDF.writeStream
>     .outputMode("append")
>     .format("console")
>     .trigger(processingTime="1 second")
>     .option("checkpointLocation", checkpoint_path)
>     .start()
> )
>
> query.awaitTermination()
>
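On the timestamp conversion in the code above: datetime(2024, 1, 28) is a
naive datetime, so .timestamp() interprets it in the local time zone of the
machine running the driver, and the resulting startTimestamp can differ from
host to host. A minimal sketch that pins the zone explicitly, assuming
midnight UTC is what is intended (the helper name to_epoch_millis is my own,
not part of PySpark):

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as a timezone-aware datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a datetime to Unix epoch milliseconds.

    Naive datetimes are treated as UTC (an assumption of this sketch),
    so the result does not depend on the machine's local time zone.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


start_timestamp = to_epoch_millis(datetime(2024, 1, 28))
print(start_timestamp)  # 1706400000000
```

This only makes the input value unambiguous; it does not by itself explain
the offset error quoted below.
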
> This is the error I am getting:
>
>   File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line 38, in <module>
>     query.awaitTermination()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
>   File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
> pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId = f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable value for offset
> Did not find value which can be converted into long
>
> It seems there might be an issue with the *rate-micro-batch* source
> when using the *startTimestamp* option.
>
> You can try using the socket source for testing purposes.
>
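On the socket source suggestion above: a rough sketch of how such a test
could be wired up, in case it helps others on the list. serve_numbers and
socket_stream are hypothetical helper names of my own. The first is a
pure-Python stand-in for something like "nc -lk"; the second uses Spark's
built-in "socket" format and reapplies the same event_type logic. The socket
source emits "value" as a string, is meant for testing only, and has no
startTimestamp option, so this sidesteps the problem rather than explaining
it.

```python
import socket
import threading


def serve_numbers(host="127.0.0.1", port=0, count=10):
    """Start a one-shot TCP server that sends 0..count-1, one integer per line.

    Returns the (host, port) actually bound; port=0 lets the OS pick a free one.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    address = server.getsockname()

    def _feed():
        # Wait for a single client (e.g. the Spark socket source), send, close.
        conn, _ = server.accept()
        with conn:
            for i in range(count):
                conn.sendall(f"{i}\n".encode("utf-8"))
        server.close()

    threading.Thread(target=_feed, daemon=True).start()
    return address


def socket_stream(spark, host, port):
    """Build a streaming DataFrame from the socket source with an event_type column."""
    # Imported lazily so the feeder above can be used without PySpark installed.
    from pyspark.sql.functions import col, when

    lines = (
        spark.readStream
        .format("socket")
        .option("host", host)
        .option("port", port)
        .load()
    )
    # The socket source yields a single string column named "value".
    value = col("value").cast("long")
    return lines.withColumn(
        "event_type",
        when(value % 3 == 1, "stupid_event")
        .when(value % 3 == 2, "smart_event")
        .otherwise("neutral_event"),
    )
```

To try it, call serve_numbers(port=9999, count=100) first, then pass the
returned host and port to socket_stream(spark, host, port) and write the
result to the console sink as in the quoted code.
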
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> On Sun, 28 Jan 2024 at 22:00, Perfect Stranger <paulpaul1...@gmail.com>
> wrote:
>
>> I described the issue here:
>>
>>
>> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>>
>> Could someone please respond?
>>
>> The rate-micro-batch format doesn't seem to respect the startTimestamp
>> option.
>>
>> Thanks.
>>
>
