chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1590520510


##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` returns the current latest offset that the next microbatch will read up to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the partitioning of the current microbatch defined by the start and end offsets. It must return a sequence of :class:`InputPartition` objects.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition as input and returns an iterator of tuples read from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when the query has finished processing data before the end offset. This can be used to clean up resources.

Review Comment:
   Moved to doc string.
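
For readers following the thread, the call order described in the hunk above can be sketched without a Spark installation. This is only an illustration under stated assumptions: `InputPartition` and `DataSourceStreamReader` below are empty stand-ins for the real `pyspark.sql.datasource` classes, and `run_microbatches` is a hypothetical driver, not Spark's actual scheduling logic (Spark runs `read` on executors and may call these methods in a different pattern).

```python
# Empty stand-ins so the sketch runs without pyspark (assumption: real code
# would import these from pyspark.sql.datasource instead).
class InputPartition:
    pass


class DataSourceStreamReader:
    pass


class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end


class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self):
        return {"offset": 0}

    def latestOffset(self):
        # Each call advances the offset by 2, so each batch covers 2 rows.
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start, end):
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end):
        pass

    def read(self, partition):
        for i in range(partition.start, partition.end):
            yield (i, str(i))


def run_microbatches(reader, num_batches):
    """Hypothetical driver loop that mimics one possible call order."""
    batches = []
    start = reader.initialOffset()
    for _ in range(num_batches):
        end = reader.latestOffset()
        rows = []
        for partition in reader.partitions(start, end):
            rows.extend(reader.read(partition))
        # Signal that everything before `end` has been processed.
        reader.commit(end)
        batches.append(rows)
        start = end
    return batches


batches = run_microbatches(FakeStreamReader(schema=None, options={}), 3)
```

Each entry in `batches` holds the rows of one simulated microbatch, and the end offset of one batch becomes the start offset of the next.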



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):

Review Comment:
   Typing added.
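
As a rough illustration of what type-annotated signatures for this example could look like, here is a minimal sketch. The specific annotations are assumptions chosen for illustration and are not necessarily the ones committed in the PR; `InputPartition` and `DataSourceStreamReader` are again empty stand-ins for the `pyspark.sql.datasource` classes so the block runs on its own.

```python
from typing import Iterator, List, Tuple, get_type_hints


# Empty stand-ins for the pyspark.sql.datasource base classes (assumption).
class InputPartition:
    pass


class DataSourceStreamReader:
    pass


class RangePartition(InputPartition):
    def __init__(self, start: int, end: int) -> None:
        self.start = start
        self.end = end


class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options) -> None:
        self.current = 0

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self) -> dict:
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict) -> List[RangePartition]:
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict) -> None:
        pass

    def read(self, partition: RangePartition) -> Iterator[Tuple[int, str]]:
        for i in range(partition.start, partition.end):
            yield (i, str(i))


# Inspect the annotations and exercise one partition.
hints = get_type_hints(FakeStreamReader.initialOffset)
rows = list(FakeStreamReader(None, None).read(RangePartition(0, 2)))
```

Offsets are annotated as plain `dict` here because the example returns dictionaries such as `{"offset": 0}`; a stricter alias could be used if the offset shape is fixed.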



##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The stream reader instance has an integer offset that increases by 2 in every microbatch.

Review Comment:
   Moved under header.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

