HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580604894


##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
+    """
+    Fall back to the simpleStreamReader() method when streamReader() is not implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be created, instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return _SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry:
+    def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+    """
+    A private class that wraps :class:`SimpleDataSourceStreamReader` in a prefetch-and-cache
+    pattern, so that :class:`SimpleDataSourceStreamReader` can integrate with the streaming
+    engine like an ordinary :class:`DataSourceStreamReader`.
+
+    current_offset tracks the latest progress of record prefetching. It is initialized to
+    initialOffset() when the query starts for the first time, or to the end offset of the
+    last committed batch when the query restarts.
+
+    When the streaming engine calls latestOffset(), the wrapper calls read() starting from
+    current_offset, prefetches and caches the data, then updates current_offset to the end
+    offset of the new data.
+
+    When the streaming engine calls planInputPartitions(start, end), the wrapper gets the
+    prefetched data from the cache and sends it to the JVM along with the input partitions.
+
+    When the query restarts, batches in the write-ahead offset log that have not been
+    committed are replayed by reading the data between the start and end offsets through
+    read2(start, end).
+    """
+
+    def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+        self.simple_reader = simple_reader
+        self.initial_offset: Optional[dict] = None
+        self.current_offset: Optional[dict] = None
+        self.cache: List[PrefetchedCacheEntry] = []
+
+    def initialOffset(self) -> dict:
+        if self.initial_offset is None:
+            self.initial_offset = self.simple_reader.initialOffset()
+        return self.initial_offset
+
+    def latestOffset(self) -> dict:
+        # When the query starts for the first time, use the initial offset as the start offset.

Review Comment:
   Actually this is the hard part of implementing a prefetcher for an SS data source. When the query restarts, we assume the prefetcher can start from a known committed offset. Unfortunately, that is not true. You've mentioned that this relies on the getBatch trick, but that is only applicable to DSv1, and it's clearly a hack to address one specific data source.
   
   We have an interface `AcceptsLatestSeenOffset` for this case (you would need to adopt it when determining the start offset for prefetching), but it still does not give you the last committed offset, only the latest seen offset, so Spark could still request an offset range before that offset. It would work, though, if the simple data source reader can serve any planned-but-not-yet-committed offset range without relying on the prefetcher: the prefetcher can start prefetching from the latest seen offset, and the earlier offset ranges would be covered by the already planned batch(es).
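   For illustration only, a minimal sketch of the fallback behavior described above: serve a planned-but-not-yet-committed range by re-reading it through read2(start, end) when it is not in the prefetch cache, so correctness after a restart does not depend on the prefetcher. The class name, the `read_range` helper, and the `read(start) -> (rows, end_offset)` signature are assumptions taken from the docstring in the diff, not the PR's actual code.

   ```python
   from typing import Iterator, List, Optional, Tuple

   # Hypothetical sketch, NOT the PR's implementation. It assumes, per the quoted
   # docstring, that read(start) returns (iterator_of_rows, end_offset) and that
   # read2(start, end) deterministically re-reads the rows between two offsets.
   class _RestartTolerantWrapperSketch:
       def __init__(self, simple_reader):
           self.simple_reader = simple_reader
           self.current_offset: Optional[dict] = None
           # Each cache entry is (start, end, rows) for one prefetched range.
           self.cache: List[Tuple[dict, dict, List[Tuple]]] = []

       def latestOffset(self) -> dict:
           # Prefetch from current_offset, cache the rows, and advance current_offset.
           start = self.current_offset or self.simple_reader.initialOffset()
           rows, end = self.simple_reader.read(start)
           self.cache.append((start, end, list(rows)))
           self.current_offset = end
           return end

       def read_range(self, start: dict, end: dict) -> Iterator[Tuple]:
           # Serve a range from the prefetch cache when possible; after a restart the
           # replayed (planned-but-uncommitted) range is not cached, so fall back to
           # re-reading it with read2(start, end) instead of relying on the prefetcher.
           for s, e, rows in self.cache:
               if s == start and e == end:
                   return iter(rows)
           return self.simple_reader.read2(start, end)
   ```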


