Jitesh Soni created SPARK-56367:
-----------------------------------

             Summary: [SS][PYTHON][DOCS] Fix latestOffset docstring and tutorial to use correct field name and signature
                 Key: SPARK-56367
                 URL: https://issues.apache.org/jira/browse/SPARK-56367
             Project: Spark
          Issue Type: Improvement
          Components: Documentation, Structured Streaming
    Affects Versions: 4.2.0
            Reporter: Jitesh Soni


  This PR fixes two pre-existing documentation issues in the PySpark streaming data source API, identified during review of SPARK-55450.
                                                                                
                                                                            
  h2. Issue 1: Docstring Typo in DataSourceStreamReader.latestOffset()          
                                                                            
                                                                                
                                                                            
  *File:* {{python/pyspark/sql/datasource.py}}                                  
                                                                            
  *Line:* 759                                                                   
                                                                            
  *Method:* {{DataSourceStreamReader.latestOffset()}} docstring example         
                                                                            
                                                                                
                                                                            
  The docstring example for the {{latestOffset}} method uses an incorrect attribute name when accessing the {{ReadMaxRows}} limit parameter.
  
                                                                                
                                                                            
  *Current code (incorrect):*                                                   
                                                                            
  {code:python}
  >>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
  >>> def latestOffset(self, start, limit):
  ...     # Assume the source has 10 new records between start and latest offset
  ...     if isinstance(limit, ReadAllAvailable):
  ...        return {"index": start["index"] + 10}
  ...     else:  # e.g., limit is ReadMaxRows(5)
  ...        return {"index": start["index"] + min(10, limit.maxRows)}
  {code}
                                                                            
                                                                                
                                                                            
  *Should be:*                                                                  
                                                                            
  {code:python}
  >>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
  >>> def latestOffset(self, start, limit):
  ...     # Assume the source has 10 new records between start and latest offset
  ...     if isinstance(limit, ReadAllAvailable):
  ...        return {"index": start["index"] + 10}
  ...     else:  # e.g., limit is ReadMaxRows(5)
  ...        return {"index": start["index"] + min(10, limit.max_rows)}
  {code}
                                                                            
                                                                                
                                                                            
  *Problem:* {{limit.maxRows}} should be {{limit.max_rows}}                     
                                                                            
                                                                                
                                                                            
  *Impact:* The {{ReadMaxRows}} dataclass (defined in {{python/pyspark/sql/streaming/datasource.py}}) uses the field name {{max_rows}} (Python snake_case convention), not {{maxRows}} (camelCase). If a user copies this example code, they will get an {{AttributeError: 'ReadMaxRows' object has no attribute 'maxRows'}} at runtime.
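
The failure mode described above can be reproduced without a Spark installation. The following is a minimal sketch, not the PySpark implementation: the {{ReadMaxRows}} class here is a hypothetical stand-in that only mirrors the {{max_rows}} field name stated in this issue, and the two helper functions are illustrative names that do not exist in PySpark.

```python
from dataclasses import dataclass


@dataclass
class ReadMaxRows:
    """Stand-in for PySpark's ReadMaxRows (assumption: one field, max_rows)."""

    max_rows: int


def latest_offset_buggy(start: dict, limit: ReadMaxRows) -> dict:
    # Mirrors the current docstring: reads a camelCase attribute that
    # the dataclass does not define.
    return {"index": start["index"] + min(10, limit.maxRows)}


def latest_offset_fixed(start: dict, limit: ReadMaxRows) -> dict:
    # Mirrors the proposed docstring: reads the snake_case field.
    return {"index": start["index"] + min(10, limit.max_rows)}


limit = ReadMaxRows(5)
fixed = latest_offset_fixed({"index": 0}, limit)

try:
    latest_offset_buggy({"index": 0}, limit)
    buggy_error = None
except AttributeError as exc:
    # Capture the message so it can be inspected or logged.
    buggy_error = str(exc)
```

With the stand-in, the fixed variant advances the offset by the capped amount, while the buggy variant fails on attribute lookup before any offset is computed.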
                                                                            
                                                                                
                                                                            
  ----                                                                          
                                                                            
                                                                                
                                                                            
  h2. Issue 2: Outdated Method Signature in Tutorial                            
                                                                            
                                                                                
                                                                            
  *File:* {{python/docs/source/tutorial/sql/python_data_source.rst}}            
                                                                            
  *Line:* ~241                                                                  
                                                                            
  *Class:* {{FakeStreamReader}} example in the "Implement the Stream Reader" section

  The tutorial example uses the old {{latestOffset()}} method signature with no parameters, but the recommended signature now includes {{start}} and {{limit}} parameters for admission control support.
                                                                            
                                                                                
                                                                            
  *Current code (outdated):*                                                    
                                                                            
  {code:python}
  class FakeStreamReader(DataSourceStreamReader):
      def __init__(self, schema, options):
          self.current = 0

      def initialOffset(self) -> dict:
          """
          Return the initial start offset of the reader.
          """
          return {"offset": 0}

      def latestOffset(self) -> dict:
          """
          Return the current latest offset that the next microbatch will read to.
          """
          self.current += 2
          return {"offset": self.current}

      def partitions(self, start: dict, end: dict) -> list[InputPartition]:
          ...
  {code}
                                                                            
                                                                                
                                                                            
  *Should be:*                                                                  
                                                                            
  {code:python}
  class FakeStreamReader(DataSourceStreamReader):
      def __init__(self, schema, options):
          self.current = 0

      def initialOffset(self) -> dict:
          """
          Return the initial start offset of the reader.
          """
          return {"offset": 0}

      def latestOffset(self, start: dict, limit) -> dict:
          """
          Return the current latest offset that the next microbatch will read to.
          """
          self.current += 2
          return {"offset": self.current}

      def partitions(self, start: dict, end: dict) -> list[InputPartition]:
          ...
  {code}
                                                                            
                                                                                
                                                                            
  *Problem:* {{def latestOffset(self) -> dict:}} should be {{def latestOffset(self, start: dict, limit) -> dict:}}
                                                                                
                                                                            
  *Impact:*                                                                     
                                                                            
  * The new {{latestOffset(start, limit)}} signature was introduced in Spark 4.2 via SPARK-55304 to support admission control
  * While Spark maintains backward compatibility for existing data sources using the old signature, the tutorial should guide new users toward the recommended approach
  * The old parameterless signature does not support admission control features like {{ReadMaxRows}} and {{ReadAllAvailable}}
  * Users following this tutorial will not learn about the {{start}} offset parameter (useful for resuming from a specific position) or the {{limit}} parameter (required for admission control)
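
To show what the {{start}} and {{limit}} parameters make possible, here is a hedged sketch of a reader that caps each microbatch. It goes beyond the tutorial change proposed in this issue, which only updates the signature. {{CappedFakeReader}} and its {{available}} attribute are hypothetical, and the two limit classes are local stand-ins so the sketch runs without PySpark; a real reader would subclass {{DataSourceStreamReader}} and import the limit classes from PySpark.

```python
from dataclasses import dataclass


@dataclass
class ReadAllAvailable:
    """Stand-in (assumption) for the 'no cap' read limit."""


@dataclass
class ReadMaxRows:
    """Stand-in (assumption) for a row-count cap with a max_rows field."""

    max_rows: int


class CappedFakeReader:
    """Hypothetical reader; a real one would subclass DataSourceStreamReader."""

    def __init__(self, available: int):
        # Highest offset the pretend source currently holds.
        self.available = available

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self, start: dict, limit) -> dict:
        end = self.available
        if isinstance(limit, ReadMaxRows):
            # Never advance more than max_rows past the start offset.
            end = min(end, start["offset"] + limit.max_rows)
        return {"offset": end}


reader = CappedFakeReader(available=10)
uncapped = reader.latestOffset(reader.initialOffset(), ReadAllAvailable())
capped = reader.latestOffset({"offset": 4}, ReadMaxRows(3))
```

Computing the end offset from {{start}} rather than from mutable reader state keeps the result a pure function of its inputs, which is one reason the two-parameter form is easier to reason about.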
                                                                            
                                                                                
                                                                            
  ----                                                                          
                                                                            
                                                                                
                                                                            
  h2. Why are the changes needed?                                               
                                                                            
                                                                                
                                                                            
  # *Correctness:* The docstring example contains a bug that would cause runtime errors if users copy-paste it
  # *Best practices:* The official tutorial should demonstrate the recommended API signature, not a deprecated one
  # *Consistency:* The docstring and tutorial should align with the actual API behavior
  # *Feature discoverability:* Users reading the tutorial should learn about admission control capabilities
                                                                                
                                                                            
  h2. Does this PR introduce any user-facing change?                            
                                                                            
                                                                                
                                                                            
  No. Documentation and docstring fixes only. No changes to runtime behavior.   
                                                                            
                                                                                
                                                                            
  h2. How was this patch tested?                                                
                                                                            
                                                                                
                                                                            
  * Verified the {{ReadMaxRows}} dataclass definition uses {{max_rows}} field name
  * Confirmed the {{DataSourceStreamReader.latestOffset()}} method signature accepts {{start}} and {{limit}} parameters
  * Confirmed the new signature is the recommended approach per SPARK-55304     
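
Checks like the first two above could also be scripted. The sketch below is one possible approach and not part of the patch. The helpers {{field_names}} and {{accepts_start_and_limit}} are hypothetical names, and the classes they are run against are local stand-ins so the snippet runs without PySpark.

```python
import inspect
from dataclasses import dataclass, fields


def field_names(cls) -> list:
    """Return the declared dataclass field names of cls, in order."""
    return [f.name for f in fields(cls)]


def accepts_start_and_limit(func) -> bool:
    """True if func's parameters, ignoring self, are exactly (start, limit)."""
    params = [p for p in inspect.signature(func).parameters if p != "self"]
    return params == ["start", "limit"]


# Stand-ins (assumption) so the sketch runs without PySpark installed.
@dataclass
class ReadMaxRows:
    max_rows: int


class OldReader:
    def latestOffset(self) -> dict:
        return {"offset": 0}


class NewReader:
    def latestOffset(self, start: dict, limit) -> dict:
        return {"offset": 0}


names = field_names(ReadMaxRows)
old_ok = accepts_start_and_limit(OldReader.latestOffset)
new_ok = accepts_start_and_limit(NewReader.latestOffset)
```

Against an installed Spark build, the same helpers could be pointed at the real {{ReadMaxRows}} and {{DataSourceStreamReader.latestOffset}}, assuming they are importable from the module paths named in this issue.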
                                                                            
                                                                                
                                                                            
  h2. Files to Change                                                           
                                                                            
                                                                                
                                                                            
  ||File||Line||Change||
  |{{python/pyspark/sql/datasource.py}}|759|Change {{limit.maxRows}} to {{limit.max_rows}}|
  |{{python/docs/source/tutorial/sql/python_data_source.rst}}|~241|Change {{def latestOffset(self)}} to {{def latestOffset(self, start: dict, limit)}}|
                                                                                
                                                                            
  h2. Related Issues                                                            
                                                                            
                                                                                
                                                                            
  * SPARK-55304 - PySpark streaming data source admission control support (introduced the new {{latestOffset(start, limit)}} signature)
  * SPARK-55450 - Document admission control in PySpark streaming data sources (PR review identified these pre-existing issues)
                                                                                
                                                                            
  h2. Origin                                                                    
                                                                            
                                                                                
                                                                            
  These issues were identified by reviewer [~heartsavior] during review of PR #54807 (SPARK-55450). See review comment:
  https://github.com/apache/spark/pull/54807#pullrequestreview-2026-04-06T01:39:23Z



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
