JulianJaffePinterest opened a new pull request #11823:
URL: https://github.com/apache/druid/pull/11823
Add code, utilities, tests, and documentation for reading data from Druid
using Spark.
(See #10920 and #11474 for context)
This PR splits out the reader logic from #10920 into a standalone PR as discussed. It also reworks how the connector handles segment loading to allow for more extensibility.
### Description
At a high level, this connector reads data from Druid into Spark by querying Druid's backing metadata SQL database for the segment load specs for the specified data source and time period and then fetching and loading those segments from deep storage. Querying the Druid cluster and determining which segments to load happens on the Spark driver, while the actual fetching and loading of segment files happens on the executors.
A more granular walk-through follows:
#### Determining which segments to load
When a user calls `.load()` to read in a Druid data source into a Spark
DataFrame, the first step for the connector is to determine which segments need
to be read. This is done by querying a Druid cluster's backing metadata
database for the specified data source and time range via the
`DruidMetadataClient` (we need to query the backing SQL database directly
because segment load specs are pruned when served from the broker).
Additionally, if the user does not provide a schema for the data source, we
need to construct the correct schema ourselves. We do this via SegmentMetadata
queries sent to a Druid broker.
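To make the flow concrete, here is a minimal, hypothetical sketch of the driver-side metadata lookup. It uses raw JDBC against the `druid_segments` table purely for illustration; the connector's actual implementation lives in `DruidMetadataClient` and builds on Druid's SQL metadata connectors rather than hand-rolled JDBC.
```scala
import java.sql.DriverManager

// Illustrative only: fetch the DataSegment payloads (load specs included) for a data source
// and interval straight from the metadata database, since brokers strip load specs.
def fetchSegmentPayloads(
    connectUri: String,    // e.g. "jdbc:mysql://druid.metadata.server:3306/druid"
    user: String,
    password: String,
    dataSource: String,
    intervalStart: String, // ISO-8601 boundary, e.g. "2021-01-01T00:00:00.000Z"
    intervalEnd: String
): List[String] = {
  val conn = DriverManager.getConnection(connectUri, user, password)
  try {
    // druid_segments holds one row per published segment; `payload` is the full DataSegment JSON.
    // A segment overlaps the requested interval when segment.start < interval.end and
    // segment.end > interval.start.
    val stmt = conn.prepareStatement(
      "SELECT payload FROM druid_segments " +
        "WHERE used = true AND dataSource = ? AND start < ? AND `end` > ?")
    stmt.setString(1, dataSource)
    stmt.setString(2, intervalEnd)
    stmt.setString(3, intervalStart)
    val rs = stmt.executeQuery()
    Iterator.continually(rs).takeWhile(_.next()).map(_.getString("payload")).toList
  } finally {
    conn.close()
  }
}
```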
##### Key Classes
`DruidDataSourceReader`
`DruidMetadataClient`
`DruidClient` (already reviewed in #11474)
#### Distributing the segments
Once we've determined which Druid segments we need to load, we need to
assign the segments to Spark partitions that will actually do the reading. For
now, we simply assign each Druid segment file to a Spark partition, although in
the future this could be extended to do smarter things (for instance, we could
map multiple Druid segments to a single Spark partition and thus allow the user
to specify how many Spark partitions they wanted their DataFrame to have,
regardless of the number of underlying Druid segment files).
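As a rough illustration of the planning step (hypothetical names; the real logic lives in `DruidDataSourceReader`), today one segment descriptor maps to one Spark partition, and a grouped variant sketches the possible future extension described above:
```scala
// Stand-in for the information the planner needs per Druid segment file.
final case class SegmentDescriptor(
  dataSource: String,
  interval: String,
  version: String,
  loadSpecJson: String
)

// Current behavior: one Spark input partition per Druid segment file.
def planPartitions(segments: Seq[SegmentDescriptor]): Seq[Seq[SegmentDescriptor]] =
  segments.map(Seq(_))

// Possible future extension: coalesce segments into a user-requested number of partitions.
def planCoalescedPartitions(
    segments: Seq[SegmentDescriptor],
    targetPartitions: Int
): Seq[Seq[SegmentDescriptor]] = {
  val groupSize = math.max(1, math.ceil(segments.size.toDouble / targetPartitions).toInt)
  segments.grouped(groupSize).toSeq
}
```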
##### Key Classes
`DruidDataSourceReader` (the `planInputPartitions` methods)
`DruidInputPartition`
#### Reading the data
Once we've assigned a Druid segment file to a Spark partition, we need to
actually fetch the segment file from deep storage and read its data. This is
handled by an `InputPartitionReader` (`DruidInputPartitionReader` for row-based
reads; `DruidColumnarInputPartitionReader` for vectorized reads). Using either default or custom, user-provided logic, the input partition reader pulls the segment file from deep storage and opens it as a Druid `QueryableIndex`. The
reader then applies any pushed-down filters, projects the specified columns (if
the user provided a schema explicitly), and fills Spark rows or vectors with
the segment's data.
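A heavily simplified, hypothetical skeleton of the row-based read path might look like the following. `SketchPartitionReader` and its plain-`Map` rows are stand-ins for the real `DruidInputPartitionReader` and the Druid cursor; the point is only to show the `next()`/`get()`/`close()` contract an input partition reader follows:
```scala
// Illustrative skeleton: the real reader opens the fetched segment as a QueryableIndex,
// applies pushed-down filters, and emits Spark InternalRows or column vectors.
final class SketchPartitionReader(
    fetchSegment: () => Iterator[Map[String, Any]], // stand-in for "pull from deep storage + open segment"
    columns: Seq[String]                            // projected columns from the provided or inferred schema
) extends AutoCloseable {
  private val rows = fetchSegment()
  private var current: Map[String, Any] = _

  // Advance to the next row of the segment, if any.
  def next(): Boolean =
    if (rows.hasNext) { current = rows.next(); true } else false

  // Project only the requested columns into a Spark-row-shaped sequence.
  def get(): Seq[Any] = columns.map(current.getOrElse(_, null))

  // The real reader closes the QueryableIndex and cleans up the local segment copy here.
  override def close(): Unit = ()
}
```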
A key piece here (and an enhancement from #10920) is the logic used to pull the segment file from deep storage. There are two supported approaches. By default, the partition reader will attempt to deserialize the load spec for its assigned segment and delegate to a Druid DataSegmentPuller to handle fetching. This requires users to configure the reader with all the properties necessary to connect and authenticate to their deep storage, but all Druid "core" deep storages (local, HDFS, S3, GCS, and Azure) are supported. Alternatively, users can defer to their Spark application's configuration. In this scenario, the reader extracts a URI from its assigned segment's load spec and then constructs a FileSystem from the Spark application's configuration. The reader uses that file system to pull down the extracted URI, meaning that users are not responsible for handling connection and authentication to deep storage. This second approach is useful for users running on clusters that rely on GCS ADCs or AWS IAM roles for machine authorization to GCS/S3, or for clusters that manage access keys for their users. Only the local, HDFS, S3, and GCS deep storage implementations are supported out of the box for this approach (Azure users will need to use the first approach or register a custom load function via the `SegmentReaderRegistry`).
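As a hedged sketch of the second approach (assuming, for illustration only, that the relevant load spec entry is named `path`; actual key names vary by deep storage type), the reader might do something like:
```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: resolve a URI from the segment's load spec and let the Spark
// application's Hadoop configuration supply the FileSystem (and its credentials).
def fetchViaSparkConf(loadSpec: Map[String, Any], hadoopConf: Configuration, localDir: Path): Path = {
  // e.g. s3a://bucket/prefix/index.zip, gs://bucket/prefix/index.zip, or an HDFS/local path
  val uri = new URI(loadSpec("path").toString)
  val fs = FileSystem.get(uri, hadoopConf) // credentials come from the Spark/Hadoop conf
  val target = new Path(localDir, "index.zip")
  fs.copyToLocalFile(new Path(uri), target) // pull the segment zip down locally for opening
  target
}
```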
##### Key Classes
`DruidBaseInputPartitionReader`
`DruidColumnarInputPartitionReader`
`DruidInputPartitionReader`
`SegmentReaderRegistry`
#### User interface
The connector is used like any other Spark reader: users call `.read` on their Spark session, set the format (in this case, `"druid"`), specify the properties to use, and then call `load()`. For example:
```scala
sparkSession
  .read
  .format("druid")
  .options(Map[String, String](
    "metadata.dbType" -> "mysql",
    "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
    "metadata.user" -> "druid",
    "metadata.password" -> "diurd",
    "broker.host" -> "localhost",
    "broker.port" -> "8082",
    "table" -> "dataSource",
    "reader.deepStorageType" -> "local",
    "local.storageDirectory" -> "/mnt/druid/druid-segments/"
  ))
  .load()
```
For convenience, a more strongly typed way to configure the reader is also provided:
```scala
import org.apache.druid.spark.DruidDataFrameReader

val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")

sparkSession
  .read
  .brokerHost("localhost")
  .brokerPort(8082)
  .metadataDbType("mysql")
  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
  .metadataUser("druid")
  .metadataPassword("diurd")
  .dataSource("dataSource")
  .deepStorage(deepStorageConfig)
  .druid()
```
User documentation with examples and configuration option descriptions is
provided in [spark.md](docs/operations/spark.md).
Additionally, because the connector does not run in a Druid cluster, we can't use Druid's dependency injection to transparently handle custom extensions and behavior. To support users who rely on non-core or custom complex metrics, deep storage implementations, or metadata databases, the connector uses a plugin-based architecture. All Druid core extensions are supported out of the box. If users need their own custom logic, they can register the appropriate functions with the corresponding registry (`ComplexMetricRegistry` for complex metrics, `SQLConnectorRegistry` for metadata databases, and `SegmentReaderRegistry` for loading segments from deep storage).
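To illustrate the plugin model (the exact registration APIs are defined by the registries themselves and may differ from this sketch), a registry is conceptually a name-to-function map that users populate before calling `.load()`:
```scala
import java.io.File
import scala.collection.mutable

// Illustrative stand-in for SegmentReaderRegistry: map a deep storage type, as it appears
// in a segment load spec, to a function that turns that load spec into a local file.
object SketchSegmentReaderRegistry {
  private val loadFunctions = mutable.Map.empty[String, Map[String, Any] => File]

  def register(deepStorageType: String, loadFn: Map[String, Any] => File): Unit =
    loadFunctions += deepStorageType -> loadFn

  def load(deepStorageType: String, loadSpec: Map[String, Any]): File = {
    val loadFn = loadFunctions.getOrElse(
      deepStorageType,
      throw new IllegalArgumentException(s"No segment reader registered for type '$deepStorageType'"))
    loadFn(loadSpec)
  }
}

// e.g. a user with a custom deep storage could register a loader before reading
// ("azureBlob" and fetchFromAzure are hypothetical here):
// SketchSegmentReaderRegistry.register("azureBlob", spec => fetchFromAzure(spec))
```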
##### Key Classes
`ComplexMetricRegistry`
`SQLConnectorRegistry`
`SegmentReaderRegistry`
`DruidConfigurationKeys`
`spark.md`
<hr>
This PR has:
- [x] been self-reviewed.
- [ ] using the [concurrency
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
(Remove this item if the PR doesn't have any relation to concurrency.)
- [x] added documentation for new or modified features or behaviors.
- [x] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [x] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.