HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580659454


##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> 
"DataSourceStreamReader":
+    """
+    Fallback to simpleStreamReader() method when streamReader() is not 
implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be 
created instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return 
_SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry:
+    def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+    """
+    A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+    so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+    ordinary :class:`DataSourceStreamReader`.
+
+    current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+    initialOffset() when query start for the first time or initialized to be 
the end offset of
+    the last committed batch when query restarts.
+
+    When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+    current_offset, prefetches and cache the data, then updates the 
current_offset to be
+    the end offset of the new data.
+
+    When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+    from cache and send it to JVM along with the input partitions.
+
+    When query restart, batches in write ahead offset log that has not been 
committed will be
+    replayed by reading data between start and end offset through read2(start, 
end).
+    """
+
+    def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+        self.simple_reader = simple_reader
+        self.initial_offset: Optional[dict] = None
+        self.current_offset: Optional[dict] = None
+        self.cache: List[PrefetchedCacheEntry] = []
+
+    def initialOffset(self) -> dict:
+        if self.initial_offset is None:
+            self.initial_offset = self.simple_reader.initialOffset()
+        return self.initial_offset
+
+    def latestOffset(self) -> dict:
+        # when query start for the first time, use initial offset as the start 
offset.

Review Comment:
   OK, never mind. You are dealing with all the thing individually (not just 
leveraging DSv1 trick). Your comment seems a bit confusing - mentioning 
getBatch was the starting point I got confused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to