[ https://issues.apache.org/jira/browse/SPARK-56367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jitesh Soni updated SPARK-56367:
--------------------------------
Summary: [SS][PYTHON][DOCS] Fix latestOffset docstring/tutorial and add
Trigger.AvailableNow documentation (was: [SS][PYTHON][DOCS] Fix
latestOffset docstring and tutorial to use correct field name and signature)
> [SS][PYTHON][DOCS] Fix latestOffset docstring/tutorial and add
> Trigger.AvailableNow documentation
> ------------------------------------------------------------------------------------------------------
>
> Key: SPARK-56367
> URL: https://issues.apache.org/jira/browse/SPARK-56367
> Project: Spark
> Issue Type: Improvement
> Components: Documentation, Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Jitesh Soni
> Priority: Minor
> Labels: pull-request-available
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> This PR fixes two pre-existing documentation issues in the PySpark
> streaming data source API, identified during review of SPARK-55450.
>
>
>
> h2. Issue 1: Docstring Typo in DataSourceStreamReader.latestOffset()
>
>
>
> *File:* {{python/pyspark/sql/datasource.py}}
>
> *Line:* 759
>
> *Method:* {{DataSourceStreamReader.latestOffset()}} docstring example
>
>
>
> The docstring example for the {{latestOffset}} method uses an incorrect
> attribute name when accessing the {{ReadMaxRows}} limit parameter.
>
>
>
> *Current code (incorrect):*
>
> {code:python}
> >>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
> >>> def latestOffset(self, start, limit):
> ...     # Assume the source has 10 new records between start and latest offset
> ...     if isinstance(limit, ReadAllAvailable):
> ...         return {"index": start["index"] + 10}
> ...     else: # e.g., limit is ReadMaxRows(5)
> ...         return {"index": start["index"] + min(10, limit.maxRows)}
> {code}
>
>
>
> *Should be:*
>
> {code:python}
> >>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
> >>> def latestOffset(self, start, limit):
> ...     # Assume the source has 10 new records between start and latest offset
> ...     if isinstance(limit, ReadAllAvailable):
> ...         return {"index": start["index"] + 10}
> ...     else: # e.g., limit is ReadMaxRows(5)
> ...         return {"index": start["index"] + min(10, limit.max_rows)}
> {code}
>
>
>
> *Problem:* {{limit.maxRows}} should be {{limit.max_rows}}
>
>
>
> *Impact:* The {{ReadMaxRows}} dataclass (defined in
> {{python/pyspark/sql/streaming/datasource.py}}) uses the field name
> {{max_rows}} (Python snake_case convention), not {{maxRows}} (camelCase).
> A user who copies this example will get
> {{AttributeError: 'ReadMaxRows' object has no attribute 'maxRows'}} at runtime.
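A self-contained sketch of the failure mode described above. It does not import PySpark. The {{ReadAllAvailable}} and {{ReadMaxRows}} classes below are hypothetical stand-in dataclasses that only mirror the {{max_rows}} field name stated in this issue; they are not the real classes from {{pyspark.sql.streaming.datasource}}. The corrected docstring logic works against the stand-in, while the {{maxRows}} spelling raises {{AttributeError}}.

```python
from dataclasses import dataclass


# Hypothetical stand-ins: they only mirror the field name described in this
# issue and are NOT the real pyspark.sql.streaming.datasource classes.
@dataclass(frozen=True)
class ReadAllAvailable:
    pass


@dataclass(frozen=True)
class ReadMaxRows:
    max_rows: int


def latest_offset_fixed(start: dict, limit) -> dict:
    # Corrected docstring logic: assume 10 new records are available.
    if isinstance(limit, ReadAllAvailable):
        return {"index": start["index"] + 10}
    return {"index": start["index"] + min(10, limit.max_rows)}


def latest_offset_buggy(start: dict, limit) -> dict:
    # Original docstring logic, which uses the camelCase attribute name.
    if isinstance(limit, ReadAllAvailable):
        return {"index": start["index"] + 10}
    return {"index": start["index"] + min(10, limit.maxRows)}


start = {"index": 100}
print(latest_offset_fixed(start, ReadMaxRows(5)))      # {'index': 105}
print(latest_offset_fixed(start, ReadAllAvailable()))  # {'index': 110}

try:
    latest_offset_buggy(start, ReadMaxRows(5))
except AttributeError as exc:
    # The camelCase name does not exist on the snake_case dataclass.
    print(exc)
```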
>
>
>
> ----
>
>
>
> h2. Issue 2: Outdated Method Signature in Tutorial
>
>
>
> *File:* {{python/docs/source/tutorial/sql/python_data_source.rst}}
>
> *Line:* ~241
>
> *Class:* {{FakeStreamReader}} example in the "Implement the Stream Reader"
> section
>
>
>
> The tutorial example uses the old {{latestOffset()}} method signature with
> no parameters, but the recommended signature now includes {{start}} and
> {{limit}} parameters for admission control support.
>
>
>
> *Current code (outdated):*
>
> {code:python}
> class FakeStreamReader(DataSourceStreamReader):
>     def __init__(self, schema, options):
>         self.current = 0
>
>     def initialOffset(self) -> dict:
>         """
>         Return the initial start offset of the reader.
>         """
>         return {"offset": 0}
>
>     def latestOffset(self) -> dict:
>         """
>         Return the current latest offset that the next microbatch will read to.
>         """
>         self.current += 2
>         return {"offset": self.current}
>
>     def partitions(self, start: dict, end: dict) -> list[InputPartition]:
>         ...
> {code}
>
>
>
> *Should be:*
>
> {code:python}
> class FakeStreamReader(DataSourceStreamReader):
>     def __init__(self, schema, options):
>         self.current = 0
>
>     def initialOffset(self) -> dict:
>         """
>         Return the initial start offset of the reader.
>         """
>         return {"offset": 0}
>
>     def latestOffset(self, start: dict, limit) -> dict:
>         """
>         Return the current latest offset that the next microbatch will read to.
>         """
>         self.current += 2
>         return {"offset": self.current}
>
>     def partitions(self, start: dict, end: dict) -> list[InputPartition]:
>         ...
> {code}
>
>
>
> *Problem:* {{def latestOffset(self) -> dict:}} should be
> {{def latestOffset(self, start: dict, limit) -> dict:}}
>
>
>
> *Impact:*
>
> * The new {{latestOffset(start, limit)}} signature was introduced in Spark
> 4.2 via SPARK-55304 to support admission control.
> * While Spark maintains backward compatibility for existing data sources
> that use the old signature, the tutorial should guide new users toward the
> recommended approach.
> * The old parameterless signature does not support admission control
> features such as {{ReadMaxRows}} and {{ReadAllAvailable}}.
> * Users following this tutorial will not learn about the {{start}} offset
> parameter (useful for resuming from a specific position) or the {{limit}}
> parameter (required for admission control).
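A minimal sketch of how a reader could actually use both new parameters. It assumes a fake source that grows by 2 records per call, as in the tutorial. To stay runnable without PySpark, the class does not subclass {{DataSourceStreamReader}}, and {{ReadAllAvailable}}/{{ReadMaxRows}} are hypothetical stand-ins for the real limit classes. Only the {{latestOffset(start, limit)}} shape and the {{max_rows}} field name come from this issue.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the real limit classes (not imported from PySpark).
@dataclass(frozen=True)
class ReadAllAvailable:
    pass


@dataclass(frozen=True)
class ReadMaxRows:
    max_rows: int


class FakeStreamReader:
    """Sketch only: a real reader would subclass DataSourceStreamReader."""

    def __init__(self):
        # Total number of records the fake source has produced so far.
        self.available = 0

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self, start: dict, limit) -> dict:
        # Simulate 2 new records arriving, as in the tutorial.
        self.available += 2
        if isinstance(limit, ReadMaxRows):
            # Admission control: never plan more than max_rows past start.
            return {"offset": min(self.available, start["offset"] + limit.max_rows)}
        # ReadAllAvailable (or any other limit in this sketch): read everything.
        return {"offset": self.available}


reader = FakeStreamReader()
start = reader.initialOffset()
end = reader.latestOffset(start, ReadMaxRows(1))
print(end)  # {'offset': 1}
end = reader.latestOffset(end, ReadAllAvailable())
print(end)  # {'offset': 4}
```

The design choice here is that the end offset is computed relative to {{start}}. This is what lets a {{ReadMaxRows}} limit cap each microbatch, which the parameterless signature cannot express.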
>
>
>
> ----
>
>
>
> h2. Why are the changes needed?
>
>
>
> # *Correctness:* The docstring example contains a bug that would cause
> runtime errors if users copy-paste it
>
> # *Best practices:* The official tutorial should demonstrate the
> recommended API signature, not the older parameterless one
>
> # *Consistency:* The docstring and tutorial should align with the actual
> API behavior
>
> # *Feature discoverability:* Users reading the tutorial should learn about
> admission control capabilities
>
>
>
> h2. Does this PR introduce any user-facing change?
>
>
>
> No. Documentation and docstring fixes only. No changes to runtime behavior.
>
>
>
> h2. How was this patch tested?
>
>
>
> * Verified the {{ReadMaxRows}} dataclass definition uses {{max_rows}} field
> name
> * Confirmed the {{DataSourceStreamReader.latestOffset()}} method signature
> accepts {{start}} and {{limit}} parameters
>
> * Confirmed the new signature is the recommended approach per SPARK-55304
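The first two checks above can be automated with the standard library alone, using {{inspect.signature}} and {{dataclasses.fields}}. In this sketch the inspected classes ({{ReadMaxRows}}, {{OldReader}}, {{NewReader}}) are hypothetical local stand-ins. In practice the same helpers would be pointed at the real PySpark classes. This is not how Spark itself distinguishes between the two {{latestOffset}} signatures.

```python
import dataclasses
import inspect


def field_names(cls) -> list:
    # Names of the fields declared on a dataclass, in declaration order.
    return [f.name for f in dataclasses.fields(cls)]


def accepts_start_and_limit(reader_cls) -> bool:
    # True if latestOffset declares both `start` and `limit` parameters.
    params = inspect.signature(reader_cls.latestOffset).parameters
    return "start" in params and "limit" in params


# Hypothetical stand-ins used only to exercise the helpers.
@dataclasses.dataclass(frozen=True)
class ReadMaxRows:
    max_rows: int


class OldReader:
    def latestOffset(self) -> dict:
        return {"offset": 0}


class NewReader:
    def latestOffset(self, start: dict, limit) -> dict:
        return start


print(field_names(ReadMaxRows))            # ['max_rows']
print(accepts_start_and_limit(OldReader))  # False
print(accepts_start_and_limit(NewReader))  # True
```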
>
>
>
> h2. Files to Change
>
>
>
> ||File||Line||Change||
> |{{python/pyspark/sql/datasource.py}}|759|Change {{limit.maxRows}} to {{limit.max_rows}}|
> |{{python/docs/source/tutorial/sql/python_data_source.rst}}|~241|Change {{def latestOffset(self)}} to {{def latestOffset(self, start: dict, limit)}}|
>
>
>
> h2. Related Issues
>
>
>
> * SPARK-55304 - PySpark streaming data source admission control support
> (introduced the new {{latestOffset(start, limit)}} signature)
>
> * SPARK-55450 - Document admission control in PySpark streaming data
> sources (PR review identified these pre-existing issues)
>
>
>
> h2. Origin
>
>
>
> These issues were identified by reviewer [~heartsavior] during review of PR
> #54807 (SPARK-55450). See review comment:
>
> https://github.com/apache/spark/pull/54807#pullrequestreview-2026-04-06T01:39:23Z
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
