JulianJaffePinterest opened a new pull request #11823:
URL: https://github.com/apache/druid/pull/11823


   Add code, utilities, tests, and documentation for reading data from Druid 
using Spark.
   
   (See #10920 and #11474 for context)
   
   This PR splits out the reader logic from #10920 into a standalone PR as 
discussed. This PR also reworks how the connector handles segment loading to 
allow for more extensibility.
   
   ### Description
   
   
   At a high level, this connector reads data from Druid into Spark by querying 
the Druid metadata SQL server for the segment load specs for the specified data 
source and time period and then fetching and loading those segments from deep 
storage. On the Spark side, querying the Druid cluster and determining which 
segments to load is done on the driver side, while the actual fetching and 
loading of segment files is done on the executors.
   
   A more granular walk-through follows:
   
   #### Determining which segments to load
   
   When a user calls `.load()` to read a Druid data source into a Spark 
DataFrame, the connector's first step is to determine which segments need 
to be read. This is done by querying a Druid cluster's backing metadata 
database for the specified data source and time range via the 
`DruidMetadataClient` (we need to query the backing SQL database directly 
because segment load specs are pruned when served from the broker). 
Additionally, if the user does not provide a schema for the data source, we 
need to construct the correct schema ourselves. We do this via SegmentMetadata 
queries sent to a Druid broker.
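   Conceptually, the driver-side selection boils down to an interval-overlap 
filter over segment rows pulled from the metadata store. A minimal sketch of 
that filter follows; the `SegmentRecord` class and field names are illustrative, 
not the connector's actual types:

   ```scala
   // Hypothetical, simplified model of a segment row from the metadata store.
   // Real rows carry a full load spec payload; only interval fields are shown.
   case class SegmentRecord(dataSource: String, startMillis: Long, endMillis: Long)

   object SegmentSelector {
     // Select segments whose interval overlaps the requested [queryStart, queryEnd).
     // Segment intervals are half-open, so overlap means start < queryEnd && end > queryStart.
     def select(
         records: Seq[SegmentRecord],
         dataSource: String,
         queryStart: Long,
         queryEnd: Long
     ): Seq[SegmentRecord] = {
       records.filter { r =>
         r.dataSource == dataSource && r.startMillis < queryEnd && r.endMillis > queryStart
       }
     }
   }
   ```

   In the connector itself this filtering happens in SQL against the metadata 
database rather than in memory, but the overlap predicate is the same.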
   
   ##### Key Classes
   `DruidDataSourceReader`
   `DruidMetadataClient`
   `DruidClient` (already reviewed in #11474)
   
   #### Distributing the segments
   
   Once we've determined which Druid segments we need to load, we need to 
assign the segments to Spark partitions that will actually do the reading. For 
now, we simply assign each Druid segment file to a Spark partition, although in 
the future this could be extended to do smarter things (for instance, we could 
map multiple Druid segments to a single Spark partition and thus allow the user 
to specify how many Spark partitions they want their DataFrame to have, 
regardless of the number of underlying Druid segment files).
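   The current one-segment-per-partition assignment, and the hypothetical 
many-to-one packing described above, might be sketched as follows (names are 
illustrative, not the connector's API):

   ```scala
   object PartitionPlanner {
     // One Spark partition per Druid segment file (the connector's current behavior).
     def onePerSegment[T](segments: Seq[T]): Seq[Seq[T]] = segments.map(Seq(_))

     // A possible extension: pack segments into a fixed number of partitions
     // round-robin, so the user controls the DataFrame's partition count.
     def packed[T](segments: Seq[T], numPartitions: Int): Seq[Seq[T]] = {
       segments.zipWithIndex
         .groupBy { case (_, i) => i % numPartitions }
         .toSeq
         .sortBy(_._1)
         .map { case (_, group) => group.map(_._1) }
     }
   }
   ```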
   
   ##### Key Classes
   `DruidDataSourceReader` (the `PlanInputPartition` methods)
   `DruidInputPartition`
   
   #### Reading the data
   
   Once we've assigned a Druid segment file to a Spark partition, we need to 
actually fetch the segment file from deep storage and read its data. This is 
handled by an `InputPartitionReader` (`DruidInputPartitionReader` for row-based 
reads; `DruidColumnarInputPartitionReader` for vectorized reads). Using either 
default or custom user-provided logic, the input partition reader pulls the 
segment file from deep storage and opens it as a Druid QueryableIndex. The 
reader then applies any pushed-down filters, projects the specified columns (if 
the user provided a schema explicitly), and fills Spark rows or vectors with 
the segment's data.
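   Schematically, each partition reader filters and then projects as it fills 
rows. The sketch below mimics that order of operations over plain maps; the 
real readers drive cursors over a Druid QueryableIndex, so treat this purely as 
an illustration:

   ```scala
   object PartitionRead {
     type Row = Map[String, Any]

     // Apply pushed-down filters, then project the requested columns, mirroring
     // the order of operations in the partition readers described above.
     def read(
         segmentRows: Iterator[Row],
         filters: Seq[Row => Boolean],
         projection: Seq[String]
     ): Iterator[Seq[Any]] = {
       segmentRows
         .filter(row => filters.forall(_(row)))
         .map(row => projection.map(col => row.getOrElse(col, null)))
     }
   }
   ```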
   
   A key piece here (and an enhancement over #10920) is the logic used to pull 
the segment file from deep storage. There are two supported approaches. By 
default, the partition reader attempts to deserialize the load spec for its 
assigned segment and delegates to a Druid DataSegmentPuller to handle fetching. 
This requires the user to configure the reader with all properties necessary to 
connect and authenticate to their deep storage, but all Druid "core" deep 
storages (local, HDFS, S3, GCS, and Azure) are supported. Alternatively, users 
can defer to their Spark application's configuration. In this scenario, the 
reader extracts a URI from its assigned segment's load spec and then 
constructs a FileSystem from the Spark application's configuration. The reader 
uses that file system to pull the extracted URI, meaning that users are not 
responsible for handling connection and authentication to deep storage. This 
second approach is useful for users running on clusters that rely on GCP 
Application Default Credentials or AWS IAM roles for machine authorization to 
GCS/S3, or for clusters that manage access keys for their users. Only the 
local, HDFS, S3, and GCS deep storage implementations are supported out of the 
box for this approach (Azure users will need to use the first approach or 
register a custom load function via the `SegmentReaderRegistry`).
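   The configuration-deferred approach hinges on mapping a segment's 
deserialized load spec to a URI that a Hadoop FileSystem can open. A rough 
sketch of such a mapping follows; the load spec keys shown follow common Druid 
conventions, but the connector's actual extraction logic lives behind the 
`SegmentReaderRegistry`, so treat this as illustrative:

   ```scala
   object LoadSpecUris {
     // Map a deserialized load spec (a JSON object rendered as a Map) to a URI
     // string a Hadoop FileSystem could resolve. Keys follow common Druid load
     // spec shapes; consult the actual spec payloads for your cluster.
     def toUri(loadSpec: Map[String, String]): Option[String] = {
       loadSpec.get("type").flatMap {
         case "local" | "hdfs" => loadSpec.get("path")
         case "s3_zip" =>
           for (b <- loadSpec.get("bucket"); k <- loadSpec.get("key")) yield s"s3a://$b/$k"
         case "google" =>
           for (b <- loadSpec.get("bucket"); p <- loadSpec.get("path")) yield s"gs://$b/$p"
         case _ => None // e.g. Azure: register a custom load function instead
       }
     }
   }
   ```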
   
   ##### Key Classes
   `DruidBaseInputPartitionReader`
   `DruidColumnarInputPartitionReader`
   `DruidInputPartitionReader`
   `SegmentReaderRegistry`
   
   #### User interface
   
   Users interact with the connector like any other Spark reader: they call 
`.read` on their Spark session, set the format (in this case, `"druid"`), 
specify the properties to use, and then call `.load()`. For example:
   
   ```scala
   sparkSession
     .read
     .format("druid")
     .options(Map[String, String](
       "metadata.dbType" -> "mysql",
       "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
       "metadata.user" -> "druid",
       "metadata.password" -> "diurd",
       "broker.host" -> "localhost",
       "broker.port" -> "8082",
       "table" -> "dataSource",
       "reader.deepStorageType" -> "local",
       "local.storageDirectory" -> "/mnt/druid/druid-segments/"
   ))
     .load()
   ```
   
   For convenience, a more strongly typed way to configure the reader is 
also provided:
   
   ```scala
   import org.apache.druid.spark.DruidDataFrameReader
   
   val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
   
   sparkSession
     .read
     .brokerHost("localhost")
     .brokerPort(8082)
     .metadataDbType("mysql")
     .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
     .metadataUser("druid")
     .metadataPassword("diurd")
     .dataSource("dataSource")
     .deepStorage(deepStorageConfig)
     .druid()
   ```
   
   User documentation with examples and configuration option descriptions is 
provided in [spark.md](docs/operations/spark.md).
   
   Additionally, because the connector does not run in a Druid cluster, we 
can't use Druid's dependency injection to transparently handle custom 
extensions and behavior. To support users who rely on custom or otherwise 
non-core complex metrics, deep storage implementations, or metadata databases, 
the connector uses a plugin-based architecture. All Druid core extensions are 
supported out of the box. If users need to use their own custom logic, they can 
register the appropriate functions with the corresponding registry 
(`ComplexMetricRegistry` for complex metrics, `SQLConnectorRegistry` for 
metadata databases, and `SegmentReaderRegistry` for loading segments from deep 
storage).
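   The registries mentioned above all follow the same shape: a name-keyed map 
of user-supplied factories, populated before the read runs and consulted by the 
connector at read time. A generic, simplified sketch of the pattern follows 
(the real registries are typed to their specific plugin interfaces):

   ```scala
   import scala.collection.mutable

   // Minimal illustration of the plugin-registry idea: extensions register a
   // factory under a string key before the job runs, and the connector looks
   // the key up at read time.
   class FunctionRegistry[T] {
     private val registered = mutable.Map[String, () => T]()

     def register(name: String, factory: () => T): Unit = registered(name) = factory

     def create(name: String): T =
       registered.getOrElse(
         name,
         throw new IllegalArgumentException(s"No plugin registered for '$name'")
       )()
   }
   ```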
   
   ##### Key Classes
   `ComplexMetricRegistry`
   `SQLConnectorRegistry`
   `SegmentReaderRegistry`
   `DruidConfigurationKeys`
   `spark.md`
   
   <hr>
   
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency 
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
 (Remove this item if the PR doesn't have any relation to concurrency.)
   - [x] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to