allisonwang-db commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572908487


##########
python/pyspark/sql/datasource.py:
##########
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
             message_parameters={"feature": "streamWriter"},
         )
 
+    def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+        try:
+            return self.streamReader(schema=schema)
+        except PySparkNotImplementedError:
+            return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))
+
+    def simpleStreamReader(self, schema: StructType) -> 
"SimpleDataSourceStreamReader":
+        """
+        Returns a :class:`SimpleDataSourceStreamReader` instance for reading 
data.
+
+        One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming
+        data source.

Review Comment:
   Can we be more explicit about when users should choose streamReader versus 
simpleStreamReader here? This information will be included in the API 
documentation for this class.



##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -51,6 +52,71 @@
 )
 
 
+def records_to_arrow_batches(
+    output_iter: Iterator[Tuple],
+    max_arrow_batch_size: int,
+    return_type: StructType,
+    data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:

Review Comment:
   Let's add some docstring for this function.



##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,200 @@ def stop(self) -> None:
         ...
 
 
+class SimpleInputPartition(InputPartition):

Review Comment:
   Why do we need this in the public API? Why can't user define their own input 
partition class?



##########
python/pyspark/sql/datasource.py:
##########
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
             message_parameters={"feature": "streamWriter"},
         )
 
+    def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":

Review Comment:
   Why do we need this `_streamReader` in datasource API?



##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,200 @@ def stop(self) -> None:
         ...
 
 
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+    def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+    """
+    A base class for simplified streaming data source readers.
+    Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+    require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+    allows reading data and planning the latest offset at the same time.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, 
which includes
+            Integer, String and Boolean.
+
+        Examples
+        --------
+        >>> def initialOffset(self):
+        ...     return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "initialOffset"},
+        )
+
+    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+        """
+        Read all available data from start offset and return the offset that 
next read attempt
+        starts from.
+
+        Parameters
+        ----------
+        start : dict
+            The start offset to start reading from.
+
+        Returns
+        -------
+        A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+            The iterator contains all the available records after start offset.
+            The dict is the end offset of this read attempt and the start of 
next read attempt.
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "read"},
+        )
+
+    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+        """
+        Read all available data from specific start offset and end offset.
+        This is invoked during failure recovery to re-read a batch 
deterministically
+        in order to achieve exactly once.
+
+        Parameters
+        ----------
+        start : dict
+            The start offset to start reading from.
+
+        end : dict
+            The offset where the reading stop.
+
+        Returns
+        -------
+        iterator of :class:`Tuple`\\s
+            All the records between start offset and end offset.
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "read2"},

Review Comment:
   read2? Should this be `readBetweenOffsets`?



##########
python/pyspark/sql/datasource.py:
##########
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
             message_parameters={"feature": "streamWriter"},
         )
 
+    def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+        try:
+            return self.streamReader(schema=schema)
+        except PySparkNotImplementedError:
+            return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))
+
+    def simpleStreamReader(self, schema: StructType) -> 
"SimpleDataSourceStreamReader":
+        """
+        Returns a :class:`SimpleDataSourceStreamReader` instance for reading 
data.
+
+        One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming
+        data source.
+
+        Parameters
+        ----------
+        schema : :class:`StructType`
+            The schema of the data to be read.
+
+        Returns
+        -------
+        reader : :class:`SimpleDataSourceStreamReader`
+            A reader instance for this data source.
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "simpleStreamReader"},
+        )
+
     def streamReader(self, schema: StructType) -> "DataSourceStreamReader":
         """
         Returns a :class:`DataSourceStreamReader` instance for reading 
streaming data.
 
-        The implementation is required for readable streaming data sources.
+        One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming
+        data source.

Review Comment:
   ditto



##########
python/pyspark/sql/datasource.py:
##########
@@ -469,6 +501,200 @@ def stop(self) -> None:
         ...
 
 
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):

Review Comment:
   Ditto why do we need this in the public API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to