HeartSaVioR commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1609442247


##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,9 +109,157 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+Implementing Streaming Reader and Writer for Python Data Source
+---------------------------------------------------------------
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generates 2 rows in every microbatch. The reader instance keeps an integer offset that increases by 2 in every microbatch.
+
+.. code-block:: python
+
+    from typing import Iterator, Tuple
+
+    from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self) -> dict:
+            """
+            Return the initial start offset of the reader.
+            """
+            return {"offset": 0}
+
+        def latestOffset(self) -> dict:
+            """
+            Return the current latest offset that the next microbatch will read to.
+            """
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start: dict, end: dict):
+            """
+            Plans the partitioning of the current microbatch defined by the start and end offsets.
+            It needs to return a sequence of :class:`InputPartition` objects.
+            """
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end: dict):
+            """
+            This is invoked when the query has finished processing data before the end offset.
+            This can be used to clean up resources.
+            """
+            pass
+
+        def read(self, partition) -> Iterator[Tuple]:
+            """
+            Takes a partition as an input and reads an iterator of tuples from the data source.
+            """
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
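+For illustration only, here is a minimal sketch of how a reader like the one above could be wired into a data source and used from a streaming query. The ``FakeDataSource`` class, its ``fake`` short name, and the two-column schema are assumptions made for this sketch rather than part of the snippet above; it also assumes an active :class:`SparkSession` named ``spark``.
+
+.. code-block:: python
+
+    from pyspark.sql.datasource import DataSource
+
+    class FakeDataSource(DataSource):
+        @classmethod
+        def name(cls):
+            # Short name used with .format(...) in queries.
+            return "fake"
+
+        def schema(self):
+            # Matches the (int, str) tuples yielded by FakeStreamReader.read().
+            return "id INT, value STRING"
+
+        def streamReader(self, schema):
+            return FakeStreamReader(schema, self.options)
+
+    # Register the data source and start a streaming query that prints each microbatch.
+    spark.dataSource.register(FakeDataSource)
+    query = spark.readStream.format("fake").load().writeStream.format("console").start()
+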
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
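+
+A minimal sketch of a simple reader equivalent to the example above is shown here. The method signatures follow the ``SimpleDataSourceStreamReader`` interface, but the class name and the two-rows-per-microbatch behaviour are illustrative assumptions, not part of the snippet above.
+
+.. code-block:: python
+
+    from typing import Iterator, Tuple
+
+    from pyspark.sql.datasource import SimpleDataSourceStreamReader
+
+    class SimpleFakeStreamReader(SimpleDataSourceStreamReader):
+        def initialOffset(self) -> dict:
+            # Start reading from offset 0.
+            return {"offset": 0}
+
+        def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+            # Read the next two rows starting from `start` and return them
+            # together with the offset where the following microbatch should begin.
+            begin = start["offset"]
+            rows = iter([(i, str(i)) for i in range(begin, begin + 2)])
+            return rows, {"offset": begin + 2}
+
+        def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+            # Deterministically replay the rows between two known offsets,
+            # e.g. when a microbatch is re-executed after a failure.
+            return iter([(i, str(i)) for i in range(start["offset"], end["offset"])])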

Review Comment:
   > which does not have a way to query given a start and an end offset
   
   This still isn't possible with the simple stream reader. This is an essential requirement of a streaming data source. If we want to enable this, it's likely that we'll need to replicate the data into durable storage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

