Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-22 Thread via GitHub


HeartSaVioR closed pull request #46139: [SPARK-47920][DOCS][SS][PYTHON] Add doc 
for python streaming data source API
URL: https://github.com/apache/spark/pull/46139


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-22 Thread via GitHub


HeartSaVioR commented on PR #46139:
URL: https://github.com/apache/spark/pull/46139#issuecomment-2124076427

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-22 Thread via GitHub


HeartSaVioR commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1609442247


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,9 +109,157 @@ Define the reader logic to generate synthetic data. Use 
the `faker` library to p
 row.append(value)
 yield tuple(row)
 
+Implementing Streaming Reader and Writer for Python Data Source
+---
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self) -> dict:
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def latestOffset(self) -> dict:
+"""
+Return the current latest offset that the next microbatch will 
read to.
+"""
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start: dict, end: dict):
+"""
+Plans the partitioning of the current microbatch defined by start 
and end offset,
+it needs to return a sequence of :class:`InputPartition` object.
+"""
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end: dict):
+"""
+This is invoked when the query has finished processing data before 
end offset, this can be used to clean up resource.
+"""
+pass
+
+def read(self, partition) -> Iterator[Tuple]:
+"""
+Takes a partition as an input and read an iterator of tuples from 
the data source.
+"""
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.

Review Comment:
   > which does not have a way to query given a start and an end offset
   
   This isn't still possible with simple stream reader. This is an essential 
requirement of streaming data source. If we want to enable this, it's likely 
that we'll need to replicate the data into durable storage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-20 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1607401423


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,9 +109,157 @@ Define the reader logic to generate synthetic data. Use 
the `faker` library to p
 row.append(value)
 yield tuple(row)
 
+Implementing Streaming Reader and Writer for Python Data Source
+---
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self) -> dict:
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def latestOffset(self) -> dict:
+"""
+Return the current latest offset that the next microbatch will 
read to.
+"""
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start: dict, end: dict):
+"""
+Plans the partitioning of the current microbatch defined by start 
and end offset,
+it needs to return a sequence of :class:`InputPartition` object.
+"""
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end: dict):
+"""
+This is invoked when the query has finished processing data before 
end offset, this can be used to clean up resource.
+"""
+pass
+
+def read(self, partition) -> Iterator[Tuple]:
+"""
+Takes a partition as an input and read an iterator of tuples from 
the data source.
+"""
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming data source. And simpleStreamReader() will only be invoked 
when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch 
implemented with SimpleDataSourceStreamReader interface.
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Takes start offset as an input, return an iterator of tuples and 
the start offset of next read.
+"""
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start_idx + 2})
+
+def readBetweenOffsets(self, start: dict, end: dict) -> 
Iterator[Tuple]:

Review Comment:
   This is required because we need to repeat the read in a range after query 
restart.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-20 Thread via GitHub


allisonwang-db commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1607398362


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,9 +101,158 @@ Define the reader logic to generate synthetic data. Use 
the `faker` library to p
 row.append(value)
 yield tuple(row)
 
+Implementing Streaming Reader and Writer for Python Data Source
+---
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self) -> dict:
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def latestOffset(self) -> dict:
+"""
+Return the current latest offset that the next microbatch will 
read to.
+"""
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start: dict, end: dict):
+"""
+Plans the partitioning of the current microbatch defined by start 
and end offset,
+it needs to return a sequence of :class:`InputPartition` object.
+"""
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end: dict):
+"""
+This is invoked when the query has finished processing data before 
end offset, this can be used to clean up resource.
+"""
+pass
+
+def read(self, partition) -> Iterator[Tuple]:
+"""
+Takes a partition as an input and read an iterator of tuples from 
the data source.
+"""
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming data source. And simpleStreamReader() will only be invoked 
when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch 
implemented with SimpleDataSourceStreamReader interface.
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Takes start offset as an input, return an iterator of tuples and 
the start offset of next read.
+"""
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start_idx + 2})
+
+def readBetweenOffsets(self, start: dict, end: dict) -> 
Iterator[Tuple]:
+"""
+Takes start and end offset as input and read an iterator of data 
deterministically.
+This is called whe query replay batches during restart or after 
failure.
+"""
+start_idx = start["offset"]
+end_idx = end["offset"]
+return iter([(i,) for i in range(start_idx, end_idx)])
+
+def commit(self, end):
+"""
+This is invoked when the query has finished processing data before 
end offset, this can be used to clean up resource.
+"""
+pass
+
+**Implement the Stream Writer**
+
+This is a streaming data writer that write the metadata information of each 
microbatch to a local path.
+
+.. code-block:: python
+
+class SimpleCommitMessage(WriterCommitMessage):
+   partition_id: int
+   count: int
+
+class FakeStreamWriter(DataSourceStreamWriter):
+   def __init__(self, options):
+   self.options = options
+   self.path = self.options.get("path")
+   assert self.path is not None
+
+   def write(self, iterator):
+   """
+   Write the data and return the commit message of that partition
+   """
+   from pyspark import TaskContext
+   context = TaskContext.get()
+   partition_id = context.partitionId()
+   cnt = 0
+   for row in iterator:
+   cnt += 1
+   return SimpleCo

Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-16 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1603979413


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. 
Define the source name,
 def reader(self, schema: StructType):
 return FakeDataSourceReader(schema, self.options)
 
+def streamReader(self, schema: StructType):

Review Comment:
   I prefer not to duplicate the DataSource code. We already document that 
developer only need to implement the corresponding method for a certain 
capacity.



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -33,9 +33,15 @@ To create a custom Python data source, you'll need to 
subclass the :class:`DataS
 
 This example demonstrates creating a simple data source to generate synthetic 
data using the `faker` library. Ensure the `faker` library is installed and 
accessible in your Python environment.
 
-**Step 1: Define the Data Source**
+**Define the Data Source**
 
-Start by creating a new subclass of :class:`DataSource`. Define the source 
name, schema, and reader logic as follows:
+Start by creating a new subclass of :class:`DataSource` with the source name, 
schema.
+
+In order to read from the data source in a batch query, reader() method need 
to be defined.
+
+In order to read from the data source in a streaming query, streamReader() or 
simpleStreamReader() method need to be defined.
+
+In order to write to the data source in a streaming query, streamWriter() 
method need to be defined.

Review Comment:
   Table added.



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,9 +101,158 @@ Define the reader logic to generate synthetic data. Use 
the `faker` library to p
 row.append(value)
 yield tuple(row)
 
+Implementing Streaming Reader and Writer for Python Data Source
+---
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self) -> dict:
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def latestOffset(self) -> dict:
+"""
+Return the current latest offset that the next microbatch will 
read to.
+"""
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start: dict, end: dict):
+"""
+Plans the partitioning of the current microbatch defined by start 
and end offset,
+it needs to return a sequence of :class:`InputPartition` object.
+"""
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end: dict):
+"""
+This is invoked when the query has finished processing data before 
end offset, this can be used to clean up resource.
+"""
+pass
+
+def read(self, partition) -> Iterator[Tuple]:
+"""
+Takes a partition as an input and read an iterator of tuples from 
the data source.
+"""
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming data source. And simpleStreamReader() will only be invoked 
when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch 
implemented with SimpleDataSourceStreamReader interface.
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Takes start offset as an input, return an iterator of tuples and 
the start offset of next read.
+"""
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start

Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-16 Thread via GitHub


allisonwang-db commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1602361195


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -33,9 +33,15 @@ To create a custom Python data source, you'll need to 
subclass the :class:`DataS
 
 This example demonstrates creating a simple data source to generate synthetic 
data using the `faker` library. Ensure the `faker` library is installed and 
accessible in your Python environment.
 
-**Step 1: Define the Data Source**
+**Define the Data Source**
 
-Start by creating a new subclass of :class:`DataSource`. Define the source 
name, schema, and reader logic as follows:
+Start by creating a new subclass of :class:`DataSource` with the source name, 
schema.
+
+In order to read from the data source in a batch query, reader() method need 
to be defined.
+
+In order to read from the data source in a streaming query, streamReader() or 
simpleStreamReader() method need to be defined.
+
+In order to write to the data source in a streaming query, streamWriter() 
method need to be defined.

Review Comment:
   Do you think it's more clear to have a markdown table here? streaming/batch 
x read/write.



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,9 +101,158 @@ Define the reader logic to generate synthetic data. Use 
the `faker` library to p
 row.append(value)
 yield tuple(row)
 
+Implementing Streaming Reader and Writer for Python Data Source
+---
+**Implement the Stream Reader**
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self) -> dict:
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def latestOffset(self) -> dict:
+"""
+Return the current latest offset that the next microbatch will 
read to.
+"""
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start: dict, end: dict):
+"""
+Plans the partitioning of the current microbatch defined by start 
and end offset,
+it needs to return a sequence of :class:`InputPartition` object.
+"""
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end: dict):
+"""
+This is invoked when the query has finished processing data before 
end offset, this can be used to clean up resource.
+"""
+pass
+
+def read(self, partition) -> Iterator[Tuple]:
+"""
+Takes a partition as an input and read an iterator of tuples from 
the data source.
+"""
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+**Implement the Simple Stream Reader**
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming data source. And simpleStreamReader() will only be invoked 
when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch 
implemented with SimpleDataSourceStreamReader interface.
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+"""
+Return the initial start offset of the reader.
+"""
+return {"offset": 0}
+
+def read(self, start: dict) -> (Iterator[Tuple], dict):
+"""
+Takes start offset as an input, return an iterator of tuples and 
the start offset of next read.
+"""
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start_idx + 2})
+
+def readBetweenOffsets(self, start: dict, end: dict) -> 
Iterator[Tuple]:
+"""
+Takes start and end offset as input and read an iterator of data 
deterministically.
+This is called whe query replay batches during restart or after 
failure.
+"""
+start_idx = start["offset"]
+end_idx = end["offset"]
+return iter([(i,) for i in range(start_id

Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-06 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1591369804


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should 
return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the 
current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the 
partitioning of the current microbatch defined by start and end offset, it 
needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition 
as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when 
the query has finished processing data before end offset, this can be used to 
clean up resource.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+return {"offset": 0}
+
+def read(self, start: dict):
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start_idx + 2})
+
+def readBetweenOffsets(self, start: dict, end: dict):
+start_idx = start["offset"]
+end_idx = end["offset"]
+return iter([(i,) for i in range(start_idx, end_idx)])
+
+def commit(self, end):
+pass
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming data source. And simpleStreamReader() will only be invoked 
when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch 
implemented with SimpleDataSourceStreamReader interface.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.initialOffset` 
should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.read` takes start 
offset as an input, return an iterator of tuples and the start offset of next 
read.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.readBetweenOffsets` 
takes start and end offset as input and read an iterator of data 
deterministically. This is called whe query replay batches during restart or 
after failure.

Review Comment:
   What is the recommended way to link to datasource.py from this doc file? 
@HyukjinKwon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-06 Thread via GitHub


allisonwang-db commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1591322341


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should 
return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the 
current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the 
partitioning of the current microbatch defined by start and end offset, it 
needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition 
as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when 
the query has finished processing data before end offset, this can be used to 
clean up resource.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+return {"offset": 0}
+
+def read(self, start: dict):
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start_idx + 2})
+
+def readBetweenOffsets(self, start: dict, end: dict):
+start_idx = start["offset"]
+end_idx = end["offset"]
+return iter([(i,) for i in range(start_idx, end_idx)])
+
+def commit(self, end):
+pass
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming data source. And simpleStreamReader() will only be invoked 
when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch 
implemented with SimpleDataSourceStreamReader interface.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.initialOffset` 
should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.read` takes start 
offset as an input, return an iterator of tuples and the start offset of next 
read.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.readBetweenOffsets` 
takes start and end offset as input and read an iterator of data 
deterministically. This is called whe query replay batches during restart or 
after failure.

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-05 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1590525766


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should 
return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the 
current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the 
partitioning of the current microbatch defined by start and end offset, it 
needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition 
as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when 
the query has finished processing data before end offset, this can be used to 
clean up resource.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+return {"offset": 0}
+
+def read(self, start: dict):
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start_idx + 2})
+
+def readBetweenOffsets(self, start: dict, end: dict):
+start_idx = start["offset"]
+end_idx = end["offset"]
+return iter([(i,) for i in range(start_idx, end_idx)])
+
+def commit(self, end):
+pass
+
+If the data source has low throughput and doesn't require partitioning, you 
can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming data source. And simpleStreamReader() will only be invoked 
when streamReader() is not implemented.
+
+This is the same dummy streaming reader that generate 2 rows every batch 
implemented with SimpleDataSourceStreamReader interface.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.initialOffset` 
should return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.read` takes start 
offset as an input, return an iterator of tuples and the start offset of next 
read.
+
+:meth:`pyspark.sql.datasource.SimpleDataSourceStreamReader.readBetweenOffsets` 
takes start and end offset as input and read an iterator of data 
deterministically. This is called whe query replay batches during restart or 
after failure.

Review Comment:
   Instead of duplicating all the information in method docstring here, can we 
point user to refer to datasource.py as @HeartSaVioR suggested?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-05 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1590521175


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. 
Define the source name,
 def reader(self, schema: StructType):
 return FakeDataSourceReader(schema, self.options)
 
+def streamReader(self, schema: StructType):
+return FakeStreamReader(schema, self.options)
 
-**Step 2: Implement the Reader**
+def simpleStreamReader(self, schema: StructType):

Review Comment:
   Comment added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-05 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1590520752


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. 
Define the source name,
 def reader(self, schema: StructType):
 return FakeDataSourceReader(schema, self.options)
 
+def streamReader(self, schema: StructType):
+return FakeStreamReader(schema, self.options)
 
-**Step 2: Implement the Reader**
+def simpleStreamReader(self, schema: StructType):
+return SimpleStreamReader()
+
+def streamWriter(self, schema: StructType, overwrite: bool):
+return FakeStreamWriter(self.options)
+

Review Comment:
   Section separated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-05 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1590520510


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should 
return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the 
current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the 
partitioning of the current microbatch defined by start and end offset, it 
needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition 
as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when 
the query has finished processing data before end offset, this can be used to 
clean up resource.

Review Comment:
   Moved to doc string.



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):

Review Comment:
   Typing added.



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.

Review Comment:
   Moved under header.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-05 Thread via GitHub


chaoqin-li1123 commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1590520296


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -137,3 +271,21 @@ Use the fake datasource with a different number of rows:
 # |  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
 # | Douglas James|2007-01-18|  46226| Alabama|
 # +--+--+---++
+
+Start a streaming query with the fake data stream. Once we register the python 
data source, we can use it as source of readStream() or sink of writeStream() 
by passing short name or full name to format().
+
+.. code-block:: python
+
+query = 
spark.readStream.format("fake").load().writeStream().format("fake").start()

Review Comment:
   Example output added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-02 Thread via GitHub


HeartSaVioR commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1588609079


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should 
return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the 
current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the 
partitioning of the current microbatch defined by start and end offset, it 
needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition 
as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when 
the query has finished processing data before end offset, this can be used to 
clean up resource.

Review Comment:
   +1



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.

Review Comment:
   +1



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. 
Define the source name,
 def reader(self, schema: StructType):
 return FakeDataSourceReader(schema, self.options)
 
+def streamReader(self, schema: StructType):
+return FakeStreamReader(schema, self.options)
 
-**Step 2: Implement the Reader**
+def simpleStreamReader(self, schema: StructType):

Review Comment:
   This could lead to confusion that they have to implement both, not either 
one. Could we please add comment or just duplicate the data source and describe 
separately?



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -59,8 +59,17 @@ Start by creating a new subclass of :class:`DataSource`. 
Define the source name,
 def reader(self, schema: StructType):
 return FakeDataSourceReader(schema, self.options)
 
+def streamReader(self, schema: StructType):

Review Comment:
   Given they will see the example from the start, it could give a confusion 
that every data source has to implement both batch and streaming. Probably 
better to have some explanation.



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -137,3 +271,21 @@ Use the fake datasource with a different number of rows:
 # |  Caitlin Reed|1983-06-22|  89813|Pennsyl

Re: [PR] [SPARK-47920][DOCS][SS][PYTHON] Add doc for python streaming data source API [spark]

2024-05-01 Thread via GitHub


allisonwang-db commented on code in PR #46139:
URL: https://github.com/apache/spark/pull/46139#discussion_r1586863445


##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should 
return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the 
current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the 
partitioning of the current microbatch defined by start and end offset, it 
needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition 
as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when 
the query has finished processing data before end offset, this can be used to 
clean up resource.

Review Comment:
   Can we move these explanations into the docstring of the function?



##
python/docs/source/user_guide/sql/python_data_source.rst:
##
@@ -84,6 +93,131 @@ Define the reader logic to generate synthetic data. Use the 
`faker` library to p
 row.append(value)
 yield tuple(row)
 
+**Implement the Stream Reader**
+
+.. code-block:: python
+
+class RangePartition(InputPartition):
+def __init__(self, start, end):
+self.start = start
+self.end = end
+
+class FakeStreamReader(DataSourceStreamReader):
+def __init__(self, schema, options):
+self.current = 0
+
+def initialOffset(self):
+return {"offset": 0}
+
+def latestOffset(self):
+self.current += 2
+return {"offset": self.current}
+
+def partitions(self, start, end):
+return [RangePartition(start["offset"], end["offset"])]
+
+def commit(self, end):
+pass
+
+def read(self, partition):
+start, end = partition.start, partition.end
+for i in range(start, end):
+yield (i, str(i))
+
+This is a dummy streaming data reader that generate 2 rows in every 
microbatch. The streamReader instance has a integer offset that increase by 2 
in every microbatch.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.initialOffset` should 
return the initial start offset of the reader.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.latestOffset` return the 
current latest offset that the next microbatch will read to.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.partitions` plans the 
partitioning of the current microbatch defined by start and end offset, it 
needs to return a sequence of :class:`InputPartition` object.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.read` takes a partition 
as an input and read an iterator of tuples from the data source.
+
+:meth:`pyspark.sql.datasource.DataSourceStreamReader.commit` is invoked when 
the query has finished processing data before end offset, this can be used to 
clean up resource.
+
+**Implement the Simple Stream Reader**
+
+.. code-block:: python
+
+class SimpleStreamReader(SimpleDataSourceStreamReader):
+def initialOffset(self):
+return {"offset": 0}
+
+def read(self, start: dict):
+start_idx = start["offset"]
+it = iter([(i,) for i in range(start_idx, start_idx + 2)])
+return (it, {"offset": start_idx + 2})
+
+def readBetweenOffsets(self, start: dict, end: dict):
+start_idx = start["offset"]
+end_idx = end["offset"]
+return iter([(i,) for i in range(start_