chaoqin-li1123 commented on code in PR #46139: URL: https://github.com/apache/spark/pull/46139#discussion_r1590525766
##########
python/docs/source/user_guide/sql/python_data_source.rst:
##########
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start
+            self.end = end
+
+    class FakeStreamReader(DataSourceStreamReader):
+        def __init__(self, schema, options):
+            self.current = 0
+
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def latestOffset(self):
+            self.current += 2
+            return {"offset": self.current}
+
+        def partitions(self, start, end):
+            return [RangePartition(start["offset"], end["offset"])]
+
+        def commit(self, end):
+            pass
+
+        def read(self, partition):
+            start, end = partition.start, partition.end
+            for i in range(start, end):
+                yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every microbatch. The streamReader instance has a integer offset that increase by 2 in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the partitioning of the current microbatch defined by start and end offset, it needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when the query has finished processing data before end offset, this can be used to clean up resource.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+    class SimpleStreamReader(SimpleDataSourceStreamReader):
+        def initialOffset(self):
+            return {"offset": 0}
+
+        def read(self, start: dict):
+            start_idx = start["offset"]
+            it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+            return (it, {"offset": start_idx + 2})
+
+        def readBetweenOffsets(self, start: dict, end: dict):
+            start_idx = start["offset"]
+            end_idx = end["offset"]
+            return iter([(i,) for i in range(start_idx, end_idx)])
+
+        def commit(self, end):
+            pass
+
+If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for readable streaming data source. And simpleStreamReader() will only be invoked when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch implemented with SimpleDataSourceStreamReader interface.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.initialOffset` should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.read` takes start offset as an input, return an iterator of tuples and the start offset of next read.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.readBetweenOffsets` takes start and end offset as input and read an iterator of data deterministically. This is called whe query replay batches during restart or after failure.

Review Comment:
Instead of duplicating all the information from the method docstrings here, can we point users to datasource.py, as @HeartSaVioR suggested?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
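
For context on what the quoted section is trying to convey, here is a minimal sketch of how the microbatch calls on `FakeStreamReader` fit together. It is not how Spark drives the reader internally: `run_microbatches` is a hypothetical driver loop written only for illustration, and the `InputPartition` class below is a stand-in so the snippet runs without Spark installed (the real base classes live in `pyspark.sql.datasource`).

```python
# Stand-in for pyspark.sql.datasource.InputPartition (assumption: used only
# so this sketch runs without a Spark installation).
class InputPartition:
    pass


class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end


class FakeStreamReader:
    """Same logic as the reader in the quoted diff, minus the Spark base class."""

    def __init__(self):
        self.current = 0

    def initialOffset(self):
        return {"offset": 0}

    def latestOffset(self):
        # Each call advances the latest available offset by 2.
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start, end):
        # One partition covering the whole offset range of the microbatch.
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end):
        pass

    def read(self, partition):
        for i in range(partition.start, partition.end):
            yield (i, str(i))


def run_microbatches(reader, num_batches):
    """Hypothetical driver loop: follows the call order the doc describes."""
    batches = []
    start = reader.initialOffset()
    for _ in range(num_batches):
        end = reader.latestOffset()
        rows = []
        for partition in reader.partitions(start, end):
            rows.extend(reader.read(partition))
        reader.commit(end)  # data before `end` has been processed
        batches.append(rows)
        start = end  # the next microbatch starts where this one ended
    return batches


batches = run_microbatches(FakeStreamReader(), 3)
print(batches)
```

Each simulated microbatch yields two rows, and the end offset of one batch becomes the start offset of the next, which matches the behavior the quoted text describes.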
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org