[ https://issues.apache.org/jira/browse/SPARK-56367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18071574#comment-18071574 ]

Jitesh Soni commented on SPARK-56367:
-------------------------------------

[~canadiandataguy] Starting work on it.

> [SS][PYTHON][DOCS] Fix latestOffset docstring and tutorial to use correct 
> field name and signature
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-56367
>                 URL: https://issues.apache.org/jira/browse/SPARK-56367
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Jitesh Soni
>            Priority: Minor
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
>   This PR fixes two pre-existing documentation issues in the PySpark streaming data source API that were identified during review of SPARK-55450.
>
>   h2. Issue 1: Docstring Typo in DataSourceStreamReader.latestOffset()
>
>   *File:* {{python/pyspark/sql/datasource.py}}
>
>   *Line:* 759
>
>   *Method:* {{DataSourceStreamReader.latestOffset()}} docstring example
>
>   The docstring example for the {{latestOffset}} method uses the wrong attribute name when reading the row cap from a {{ReadMaxRows}} limit.
>
>   *Current code (incorrect):*
>
>   {code:python}
>   >>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
>   >>> def latestOffset(self, start, limit):
>   ...     # Assume the source has 10 new records between start and latest offset
>   ...     if isinstance(limit, ReadAllAvailable):
>   ...        return {"index": start["index"] + 10}
>   ...     else:  # e.g., limit is ReadMaxRows(5)
>   ...        return {"index": start["index"] + min(10, limit.maxRows)}
>   {code}
>
>   *Should be:*
>
>   {code:python}
>   >>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
>   >>> def latestOffset(self, start, limit):
>   ...     # Assume the source has 10 new records between start and latest offset
>   ...     if isinstance(limit, ReadAllAvailable):
>   ...        return {"index": start["index"] + 10}
>   ...     else:  # e.g., limit is ReadMaxRows(5)
>   ...        return {"index": start["index"] + min(10, limit.max_rows)}
>   {code}
>
>   *Problem:* {{limit.maxRows}} should be {{limit.max_rows}}.
>
>   *Impact:* The {{ReadMaxRows}} dataclass (defined in {{python/pyspark/sql/streaming/datasource.py}}) uses the snake_case field name {{max_rows}}, following Python convention, not the camelCase {{maxRows}}. A user who copies this example will hit {{AttributeError: 'ReadMaxRows' object has no attribute 'maxRows'}} at runtime whenever a {{ReadMaxRows}} limit is passed.
>
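A minimal, Spark-free sketch of the failure described above. It assumes hypothetical local stand-ins for {{ReadAllAvailable}} and {{ReadMaxRows}} that mirror only the {{max_rows}} field named in this issue; they are not the real {{pyspark.sql.streaming.datasource}} classes. The function names {{latest_offset_buggy}} and {{latest_offset_fixed}} are illustrative and reproduce the docstring logic without its unused {{self}} argument.

```python
from dataclasses import dataclass, fields


# Hypothetical stand-ins that mirror only the field naming described in this
# issue; they are not the real pyspark.sql.streaming.datasource classes.
@dataclass
class ReadAllAvailable:
    pass


@dataclass
class ReadMaxRows:
    max_rows: int


def latest_offset_buggy(start: dict, limit) -> dict:
    # Current docstring logic: reads the camelCase attribute maxRows.
    if isinstance(limit, ReadAllAvailable):
        return {"index": start["index"] + 10}
    return {"index": start["index"] + min(10, limit.maxRows)}


def latest_offset_fixed(start: dict, limit) -> dict:
    # Proposed docstring logic: reads the snake_case field max_rows.
    if isinstance(limit, ReadAllAvailable):
        return {"index": start["index"] + 10}
    return {"index": start["index"] + min(10, limit.max_rows)}


if __name__ == "__main__":
    print([f.name for f in fields(ReadMaxRows)])  # ['max_rows']
    start = {"index": 0}
    try:
        latest_offset_buggy(start, ReadMaxRows(5))
    except AttributeError as exc:
        print("buggy:", exc)
    print("fixed:", latest_offset_fixed(start, ReadMaxRows(5)))  # fixed: {'index': 5}
    print("all:", latest_offset_fixed(start, ReadAllAvailable()))  # all: {'index': 10}
```

The {{ReadAllAvailable}} branch never touches the attribute, so the typo only surfaces once a {{ReadMaxRows}} limit reaches the {{else}} branch.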
>   ----
>
>   h2. Issue 2: Outdated Method Signature in Tutorial
>
>   *File:* {{python/docs/source/tutorial/sql/python_data_source.rst}}
>
>   *Line:* ~241
>
>   *Class:* {{FakeStreamReader}} example in the "Implement the Stream Reader" section
>
>   The tutorial example uses the old parameterless {{latestOffset()}} signature, but the recommended signature now takes {{start}} and {{limit}} parameters to support admission control.
>
>   *Current code (outdated):*
>
>   {code:python}
>   class FakeStreamReader(DataSourceStreamReader):
>       def __init__(self, schema, options):
>           self.current = 0
>
>       def initialOffset(self) -> dict:
>           """
>           Return the initial start offset of the reader.
>           """
>           return {"offset": 0}
>
>       def latestOffset(self) -> dict:
>           """
>           Return the current latest offset that the next microbatch will read to.
>           """
>           self.current += 2
>           return {"offset": self.current}
>
>       def partitions(self, start: dict, end: dict) -> list[InputPartition]:
>           ...
>   {code}
>
>   *Should be:*
>
>   {code:python}
>   class FakeStreamReader(DataSourceStreamReader):
>       def __init__(self, schema, options):
>           self.current = 0
>
>       def initialOffset(self) -> dict:
>           """
>           Return the initial start offset of the reader.
>           """
>           return {"offset": 0}
>
>       def latestOffset(self, start: dict, limit) -> dict:
>           """
>           Return the current latest offset that the next microbatch will read to.
>           """
>           self.current += 2
>           return {"offset": self.current}
>
>       def partitions(self, start: dict, end: dict) -> list[InputPartition]:
>           ...
>   {code}
>
>   *Problem:* {{def latestOffset(self) -> dict:}} should be {{def latestOffset(self, start: dict, limit) -> dict:}}.
>
>   *Impact:*
>
>   * The new {{latestOffset(start, limit)}} signature was introduced in Spark 4.2 by SPARK-55304 to support admission control.
>   * Spark remains backward compatible with existing data sources that use the old signature, but the tutorial should steer new users toward the recommended approach.
>   * The old parameterless signature does not support admission control limits such as {{ReadMaxRows}} and {{ReadAllAvailable}}.
>   * Users following this tutorial will not learn about the {{start}} offset parameter (useful for resuming from a specific position) or the {{limit}} parameter (required for admission control).
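The proposed tutorial signature accepts {{start}} and {{limit}} but still ignores them. The following is a hypothetical sketch of how a fake reader could honor them, assuming Spark passes the previous batch's end offset as {{start}}. So that it runs without PySpark, it uses local stand-ins for {{ReadAllAvailable}} and {{ReadMaxRows}} and a plain class instead of {{DataSourceStreamReader}}; it is not the tutorial's code.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the limit types named in this issue. Real code
# would import ReadAllAvailable and ReadMaxRows from
# pyspark.sql.streaming.datasource instead.
@dataclass
class ReadAllAvailable:
    pass


@dataclass
class ReadMaxRows:
    max_rows: int


class FakeStreamReader:
    """Sketch only: a real reader would subclass DataSourceStreamReader."""

    def __init__(self, schema=None, options=None):
        # Total number of records the fake source has produced so far.
        self.available = 0

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self, start: dict, limit) -> dict:
        # Pretend two new records arrive every time the latest offset is requested.
        self.available += 2
        if isinstance(limit, ReadMaxRows):
            # Admission control: plan at most max_rows records past start.
            end = min(self.available, start["offset"] + limit.max_rows)
        else:
            # ReadAllAvailable (or any other limit): read up to the newest record.
            end = self.available
        return {"offset": end}


if __name__ == "__main__":
    reader = FakeStreamReader()
    start = reader.initialOffset()
    for limit in (ReadMaxRows(1), ReadMaxRows(1), ReadAllAvailable()):
        end = reader.latestOffset(start, limit)
        print(limit, start, "->", end)
        start = end  # the next microbatch starts where this one ended
```

In this sketch each call makes two more records available; with {{ReadMaxRows(1)}} the end offset advances at most one row past {{start}}, while {{ReadAllAvailable}} jumps straight to the newest record.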
>
>   ----
>
>   h2. Why are the changes needed?
>
>   # *Correctness:* The docstring example contains a bug that causes a runtime error when users copy and paste it.
>   # *Best practices:* The official tutorial should demonstrate the recommended API signature, not the legacy parameterless one.
>   # *Consistency:* The docstring and tutorial should match the actual API.
>   # *Feature discoverability:* Users reading the tutorial should learn about admission control capabilities.
>
>
>   h2. Does this PR introduce any user-facing change?
>
>   No. Documentation and docstring fixes only; runtime behavior is unchanged.
>
>   h2. How was this patch tested?
>
>   * Verified that the {{ReadMaxRows}} dataclass definition uses the {{max_rows}} field name.
>   * Confirmed that the {{DataSourceStreamReader.latestOffset()}} method signature accepts {{start}} and {{limit}} parameters.
>   * Confirmed that the new signature is the recommended approach per SPARK-55304.
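The first two checks above could be automated roughly as follows. {{check_limit_and_reader}} is a hypothetical helper, not part of PySpark or Spark's test suite; it only inspects the classes it is given, so pass the real {{ReadMaxRows}} and {{DataSourceStreamReader}} classes to check an actual build, or stand-ins as in the demo.

```python
import inspect
from dataclasses import dataclass, fields, is_dataclass


def check_limit_and_reader(read_max_rows_cls, reader_cls) -> list[str]:
    """Return a list of problems found; an empty list means both checks passed.

    Hypothetical helper: pass the real ReadMaxRows and DataSourceStreamReader
    classes (or stand-ins) to run the checks.
    """
    problems = []
    # Check 1: ReadMaxRows is a dataclass whose row cap is named max_rows.
    if not is_dataclass(read_max_rows_cls):
        problems.append("ReadMaxRows is not a dataclass")
    elif "max_rows" not in {f.name for f in fields(read_max_rows_cls)}:
        problems.append("ReadMaxRows has no max_rows field")
    # Check 2: latestOffset takes start and limit right after self.
    params = list(inspect.signature(reader_cls.latestOffset).parameters)
    if params[1:3] != ["start", "limit"]:
        problems.append(f"latestOffset parameters are {params}")
    return problems


if __name__ == "__main__":
    @dataclass
    class StandInReadMaxRows:  # hypothetical stand-in with the documented field
        max_rows: int

    class OldReader:
        def latestOffset(self) -> dict:
            return {"offset": 0}

    class NewReader:
        def latestOffset(self, start: dict, limit) -> dict:
            return start

    print(check_limit_and_reader(StandInReadMaxRows, NewReader))  # []
    print(check_limit_and_reader(StandInReadMaxRows, OldReader))
```

Returning a list of problems rather than raising on the first failure lets one run report both a misnamed field and an outdated signature together.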
>
>   h2. Files to Change
>
>   ||File||Line||Change||
>   |{{python/pyspark/sql/datasource.py}}|759|Change {{limit.maxRows}} to {{limit.max_rows}}|
>   |{{python/docs/source/tutorial/sql/python_data_source.rst}}|~241|Change {{def latestOffset(self)}} to {{def latestOffset(self, start: dict, limit)}}|
>
>   h2. Related Issues
>
>   * SPARK-55304 - PySpark streaming data source admission control support (introduced the new {{latestOffset(start, limit)}} signature)
>   * SPARK-55450 - Document admission control in PySpark streaming data sources (PR review identified these pre-existing issues)
>
>
>   h2. Origin
>
>   These issues were identified by reviewer [~heartsavior] during review of PR #54807 (SPARK-55450). See the review comment:
>   https://github.com/apache/spark/pull/54807#pullrequestreview-2026-04-06T01:39:23Z



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
