[
https://issues.apache.org/jira/browse/SPARK-55450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-55450:
-----------------------------------
Labels: pull-request-available (was: )
> Document how to use Admission Control and Trigger.AvailableNow in PySpark
> custom streaming data sources
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-55450
> URL: https://issues.apache.org/jira/browse/SPARK-55450
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Jitesh Soni
> Priority: Minor
> Labels: pull-request-available
>
> Following the implementation in SPARK-55304 (PR #54085), PySpark now supports
> Admission Control and {{Trigger.AvailableNow}} for custom streaming data
> sources, bringing feature parity with Scala implementations.
> This ticket tracks the creation of user-facing documentation with practical
> examples showing how to implement these features in custom Python data
> sources.
> h2. Key Features to Document
> * *Updated {{latestOffset()}} signature* – Now accepts {{start}} offset and
> {{ReadLimit}} parameters (see the interface sketch after this list).
> * *Optional {{getDefaultReadLimit()}}* – Allows sources to specify preferred
> data consumption limits.
> * *Optional {{reportLatestOffset()}}* – Enables tracking available data
> without consumption.
> * *ReadLimit framework* – Built-in implementations for controlling data
> volume.
> * *{{Trigger.AvailableNow}} support* – Via the
> {{SupportsTriggerAvailableNow}} mixin interface.
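> The sketch below shows how these methods line up on a single reader class. It
> is illustrative only: the method names follow the list above, while the
> dictionary offset format and the 1000-row default are assumptions carried over
> from the examples later in this ticket.
>
> {{from pyspark.sql.datasource import DataSourceStreamReader, ReadMaxRows
>
>
> class SketchReader(DataSourceStreamReader):
>     def initialOffset(self):
>         return \{"offset": 0}
>
>     # New-style signature: receives the last planned offset and a ReadLimit.
>     def latestOffset(self, start, limit):
>         return \{"offset": start.get("offset", 0)}
>
>     # Optional: declare the source's preferred per-batch limit.
>     def getDefaultReadLimit(self):
>         return ReadMaxRows(1000)
>
>     # Optional: report available data without advancing source state.
>     def reportLatestOffset(self):
>         return \{"offset": 0} }}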
> h2. Usage Examples
> h3. Example 1: Basic Streaming Reader with Admission Control
>
> {{from pyspark.sql.datasource import (
>     DataSourceStreamReader,
>     InputPartition,
>     ReadLimit,
> )
>
>
> class MyStreamReader(DataSourceStreamReader):
>     """Custom streaming reader with admission control support."""
>
>     def initialOffset(self):
>         """Return the initial offset for the stream."""
>         return \{"offset": 0}
>
>     def latestOffset(self, start, limit):
>         """
>         Get the latest offset, respecting the read limit.
>
>         Args:
>             start: The starting offset.
>             limit: ReadLimit object controlling data volume.
>         """
>         current_offset = start.get("offset", 0)
>         # Check how much data is available.
>         available_records = self._count_available_records()
>         # Apply the limit if one is specified.
>         if isinstance(limit, ReadLimit):
>             if hasattr(limit, "maxRows"):
>                 records_to_read = min(available_records, limit.maxRows())
>             elif hasattr(limit, "maxFiles"):
>                 records_to_read = min(
>                     available_records,
>                     limit.maxFiles() * self.records_per_file,
>                 )
>             else:
>                 records_to_read = available_records
>         else:
>             records_to_read = available_records
>         return \{"offset": current_offset + records_to_read}
>
>     def getDefaultReadLimit(self):
>         """Optional: specify the default read limit for this source."""
>         from pyspark.sql.datasource import ReadMaxRows
>
>         # Read at most 1000 rows per batch.
>         return ReadMaxRows(1000)
>
>     def read(self, start, end):
>         """Read data between the start and end offsets."""
>         start_offset = start.get("offset", 0)
>         end_offset = end.get("offset", 0)
>         # One InputPartition covering the whole range.
>         return [InputPartition((start_offset, end_offset))] }}
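> To use the reader, return it from a {{DataSource}} subclass and register that
> class with the session. The class, schema, and format name below
> ({{MyDataSource}}, {{"mySource"}}) are hypothetical placeholders:
>
> {{from pyspark.sql.datasource import DataSource
>
>
> class MyDataSource(DataSource):
>     @classmethod
>     def name(cls):
>         return "mySource"
>
>     def schema(self):
>         return "value string"
>
>     def streamReader(self, schema):
>         return MyStreamReader()
>
>
> spark.dataSource.register(MyDataSource)
> df = spark.readStream.format("mySource").load() }}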
> h3. Example 2: Using {{Trigger.AvailableNow}}
>
> {{from pyspark.sql.datasource import (
>     DataSourceStreamReader,
>     SupportsTriggerAvailableNow,
> )
>
>
> class SnapshotStreamReader(DataSourceStreamReader, SupportsTriggerAvailableNow):
>     """Streaming reader supporting Trigger.AvailableNow for snapshot processing."""
>
>     def prepareForTriggerAvailableNow(self):
>         """
>         Prepare the source for a snapshot-based trigger.
>         Called once when Trigger.AvailableNow is used.
>         """
>         # Capture a snapshot of the currently available data.
>         self.snapshot_offset = self._capture_current_state()
>         print(f"Snapshot captured at offset: \{self.snapshot_offset}")
>
>     def latestOffset(self, start, limit):
>         """Under Trigger.AvailableNow, return the snapshot offset."""
>         if hasattr(self, "snapshot_offset"):
>             # Snapshot boundary captured by prepareForTriggerAvailableNow().
>             return \{"offset": self.snapshot_offset}
>         else:
>             # Normal streaming mode.
>             return \{"offset": self._get_current_offset()}
>
>     # ... other required methods ...
>
>
> # Using the source with Trigger.AvailableNow
> df = (
>     spark.readStream
>     .format("mySnapshotSource")
>     .load()
> )
> query = (
>     df.writeStream
>     .trigger(availableNow=True)
>     .format("console")
>     .start()
> )
> query.awaitTermination() }}
> h3. Example 3: Using {{reportLatestOffset()}} for Monitoring
>
> {{from pyspark.sql.datasource import DataSourceStreamReader
>
>
> class MonitoredStreamReader(DataSourceStreamReader):
>     """Reader that reports available data without consuming it."""
>
>     def reportLatestOffset(self):
>         """
>         Optional: report the latest available offset without side effects.
>         Used for monitoring and metrics.
>         """
>         return \{"offset": self._peek_latest_offset()}
>
>     def latestOffset(self, start, limit):
>         """Actual offset method, which may have side effects."""
>         latest = self._fetch_and_mark_offset(start, limit)
>         return \{"offset": latest} }}
> h3. Example 4: {{ReadLimit}} Types
>
> {{from pyspark.sql.datasource import ReadLimit
>
> # Available ReadLimit implementations:
> #   1. ReadAllAvailable()  - Read all available data.
> #   2. ReadMinRows(n)      - Read at least n rows.
> #   3. ReadMaxRows(n)      - Read at most n rows.
> #   4. ReadMaxFiles(n)     - Read at most n files.
> #   5. ReadMaxBytes(n)     - Read at most n bytes.
>
> # Configure read limits through stream options.
> query = (
>     spark.readStream
>     .format("mySource")
>     .option("maxFilesPerTrigger", "100")
>     .option("maxBytesPerTrigger", "10mb")
>     .load()
>     .writeStream
>     .format("console")
>     .start()
> ) }}
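> How a source maps such options onto the ReadLimit framework is up to the
> implementation. A minimal sketch, assuming the options dict is handed to the
> reader's constructor and the limit classes live in {{pyspark.sql.datasource}}
> as in Example 1:
>
> {{from pyspark.sql.datasource import (
>     DataSourceStreamReader,
>     ReadAllAvailable,
>     ReadMaxFiles,
> )
>
>
> class OptionAwareReader(DataSourceStreamReader):
>     def __init__(self, options):
>         # Options passed through from spark.readStream.option(...).
>         self.options = options
>
>     def getDefaultReadLimit(self):
>         max_files = self.options.get("maxFilesPerTrigger")
>         if max_files is not None:
>             return ReadMaxFiles(int(max_files))
>         # No option set: fall back to reading everything available.
>         return ReadAllAvailable() }}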
> h2. Backward Compatibility Note
> The implementation automatically detects old-style {{latestOffset()}} methods
> (without parameters) using Python introspection, ensuring existing
> implementations continue to work without modification.
>
> {{# Old style – still supported
> def latestOffset(self):
>     return \{"offset": 100}
>
>
> # New style – recommended
> def latestOffset(self, start, limit):
>     return \{"offset": 100} }}
> h2. Reference
> * PR: [https://github.com/apache/spark/pull/54085]
> * JIRA: SPARK-55304
> h2. Acceptance Criteria
> * Code examples tested and validated.
> * Examples added to PySpark examples directory.