Jitesh Soni created SPARK-56367:
-----------------------------------
Summary: [SS][PYTHON][DOCS] Fix latestOffset docstring and tutorial to use correct field name and signature
Key: SPARK-56367
URL: https://issues.apache.org/jira/browse/SPARK-56367
Project: Spark
Issue Type: Improvement
Components: Documentation, Structured Streaming
Affects Versions: 4.2.0
Reporter: Jitesh Soni
This PR fixes two pre-existing documentation issues in the PySpark streaming data source API, identified during review of SPARK-55450.
h2. Issue 1: Docstring Typo in DataSourceStreamReader.latestOffset()
*File:* {{python/pyspark/sql/datasource.py}}
*Line:* 759
*Method:* {{DataSourceStreamReader.latestOffset()}} docstring example
The docstring example for the {{latestOffset}} method uses an incorrect attribute name when accessing the {{ReadMaxRows}} limit parameter.
*Current code (incorrect):*
{code:python}
>>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
>>> def latestOffset(self, start, limit):
...     # Assume the source has 10 new records between start and latest offset
...     if isinstance(limit, ReadAllAvailable):
...         return {"index": start["index"] + 10}
...     else:  # e.g., limit is ReadMaxRows(5)
...         return {"index": start["index"] + min(10, limit.maxRows)}
{code}
*Should be:*
{code:python}
>>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
>>> def latestOffset(self, start, limit):
...     # Assume the source has 10 new records between start and latest offset
...     if isinstance(limit, ReadAllAvailable):
...         return {"index": start["index"] + 10}
...     else:  # e.g., limit is ReadMaxRows(5)
...         return {"index": start["index"] + min(10, limit.max_rows)}
{code}
*Problem:* {{limit.maxRows}} should be {{limit.max_rows}}
*Impact:* The {{ReadMaxRows}} dataclass (defined in {{python/pyspark/sql/streaming/datasource.py}}) uses the field name {{max_rows}} (Python snake_case convention), not {{maxRows}} (camelCase). If a user copies this example code, they will get an {{AttributeError: 'ReadMaxRows' object has no attribute 'maxRows'}} at runtime as soon as the {{ReadMaxRows}} branch executes.
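A minimal reproduction of the failure, as a hedged sketch: the {{ReadMaxRows}} below is a hypothetical stand-in dataclass with a single {{max_rows}} field, mirroring the definition described above, so the snippet runs without PySpark. The helper names {{latest_with_typo}} and {{latest_fixed}} are illustrative only.

{code:python}
from dataclasses import dataclass


@dataclass(frozen=True)
class ReadMaxRows:
    """Hypothetical stand-in: one snake_case field, as described above."""

    max_rows: int


def latest_with_typo(start: dict, limit: ReadMaxRows) -> dict:
    # Same access as the current docstring: camelCase attribute name.
    return {"index": start["index"] + min(10, limit.maxRows)}


def latest_fixed(start: dict, limit: ReadMaxRows) -> dict:
    # Same access as the corrected docstring: snake_case field name.
    return {"index": start["index"] + min(10, limit.max_rows)}


limit = ReadMaxRows(5)
try:
    latest_with_typo({"index": 0}, limit)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)  # 'ReadMaxRows' object has no attribute 'maxRows'
print(latest_fixed({"index": 0}, limit))  # {'index': 5}
{code}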
----
h2. Issue 2: Outdated Method Signature in Tutorial
*File:* {{python/docs/source/tutorial/sql/python_data_source.rst}}
*Line:* ~241
*Class:* {{FakeStreamReader}} example in the "Implement the Stream Reader" section
The tutorial example uses the old {{latestOffset()}} method signature with no parameters, but the recommended signature now includes {{start}} and {{limit}} parameters for admission control support.
*Current code (outdated):*
{code:python}
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Return the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Return the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict) -> list[InputPartition]:
        ...
{code}
*Should be:*
{code:python}
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Return the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self, start: dict, limit) -> dict:
        """
        Return the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict) -> list[InputPartition]:
        ...
{code}
*Problem:* {{def latestOffset(self) -> dict:}} should be {{def latestOffset(self, start: dict, limit) -> dict:}}
*Impact:*
* The new {{latestOffset(start, limit)}} signature was introduced in Spark 4.2 via SPARK-55304 to support admission control.
* While Spark maintains backward compatibility for existing data sources using the old signature, the tutorial should guide new users toward the recommended approach.
* The old parameterless signature does not support admission control features like {{ReadMaxRows}} and {{ReadAllAvailable}}.
* Users following this tutorial will not learn about the {{start}} offset parameter (useful for resuming from a specific position) or the {{limit}} parameter (required for admission control).
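To illustrate what the {{start}} and {{limit}} parameters make possible (going beyond the signature-only change proposed here), the sketch below has a tutorial-style reader cap each microbatch when it receives a {{ReadMaxRows}} limit. It assumes hypothetical stand-in classes for {{ReadAllAvailable}} and {{ReadMaxRows}}, drops the constructor arguments, and does not subclass {{DataSourceStreamReader}}, so it runs without PySpark; it is not the tutorial's actual text.

{code:python}
from dataclasses import dataclass


@dataclass(frozen=True)
class ReadAllAvailable:
    """Hypothetical stand-in for the 'no limit' admission-control type."""


@dataclass(frozen=True)
class ReadMaxRows:
    """Hypothetical stand-in for the row-count admission-control type."""

    max_rows: int


class FakeStreamReader:
    """Tutorial-style reader that honors `limit` (sketch, no PySpark base class)."""

    def __init__(self):
        # Total rows the fake source has "produced" so far.
        self.current = 0

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self, start: dict, limit) -> dict:
        # As in the tutorial, two new rows appear on every call.
        self.current += 2
        if isinstance(limit, ReadMaxRows):
            # Admit at most max_rows rows past the start offset; the rest
            # stay available for a later microbatch.
            return {"offset": min(self.current, start["offset"] + limit.max_rows)}
        # ReadAllAvailable (and, in this sketch, any other limit): read it all.
        return {"offset": self.current}


reader = FakeStreamReader()
start = reader.initialOffset()
print(reader.latestOffset(start, ReadMaxRows(1)))  # {'offset': 1}
print(reader.latestOffset(start, ReadAllAvailable()))  # {'offset': 4}
{code}

Capping at {{start["offset"] + limit.max_rows}} rather than returning {{self.current}} is what lets the limit take effect: rows past the cap are not lost, they simply land in a later microbatch.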
----
h2. Why are the changes needed?
# *Correctness:* The docstring example contains a bug that raises an {{AttributeError}} at runtime if users copy and paste it.
# *Best practices:* The official tutorial should demonstrate the recommended API signature, not the older parameterless one.
# *Consistency:* The docstring and tutorial should align with the actual API behavior.
# *Feature discoverability:* Users reading the tutorial should learn about admission control capabilities.
h2. Does this PR introduce any user-facing change?
No. Documentation and docstring fixes only. No changes to runtime behavior.
h2. How was this patch tested?
* Verified the {{ReadMaxRows}} dataclass definition uses the {{max_rows}} field name
* Confirmed the {{DataSourceStreamReader.latestOffset()}} method signature accepts {{start}} and {{limit}} parameters
* Confirmed the new signature is the recommended approach per SPARK-55304
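These checks could also be automated. One caveat: the docstring example only defines {{latestOffset}} and never calls it, so running it under doctest as written would not surface the {{maxRows}} typo. A hedged sketch using the standard-library {{doctest}} module, with hypothetical stand-ins for the limit classes and one added call line, could look like this (the helper {{run_example}} is illustrative, not part of the proposed patch):

{code:python}
import doctest
from dataclasses import dataclass


@dataclass(frozen=True)
class ReadAllAvailable:
    """Hypothetical stand-in for the PySpark limit type."""


@dataclass(frozen=True)
class ReadMaxRows:
    """Hypothetical stand-in; field name mirrors the issue (max_rows)."""

    max_rows: int


# Corrected docstring example plus one call, so the max_rows branch runs.
EXAMPLE = '''
>>> def latestOffset(self, start, limit):
...     if isinstance(limit, ReadAllAvailable):
...         return {"index": start["index"] + 10}
...     else:  # e.g., limit is ReadMaxRows(5)
...         return {"index": start["index"] + min(10, limit.max_rows)}
>>> latestOffset(None, {"index": 0}, ReadMaxRows(5))
{'index': 5}
'''


def run_example(text):
    """Run a docstring-style example and return doctest.TestResults."""
    globs = {"ReadAllAvailable": ReadAllAvailable, "ReadMaxRows": ReadMaxRows}
    test = doctest.DocTestParser().get_doctest(text, globs, "latestOffset", None, 0)
    return doctest.DocTestRunner(verbose=False).run(test)


results = run_example(EXAMPLE)
print(results.failed, results.attempted)  # 0 2
{code}

Running the same text with {{max_rows}} replaced by {{maxRows}} makes the call line fail, which is the regression this issue describes.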
h2. Files to Change
||File||Line||Change||
|{{python/pyspark/sql/datasource.py}}|759|Change {{limit.maxRows}} to {{limit.max_rows}}|
|{{python/docs/source/tutorial/sql/python_data_source.rst}}|~241|Change {{def latestOffset(self)}} to {{def latestOffset(self, start: dict, limit)}}|
h2. Related Issues
* SPARK-55304 - PySpark streaming data source admission control support (introduced the new {{latestOffset(start, limit)}} signature)
* SPARK-55450 - Document admission control in PySpark streaming data sources (PR review identified these pre-existing issues)
h2. Origin
These issues were identified by reviewer [~heartsavior] during review of PR #54807 (SPARK-55450). See review comment:
https://github.com/apache/spark/pull/54807#pullrequestreview-2026-04-06T01:39:23Z
