This is an automated email from the ASF dual-hosted git repository.

progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 4d47a61 DRILL-7437: Storage Plugin for Generic HTTP REST API 4d47a61 is described below commit 4d47a61aefa77004f7541e59820c68a8ed8bad80 Author: Charles Givre <cgi...@apache.org> AuthorDate: Mon Apr 13 10:16:36 2020 -0400 DRILL-7437: Storage Plugin for Generic HTTP REST API --- common/pom.xml | 2 - .../native/client/src/protobuf/UserBitShared.pb.cc | 15 +- .../native/client/src/protobuf/UserBitShared.pb.h | 5 +- contrib/pom.xml | 1 + contrib/storage-http/README.md | 301 +++++++++++++ contrib/storage-http/images/issue_count.png | Bin 0 -> 50256 bytes contrib/storage-http/pom.xml | 101 +++++ .../drill/exec/store/http/HttpAPIConfig.java | 165 +++++++ .../exec/store/http/HttpAPIConnectionSchema.java | 83 ++++ .../drill/exec/store/http/HttpBatchReader.java | 97 +++++ .../drill/exec/store/http/HttpGroupScan.java | 168 ++++++++ .../exec/store/http/HttpScanBatchCreator.java | 104 +++++ .../apache/drill/exec/store/http/HttpScanSpec.java | 72 ++++ .../drill/exec/store/http/HttpSchemaFactory.java | 101 +++++ .../drill/exec/store/http/HttpStoragePlugin.java} | 54 +-- .../exec/store/http/HttpStoragePluginConfig.java | 170 ++++++++ .../apache/drill/exec/store/http/HttpSubScan.java | 130 ++++++ .../exec/store/http/util/HttpProxyConfig.java | 220 ++++++++++ .../drill/exec/store/http/util/SimpleHttp.java | 267 ++++++++++++ .../main/resources/bootstrap-storage-plugins.json | 9 + .../src/main/resources/drill-module.conf | 27 ++ .../drill/exec/store/http/TestHttpPlugin.java | 479 +++++++++++++++++++++ .../drill/exec/store/http/TestHttpProxy.java | 233 ++++++++++ .../src/test/resources/data/response.json | 14 + .../drill/exec/store/kudu/KuduStoragePlugin.java | 2 +- distribution/pom.xml | 5 + distribution/src/assemble/component.xml | 1 + .../src/main/resources/drill-override-example.conf | 31 ++ .../java/org/apache/drill/exec/ExecConstants.java | 17 + .../exec/store/easy/json/JSONRecordReader.java | 36 +- .../drill/exec/vector/complex/fn/JsonReader.java | 2 +- .../java-exec/src/main/resources/drill-module.conf | 39 ++ .../easy/json/parser/TestJsonParserBasics.java | 32 +- .../apache/drill/common/expression/SchemaPath.java | 21 + .../org/apache/drill/exec/proto/UserBitShared.java | 21 +- protocol/src/main/protobuf/UserBitShared.proto | 1 + 36 files changed, 2948 insertions(+), 78 deletions(-) diff --git a/common/pom.xml b/common/pom.xml index 889275e..8198be6 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -37,14 +37,12 @@ <artifactId>drill-protocol</artifactId> <version>${project.version}</version> </dependency> - <dependency> <!-- add as provided scope so that we can compile TestTools. Should only be ever used in a test scenario where someone else is bringing JUnit in. 
--> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> - <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc index 5f2186f..9ce65c2 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc @@ -1034,7 +1034,7 @@ void AddDescriptorsImpl() { "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" - "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper" + "\032\n\026CANCELLATION_REQUESTED\020\006*\367\n\n\020CoreOper" "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" @@ -1069,14 +1069,14 @@ void AddDescriptorsImpl() { "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S" "CAN\020\?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" - "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN" - "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002" - "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o" - "rg.apache.drill.exec.protoB\rUserBitShare" - "dH\001" + "NTROLLER\020C\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslSta" + "tus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n" + "\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n" + "\013SASL_FAILED\020\004B.\n\033org.apache.drill.exec." 
+ "protoB\rUserBitSharedH\001" }; ::google::protobuf::DescriptorPool::InternalAddGeneratedFile( - descriptor, 5763); + descriptor, 5782); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "UserBitShared.proto", &protobuf_RegisterTypes); ::protobuf_Types_2eproto::AddDescriptors(); @@ -1323,6 +1323,7 @@ bool CoreOperatorType_IsValid(int value) { case 65: case 66: case 67: + case 70: return true; default: return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 0aa41cf..d5483bc 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -358,11 +358,12 @@ enum CoreOperatorType { EXCEL_SUB_SCAN = 64, SHP_SUB_SCAN = 65, METADATA_HANDLER = 66,
- METADATA_CONTROLLER = 67
+ METADATA_CONTROLLER = 67,
+ HTTP_SUB_SCAN = 70
}; bool CoreOperatorType_IsValid(int value); const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = METADATA_CONTROLLER;
+const CoreOperatorType CoreOperatorType_MAX = HTTP_SUB_SCAN;
const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1; const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 8f81d48..15f3dd5 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -54,6 +54,7 @@ <module>storage-kafka</module> <module>storage-kudu</module> <module>storage-opentsdb</module>
+ <module>storage-http</module>
</modules> </project>
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
new file mode 100644
index 0000000..3b9965c
--- /dev/null
+++ b/contrib/storage-http/README.md
@@ -0,0 +1,301 @@
+# Generic REST API Storage Plugin
+
+This plugin enables you to query APIs over HTTP/REST. At this point, the API reader only accepts JSON as input; however, in the future it may be possible to
+add additional format readers to allow for APIs which return XML, CSV or other formats.
+
+Note: This plugin should **NOT** be used for interacting with tools which have REST APIs, such as Splunk or Solr. It will not be performant for those use cases.
+
+## Configuration
+
+To configure the plugin, create a new storage plugin, and add the following configuration options, which apply to ALL connections defined in this plugin:
+
+```json
+{
+  "type": "http",
+  "cacheResults": true,
+  "connections": {},
+  "timeout": 0,
+  "proxyHost": null,
+  "proxyPort": 0,
+  "proxyType": null,
+  "proxyUsername": null,
+  "proxyPassword": null,
+  "enabled": true
+}
+```
+The options are:
+* `type`: This should be `http`.
+* `cacheResults`: Enables caching of the HTTP responses. Defaults to `false`.
+* `timeout`: Sets the response timeout in seconds. Defaults to `0`, which means no timeout.
+* `connections`: This field contains the details for individual connections. See the section *Configuring the API Connections* below for details.
+
+You can configure Drill to work behind a corporate proxy. Details are listed below.
+
+### Configuring the API Connections
+
+The HTTP storage plugin allows you to configure multiple APIs which you can query directly from this plugin. To do so, first add a `connections` parameter to the configuration.
+Next, give the connection a name which will be used in queries, for instance `stockAPI` or `jira`.
+
+The `connection` can accept the following options (a request sketch follows this list):
+* `url`: The base URL which Drill will query. You should include the ending slash if there are additional arguments which you are passing.
+* `method`: The request method. Must be `get` or `post`. If omitted, the method defaults to `GET`; any other method results in a validation error.
+* `headers`: Often APIs will require custom headers as part of the authentication. This field allows you to define key/value pairs which are submitted with the HTTP request.
+The format is:
+```json
+headers: {
+  "key1": "Value1",
+  "key2": "Value2"
+}
+```
+* `authType`: If your API requires authentication, specify the authentication type. At the time of implementation, the plugin only supports basic authentication; however, the
+plugin will likely support OAUTH2 in the future. Defaults to `none`. If the `authType` is set to `basic`, `userName` and `password` must be set in the configuration as well.
+  * `userName`: The username for basic authentication.
+  * `password`: The password for basic authentication.
+* `postBody`: Contains data, in the form of key/value pairs, which are sent during a `POST` request. The post body should be in the form:
+```
+key1=value1
+key2=value2
+```
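To make these options concrete, here is roughly what an equivalent request looks like when built by hand against OkHttp, the HTTP client library this plugin depends on (see `pom.xml` and `util/SimpleHttp.java` later in this patch). This is an editor's sketch with made-up values, not the plugin's actual code:

```java
import okhttp3.FormBody;
import okhttp3.Request;
import okhttp3.RequestBody;

public class ConnectionOptionsSketch {
  // Sketch: how url, headers, method and postBody combine into one request.
  static Request exampleRequest() {
    // The postBody key/value pairs become a form-encoded POST body.
    RequestBody body = new FormBody.Builder()
        .add("key1", "value1")
        .add("key2", "value2")
        .build();
    return new Request.Builder()
        .url("https://example.com/api/")   // the connection's base url
        .addHeader("key1", "Value1")       // entries from the headers map
        .addHeader("key2", "Value2")
        .post(body)                        // method: post
        .build();
  }
}
```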
+
+## Usage
+
+This plugin differs from other plugins in how the table component of the `FROM` clause is used. In normal Drill queries, the `FROM` clause is constructed as follows:
+```sql
+FROM <storage plugin>.<schema>.<table>
+```
+For example, you might have:
+```sql
+FROM dfs.test.`somefile.csv`
+
+-- or
+
+FROM mongo.stats.sales_data
+```
+
+In the HTTP/REST plugin, the `FROM` clause enables you to pass arguments to your REST call. The structure is:
+```sql
+FROM <plugin>.<connection>.<arguments>
+--Actual example:
+ FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today`
+```
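The resolution behind this syntax is simple string composition: Drill looks up the named connection, takes its configured base URL, and appends the back-tick "table" string to it (this is what `HttpSubScan.getFullURL()` later in this patch does). A minimal illustration, with assumed values:

```java
// Mirrors HttpSubScan.getFullURL(): baseUrl comes from the connection
// config, args comes from the FROM clause. Values here are illustrative.
String baseUrl = "https://api.sunrise-sunset.org/";             // connections.sunrise.url
String args = "json?lat=36.7201600&lng=-4.4203400&date=today";  // back-tick "table" string
String fullUrl = baseUrl + args;                                // hence the trailing slash
```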
+
+## Proxy Setup
+
+Some users access HTTP services from behind a proxy firewall. Drill provides three ways to specify proxy
+configuration.
+
+### Proxy Environment Variables
+
+Drill recognizes the usual Linux proxy environment variables:
+
+* `http_proxy`, `HTTP_PROXY`
+* `https_proxy`, `HTTPS_PROXY`
+* `all_proxy`, `ALL_PROXY`
+
+This technique works well if your system is already configured to
+handle proxies.
+
+### Boot Configuration
+
+You can also specify proxy configuration in the `drill-override.conf` file.
+See `drill-override-example.conf` for a template.
+
+First, you can use the same form of URL you would use with the environment
+variables:
+
+```
+drill.exec.net_proxy.http_url: "http://foo.com/1234"
+```
+
+There is one setting for HTTP, another for HTTPS.
+
+Alternatively, you can specify each field separately:
+
+```
+drill.exec.net_proxy.http: {
+  type: "none", # none, http, socks. Blank same as none.
+  host: "",
+  port: 80,
+  user_name: "",
+  password: ""
+},
+```
+
+The valid proxy types are `none`, `http` and `socks`. Blank is the same
+as `none`.
+
+Again, there is a parallel section for HTTPS.
+
+Either of these approaches is preferred if the proxy is an attribute of your
+network environment and is the same for all external HTTP/HTTPS requests.
+
+### In the HTTP Storage Plugin Config
+
+The final way to configure a proxy is in the HTTP storage plugin itself. The proxy
+applies to all connections defined in that plugin. Use this approach if the proxy
+applies only to some external services, or if each service has a different proxy
+(defined by creating a separate plugin config for each service).
+
+```json
+  "proxyType": "direct",
+  "proxyHost": "",
+  "proxyPort": 80,
+  "proxyUsername": "",
+  "proxyPassword": ""
+```
+
+The valid proxy types are `direct`, `http` or `socks`. Blank is the same
+as `direct`.
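For readers curious how these settings take effect: the plugin ultimately hands them to its OkHttp client (see `util/HttpProxyConfig.java` and `util/SimpleHttp.java` below). The following is an editor's sketch of what an `http` proxy with credentials amounts to in OkHttp terms, with assumed host and credential values; it is not the plugin's exact code:

```java
import java.net.InetSocketAddress;
import java.net.Proxy;
import okhttp3.Authenticator;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;

public class ProxySketch {
  static OkHttpClient clientBehindProxy(String host, int port, String user, String pass) {
    // Answer proxy authentication challenges with basic credentials.
    Authenticator proxyAuth = (route, response) ->
        response.request().newBuilder()
            .header("Proxy-Authorization", Credentials.basic(user, pass))
            .build();
    return new OkHttpClient.Builder()
        .proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port)))
        .proxyAuthenticator(proxyAuth)
        .build();
  }
}
```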
+
+## Examples
+
+### Example 1: Reference Data, A Sunrise/Sunset API
+
+The API sunrise-sunset.org returns data in the following format:
+
+```json
+{
+  "results":
+  {
+    "sunrise":"7:27:02 AM",
+    "sunset":"5:05:55 PM",
+    "solar_noon":"12:16:28 PM",
+    "day_length":"9:38:53",
+    "civil_twilight_begin":"6:58:14 AM",
+    "civil_twilight_end":"5:34:43 PM",
+    "nautical_twilight_begin":"6:25:47 AM",
+    "nautical_twilight_end":"6:07:10 PM",
+    "astronomical_twilight_begin":"5:54:14 AM",
+    "astronomical_twilight_end":"6:38:43 PM"
+  },
+  "status":"OK"
+}
+```
+To query this API, set the configuration as follows:
+
+```json
+{
+  "type": "http",
+  "cacheResults": false,
+  "enabled": true,
+  "timeout": 5,
+  "connections": {
+    "sunrise": {
+      "url": "https://api.sunrise-sunset.org/",
+      "method": "GET",
+      "headers": null,
+      "authType": "none",
+      "userName": null,
+      "password": null,
+      "postBody": null
+    }
+  }
+}
+```
+Then, to execute a query:
+```sql
+SELECT api_results.results.sunrise AS sunrise,
+       api_results.results.sunset AS sunset
+FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today` AS api_results;
+```
+Which yields the following results:
+```
++------------+------------+
+| sunrise    | sunset     |
++------------+------------+
+| 7:17:46 AM | 5:01:33 PM |
++------------+------------+
+1 row selected (0.632 seconds)
+```
+
+### Example 2: JIRA
+
+JIRA Cloud has a REST API which is [documented here](https://developer.atlassian.com/cloud/jira/platform/rest/v3/?utm_source=%2Fcloud%2Fjira%2Fplatform%2Frest%2F&utm_medium=302).
+
+To connect Drill to JIRA Cloud, use the following configuration:
+```json
+{
+  "type": "http",
+  "cacheResults": false,
+  "timeout": 5,
+  "connections": {
+    "sunrise": {
+      "url": "https://api.sunrise-sunset.org/",
+      "method": "GET",
+      "headers": null,
+      "authType": "none",
+      "userName": null,
+      "password": null,
+      "postBody": null
+    },
+    "jira": {
+      "url": "https://<project>.atlassian.net/rest/api/3/",
+      "method": "GET",
+      "headers": {
+        "Accept": "application/json"
+      },
+      "authType": "basic",
+      "userName": "<username>",
+      "password": "<API Key>",
+      "postBody": null
+    }
+  },
+  "enabled": true
+}
+```
+
+Once you've configured Drill to query the API, you can easily access any of your data in JIRA. The JIRA API returns highly nested data; however, with a little preparation it
+is pretty straightforward to transform it into a more useful table. For instance, the
+query below:
+```sql
+SELECT jira_data.issues.key AS key,
+jira_data.issues.fields.issueType.name AS issueType,
+SUBSTR(jira_data.issues.fields.created, 1, 10) AS created,
+SUBSTR(jira_data.issues.fields.updated, 1, 10) AS updated,
+jira_data.issues.fields.assignee.displayName as assignee,
+jira_data.issues.fields.creator.displayName as creator,
+jira_data.issues.fields.summary AS summary,
+jira_data.issues.fields.status.name AS currentStatus,
+jira_data.issues.fields.priority.name AS priority,
+jira_data.issues.fields.labels AS labels,
+jira_data.issues.fields.subtasks AS subtasks
+FROM (
+SELECT flatten(t1.issues) as issues
+FROM http.jira.`search?jql=project=<project>&maxResults=100` AS t1
+) AS jira_data
+```
+The query below counts the number of issues by priority:
+
+```sql
+SELECT
+jira_data.issues.fields.priority.name AS priority,
+COUNT(*) AS issue_count
+FROM (
+SELECT flatten(t1.issues) as issues
+FROM http.jira.`search?jql=project=<project>&maxResults=100` AS t1
+) AS jira_data
+GROUP BY priority
+ORDER BY issue_count DESC
+```
+
+<img src="images/issue_count.png" alt="Issue Count by Priority"/>
+
+
+## Limitations
+
+1. The plugin is supposed to follow redirects; however, if you are using authentication, you may encounter errors or empty responses if you are counting on the endpoint for
+redirection.
+
+2. At this time, the plugin does not support any authentication other than basic authentication. Future functionality may include OAUTH2 authentication and/or PKI
+authentication for REST APIs.
+
+3. This plugin does not implement filter pushdowns. Filter pushdown has the potential to improve performance.
+
+4. This plugin only reads JSON responses. Future functionality may include the ability to parse XML, CSV or other common REST responses.
+
+5. At this time, `POST` bodies can only be in the format of key/value pairs. Some APIs accept JSON-based `POST` bodies, and this is not currently supported.
+
+6. The returned message should contain only records, as a JSON array of objects (or as a series of JSON objects as in a JSON file). The
+present version does not yet have the ability to ignore message "overhead" such as status codes, etc. You can, of course, select individual fields in your query to ignore
+"overhead" fields.
diff --git a/contrib/storage-http/images/issue_count.png b/contrib/storage-http/images/issue_count.png
new file mode 100644
index 0000000..49bc04f
Binary files /dev/null and b/contrib/storage-http/images/issue_count.png differ
diff --git a/contrib/storage-http/pom.xml b/contrib/storage-http/pom.xml
new file mode 100644
index 0000000..0c2c875
--- /dev/null
+++ b/contrib/storage-http/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>drill-contrib-parent</artifactId> + <groupId>org.apache.drill.contrib</groupId> + <version>1.18.0-SNAPSHOT</version> + </parent> + + <artifactId>drill-storage-http</artifactId> + <name>contrib/http-storage-plugin</name> + + <properties> + <okhttp.version>4.5.0</okhttp.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.drill.exec</groupId> + <artifactId>drill-java-exec</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>${okhttp.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.drill.exec</groupId> + <artifactId>drill-java-exec</artifactId> + <classifier>tests</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.drill</groupId> + <artifactId>drill-common</artifactId> + <classifier>tests</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>mockwebserver</artifactId> + <version>${okhttp.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-java-sources</id> + <phase>process-sources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/http + </outputDirectory> + <resources> + <resource> + <directory>src/main/java/org/apache/drill/exec/store/http</directory> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java new file mode 100644 index 0000000..a850d1d --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
+package org.apache.drill.exec.store.http;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class HttpAPIConfig {
+  private static final Logger logger = LoggerFactory.getLogger(HttpAPIConfig.class);
+
+  private final String url;
+
+  private final HttpMethods method;
+
+  private final Map<String, String> headers;
+
+  private final String authType;
+
+  private final String userName;
+
+  private final String password;
+
+  private final String postBody;
+
+  public enum HttpMethods {
+    /**
+     * Value for HTTP GET method
+     */
+    GET,
+    /**
+     * Value for HTTP POST method
+     */
+    POST;
+  }
+
+  public HttpAPIConfig(@JsonProperty("url") String url,
+                       @JsonProperty("method") String method,
+                       @JsonProperty("headers") Map<String, String> headers,
+                       @JsonProperty("authType") String authType,
+                       @JsonProperty("userName") String userName,
+                       @JsonProperty("password") String password,
+                       @JsonProperty("postBody") String postBody) {
+
+    this.headers = headers;
+    this.method = Strings.isNullOrEmpty(method)
+        ? HttpMethods.GET : HttpMethods.valueOf(method.trim().toUpperCase());
+
+    // Validate the request method. Only GET and POST requests are accepted; anything else fails with a validation error.
+    switch (this.method) {
+      case GET:
+      case POST:
+        break;
+      default:
+        throw UserException
+          .validationError()
+          .message("Invalid HTTP method: %s. Drill supports 'GET' and 'POST'.", method)
+          .build(logger);
+    }
+    if (Strings.isNullOrEmpty(url)) {
+      throw UserException
+        .validationError()
+        .message("URL is required for the HTTP storage plugin.")
+        .build(logger);
+    }
+
+    // Put a trailing slash on the URL if it is missing
+    if (url.charAt(url.length() - 1) != '/') {
+      this.url = url + "/";
+    } else {
+      this.url = url;
+    }
+
+    // Get the authentication method. Future functionality will include OAUTH2 authentication, but for now
+    // accept either basic or none. The default is none.
+    this.authType = Strings.isNullOrEmpty(authType) ?
"none" : authType; + this.userName = userName; + this.password = password; + this.postBody = postBody; + } + + @JsonProperty("url") + public String url() { return url; } + + @JsonProperty("method") + public String method() { return method.toString(); } + + @JsonProperty("headers") + public Map<String, String> headers() { return headers; } + + @JsonProperty("authType") + public String authType() { return authType; } + + @JsonProperty("userName") + public String userName() { return userName; } + + @JsonProperty("password") + public String password() { return password; } + + @JsonProperty("postBody") + public String postBody() { return postBody; } + + @JsonIgnore + public HttpMethods getMethodType() { + return HttpMethods.valueOf(this.method()); + } + + @Override + public int hashCode() { + return Objects.hash(url, method, headers, authType, userName, password, postBody); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("url", url) + .field("method", method) + .field("headers", headers) + .field("authType", authType) + .field("username", userName) + .field("password", password) + .field("postBody", postBody) + .toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + HttpAPIConfig other = (HttpAPIConfig) obj; + return Objects.equals(url, other.url) + && Objects.equals(method, other.method) + && Objects.equals(headers, other.headers) + && Objects.equals(authType, other.authType) + && Objects.equals(userName, other.userName) + && Objects.equals(password, other.password) + && Objects.equals(postBody, other.postBody); + } +} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java new file mode 100644 index 0000000..d3fa51f --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.http; + +import org.apache.calcite.schema.Table; +import org.apache.drill.exec.planner.logical.DynamicDrillTable; +import org.apache.drill.exec.store.AbstractSchema; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * In the HTTP storage plugin, users can define specific connections or APIs. + * This class represents the database component of other storage plugins. 
*/
+public class HttpAPIConnectionSchema extends AbstractSchema {
+
+  private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
+
+  private final HttpStoragePlugin plugin;
+
+  private final String pluginName;
+
+  public HttpAPIConnectionSchema(HttpSchemaFactory.HttpSchema httpSchema,
+                                 String name,
+                                 HttpStoragePlugin plugin) {
+    super(httpSchema.getSchemaPath(), name);
+    this.plugin = plugin;
+    pluginName = plugin.getName();
+  }
+
+  @Override
+  public String getTypeName() {
+    return HttpStoragePluginConfig.NAME;
+  }
+
+  /**
+   * Gets the table that is received from the query. In this case, the table name
+   * actually contains the arguments which are passed in the URL string.
+   *
+   * @param tableName
+   *          The "tableName" actually will contain the URL arguments passed to
+   *          the record reader
+   * @return the selected table
+   */
+  @Override
+  public Table getTable(String tableName) {
+    DynamicDrillTable table = activeTables.get(name);
+    if (table != null) {
+      // Return the found table
+      return table;
+    } else {
+      // Register a new table
+      return registerTable(name, new DynamicDrillTable(plugin, pluginName, new HttpScanSpec(pluginName, name, tableName, plugin.getConfig())));
+    }
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return activeTables.keySet();
+  }
+
+  private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+    activeTables.put(name, table);
+    return table;
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
new file mode 100644
index 0000000..7aaa1e8
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.drill.exec.store.http; + +import java.io.File; + +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.store.easy.json.loader.JsonLoader; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; +import org.apache.drill.exec.store.http.util.HttpProxyConfig; +import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder; +import org.apache.drill.exec.store.http.util.SimpleHttp; + +import com.typesafe.config.Config; + +public class HttpBatchReader implements ManagedReader<SchemaNegotiator> { + private final HttpStoragePluginConfig config; + private final HttpSubScan subScan; + private JsonLoader jsonLoader; + + public HttpBatchReader(HttpStoragePluginConfig config, HttpSubScan subScan) { + this.config = config; + this.subScan = subScan; + } + + @Override + public boolean open(SchemaNegotiator negotiator) { + CustomErrorContext errorContext = negotiator.parentErrorContext(); + + // Result set loader setup + String tempDirPath = negotiator + .drillConfig() + .getString(ExecConstants.DRILL_TMP_DIR); + ResultSetLoader loader = negotiator.build(); + + // Http client setup + SimpleHttp http = new SimpleHttp(config, new File(tempDirPath), subScan.tableSpec().database(), proxySettings(negotiator.drillConfig()), errorContext); + + // JSON loader setup + jsonLoader = new JsonLoaderBuilder() + .resultSetLoader(loader) + .standardOptions(negotiator.queryOptions()) + .errorContext(errorContext) + .fromStream(http.getInputStream(subScan.getFullURL())) + .build(); + + // Please read the first batch + return true; + } + + private HttpProxyConfig proxySettings(Config drillConfig) { + ProxyBuilder builder = HttpProxyConfig.builder() + .fromConfigForURL(drillConfig, subScan.getFullURL()); + String proxyType = config.proxyType(); + if (proxyType != null && !"direct".equals(proxyType)) { + builder + .type(config.proxyType()) + .host(config.proxyHost()) + .port(config.proxyPort()) + .username(config.proxyUsername()) + .password(config.proxyPassword()); + } + return builder.build(); + } + + @Override + public boolean next() { + return jsonLoader.readBatch(); + } + + @Override + public void close() { + if (jsonLoader != null) { + jsonLoader.close(); + jsonLoader = null; + } + } +} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java new file mode 100644 index 0000000..9e4e86d --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.http; + +import java.util.List; +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.expression.SchemaPath; + +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.planner.cost.DrillCostBase; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + + +@JsonTypeName("http-scan") +public class HttpGroupScan extends AbstractGroupScan { + + private final List<SchemaPath> columns; + private final HttpScanSpec httpScanSpec; + private final HttpStoragePluginConfig config; + + public HttpGroupScan ( + HttpStoragePluginConfig config, + HttpScanSpec scanSpec, + List<SchemaPath> columns) { + super("no-user"); + this.config = config; + this.httpScanSpec = scanSpec; + this.columns = columns; + } + + public HttpGroupScan(HttpGroupScan that) { + super(that); + config = that.config(); + httpScanSpec = that.httpScanSpec(); + columns = that.getColumns(); + } + + public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) { + super("no-user"); + this.columns = columns; + this.config = that.config; + this.httpScanSpec = that.httpScanSpec; + } + + @JsonCreator + public HttpGroupScan( + @JsonProperty("config") HttpStoragePluginConfig config, + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec + ) { + super("no-user"); + this.config = config; + this.columns = columns; + this.httpScanSpec = httpScanSpec; + } + + @JsonProperty("config") + public HttpStoragePluginConfig config() { return config; } + + @JsonProperty("columns") + public List<SchemaPath> columns() { return columns; } + + @JsonProperty("httpScanSpec") + public HttpScanSpec httpScanSpec() { return httpScanSpec; } + + @Override + public void applyAssignments(List<DrillbitEndpoint> endpoints) { + // No filter pushdowns yet, so this method does nothing + return; + } + + @Override + @JsonIgnore + public int getMaxParallelizationWidth() { + return 1; + } + + @Override + public boolean canPushdownProjects(List<SchemaPath> columns) { + return true; + } + + @Override + public SubScan getSpecificScan(int minorFragmentId) { + return new HttpSubScan(config, httpScanSpec, columns); + } + + @Override + public GroupScan clone(List<SchemaPath> columns) { + return new HttpGroupScan(this, columns); + } + + @Override + public String getDigest() { + return toString(); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> 
children) { + Preconditions.checkArgument(children.isEmpty()); + return new HttpGroupScan(this); + } + + @Override + public ScanStats getScanStats() { + int estRowCount = 10_000; + int rowWidth = columns == null ? 200 : 100; + int estDataSize = estRowCount * 200 * rowWidth; + int estCpuCost = DrillCostBase.PROJECT_CPU_COST; + return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, + estRowCount, estCpuCost, estDataSize); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("httpScanSpec", httpScanSpec) + .field("columns", columns) + .field("httpStoragePluginConfig", config) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(httpScanSpec, columns, config); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + HttpGroupScan other = (HttpGroupScan) obj; + return Objects.equals(httpScanSpec, other.httpScanSpec()) + && Objects.equals(columns, other.columns()) + && Objects.equals(config, other.config()); + } +} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java new file mode 100644 index 0000000..46d9838 --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.http; + +import java.util.List; + +import org.apache.drill.common.exceptions.ChildErrorContext; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> { + + @Override + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, HttpSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + + try { + ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan); + return builder.buildScanOperator(context, subScan); + } catch (UserException e) { + // Rethrow user exceptions directly + throw e; + } catch (Throwable e) { + // Wrap all others + throw new ExecutionSetupException(e); + } + } + + private ScanFrameworkBuilder createBuilder(OptionManager options, + HttpSubScan subScan) { + HttpStoragePluginConfig config = subScan.config(); + ScanFrameworkBuilder builder = new ScanFrameworkBuilder(); + builder.projection(subScan.columns()); + builder.setUserName(subScan.getUserName()); + + // Provide custom error context + builder.errorContext( + new ChildErrorContext(builder.errorContext()) { + @Override + public void addContext(UserException.Builder builder) { + builder.addContext("URL", subScan.getFullURL()); + } + }); + + // Reader + ReaderFactory readerFactory = new HttpReaderFactory(config, subScan); + builder.setReaderFactory(readerFactory); + builder.nullType(Types.optional(MinorType.VARCHAR)); + return builder; + } + + private static class HttpReaderFactory implements ReaderFactory { + + private final HttpStoragePluginConfig config; + private final HttpSubScan subScan; + private int count; + + public HttpReaderFactory(HttpStoragePluginConfig config, HttpSubScan subScan) { + this.config = config; + this.subScan = subScan; + } + + @Override + public void bind(ManagedScanFramework framework) { } + + @Override + public ManagedReader<SchemaNegotiator> next() { + + // Only a single scan (in a single thread) + if (count++ == 0) { + return new HttpBatchReader(config, subScan); + } else { + return null; + } + } + }} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java new file mode 100644 index 0000000..19921bc --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.PlanStringBuilder; + +@JsonTypeName("http-scan-spec") +public class HttpScanSpec { + + protected final String schemaName; + + protected final String database; + + protected final String tableName; + + protected final HttpStoragePluginConfig config; + + @JsonCreator + public HttpScanSpec(@JsonProperty("schemaName") String schemaName, + @JsonProperty("database") String database, + @JsonProperty("tableName") String tableName, + @JsonProperty("config") HttpStoragePluginConfig config) { + this.schemaName = schemaName; + this.database = database; + this.tableName = tableName; + this.config = config; + } + + @JsonProperty("database") + public String database() { + return database; + } + + @JsonProperty("tableName") + public String tableName() { + return tableName; + } + + @JsonIgnore + public String getURL() { + return database; + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("schemaName", schemaName) + .field("database", database) + .field("tableName", tableName) + .field("config", config) + .toString(); + } +} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java new file mode 100644 index 0000000..c111d3d --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.http; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.AbstractSchemaFactory; +import org.apache.drill.exec.store.SchemaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpSchemaFactory extends AbstractSchemaFactory { + private static final Logger logger = LoggerFactory.getLogger(HttpSchemaFactory.class); + + private final HttpStoragePlugin plugin; + + public HttpSchemaFactory(HttpStoragePlugin plugin, String schemaName) { + super(schemaName); + this.plugin = plugin; + } + + @Override + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) { + HttpSchema schema = new HttpSchema(getName()); + logger.debug("Registering {} {}", schema.getName(), schema.toString()); + + SchemaPlus schemaPlus = parent.add(getName(), schema); + schema.setHolder(schemaPlus); + } + + class HttpSchema extends AbstractSchema { + + public HttpSchema(String name) { + super(Collections.emptyList(), name); + } + + void setHolder(SchemaPlus plusOfThis) { + for (String s : getSubSchemaNames()) { + plusOfThis.add(s, getSubSchemaKnownExists(s)); + } + } + + @Override + public Set<String> getSubSchemaNames() { + HttpStoragePluginConfig config = plugin.getConfig(); + Map<String, HttpAPIConfig> connections = config.connections(); + Set<String> subSchemaNames = new HashSet<>(); + + // Get the possible subschemas. + for (Map.Entry<String, HttpAPIConfig> entry : connections.entrySet()) { + subSchemaNames.add(entry.getKey()); + } + return subSchemaNames; + } + + @Override + public AbstractSchema getSubSchema(String name) { + if (plugin.getConfig().connections().containsKey(name)) { + return getSubSchemaKnownExists(name); + } else { + throw UserException + .connectionError() + .message("API '{}' does not exist in HTTP Storage plugin '{}'", name, getName()) + .build(logger); + } + } + + /** + * Helper method to get subschema when we know it exists (already checked the existence) + */ + private HttpAPIConnectionSchema getSubSchemaKnownExists(String name) { + return new HttpAPIConnectionSchema(this, name, plugin); + } + + @Override + public String getTypeName() { + return HttpStoragePluginConfig.NAME; + } + } +} diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java similarity index 53% copy from contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java copy to contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java index 35c974d..c660a2e 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java @@ -15,67 +15,47 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.drill.exec.store.kudu; - -import java.io.IOException; +package org.apache.drill.exec.store.http; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.JSONOptions; +import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; -import org.apache.kudu.client.KuduClient; - import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; -public class KuduStoragePlugin extends AbstractStoragePlugin { +public class HttpStoragePlugin extends AbstractStoragePlugin { - private final KuduStoragePluginConfig engineConfig; - private final KuduSchemaFactory schemaFactory; + private final HttpStoragePluginConfig config; - private final KuduClient client; + private final HttpSchemaFactory schemaFactory; - public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name) - throws IOException { + public HttpStoragePlugin(HttpStoragePluginConfig configuration, DrillbitContext context, String name) { super(context, name); - this.schemaFactory = new KuduSchemaFactory(this, name); - this.engineConfig = configuration; - this.client = new KuduClient.KuduClientBuilder(configuration.getMasterAddresses()).build(); - } - - public KuduClient getClient() { - return client; - } - - @Override - public void close() throws Exception { - client.close(); + this.config = configuration; + this.schemaFactory = new HttpSchemaFactory(this, name); } @Override - public boolean supportsRead() { - return true; + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) { + schemaFactory.registerSchemas(schemaConfig, parent); } @Override - public KuduGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { - KuduScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<KuduScanSpec>() {}); - return new KuduGroupScan(this, scanSpec, null); + public HttpStoragePluginConfig getConfig() { + return config; } @Override - public boolean supportsWrite() { + public boolean supportsRead() { return true; } @Override - public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { - schemaFactory.registerSchemas(schemaConfig, parent); - } - - @Override - public KuduStoragePluginConfig getConfig() { - return engineConfig; + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { + HttpScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<HttpScanSpec>() {}); + return new HttpGroupScan(config, scanSpec, null); } } diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java new file mode 100644 index 0000000..2b65bb9 --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.http; + +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.map.CaseInsensitiveMap; +import org.apache.drill.common.logical.StoragePluginConfigBase; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; + + +@JsonTypeName(HttpStoragePluginConfig.NAME) +public class HttpStoragePluginConfig extends StoragePluginConfigBase { + private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class); + + public static final String NAME = "http"; + + public final Map<String, HttpAPIConfig> connections; + + public final boolean cacheResults; + + public final String proxyHost; + + public final int proxyPort; + + public final String proxyType; + + public final String proxyUsername; + + public final String proxyPassword; + + /** + * Timeout in seconds. + */ + public int timeout; + + @JsonCreator + public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults, + @JsonProperty("connections") Map<String, HttpAPIConfig> connections, + @JsonProperty("timeout") Integer timeout, + @JsonProperty("proxyHost") String proxyHost, + @JsonProperty("proxyPort") Integer proxyPort, + @JsonProperty("proxyType") String proxyType, + @JsonProperty("proxyUsername") String proxyUsername, + @JsonProperty("proxyPassword") String proxyPassword + ) { + this.cacheResults = cacheResults == null ? false : cacheResults; + + this.connections = CaseInsensitiveMap.newHashMap(); + if (connections != null) { + this.connections.putAll(connections); + } + + this.timeout = timeout == null ? 0 : timeout; + this.proxyHost = normalize(proxyHost); + this.proxyPort = proxyPort == null ? 0 : proxyPort; + this.proxyUsername = normalize(proxyUsername); + this.proxyPassword = normalize(proxyPassword); + proxyType = normalize(proxyType); + this.proxyType = proxyType == null + ? "direct" : proxyType.trim().toLowerCase(); + + // Validate Proxy Type + if (this.proxyType != null) { + switch (this.proxyType) { + case "direct": + case "http": + case "socks": + break; + default: + throw UserException + .validationError() + .message("Invalid Proxy Type: %s. Drill supports 'direct', 'http' and 'socks' proxies.", proxyType) + .build(logger); + } + } + } + + private static String normalize(String value) { + if (value == null) { + return value; + } + value = value.trim(); + return value.isEmpty() ? 
null : value; + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } else if (that == null || getClass() != that.getClass()) { + return false; + } + HttpStoragePluginConfig thatConfig = (HttpStoragePluginConfig) that; + return Objects.equals(connections, thatConfig.connections) && + Objects.equals(cacheResults, thatConfig.cacheResults) && + Objects.equals(proxyHost, thatConfig.proxyHost) && + Objects.equals(proxyPort, thatConfig.proxyPort) && + Objects.equals(proxyType, thatConfig.proxyType) && + Objects.equals(proxyUsername, thatConfig.proxyUsername) && + Objects.equals(proxyPassword, thatConfig.proxyPassword); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("connections", connections) + .field("cacheResults", cacheResults) + .field("timeout", timeout) + .field("proxyHost", proxyHost) + .field("proxyPort", proxyPort) + .field("proxyUsername", proxyUsername) + .field("proxyPassword", proxyPassword) + .field("proxyType", proxyType) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(connections, cacheResults, timeout, + proxyHost, proxyPort, proxyType, proxyUsername, proxyPassword); + } + + @JsonProperty("cacheResults") + public boolean cacheResults() { return cacheResults; } + + @JsonProperty("connections") + public Map<String, HttpAPIConfig> connections() { return connections; } + + @JsonProperty("timeout") + public int timeout() { return timeout;} + + @JsonProperty("proxyHost") + public String proxyHost() { return proxyHost; } + + @JsonProperty("proxyPort") + public int proxyPort() { return proxyPort; } + + @JsonProperty("proxyUsername") + public String proxyUsername() { return proxyUsername; } + + @JsonProperty("proxyPassword") + public String proxyPassword() { return proxyPassword; } + + @JsonProperty("proxyType") + public String proxyType() { return proxyType; } +} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java new file mode 100644 index 0000000..700ad70 --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.http; + +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; + +@JsonTypeName("http-sub-scan") +public class HttpSubScan extends AbstractBase implements SubScan { + + private final HttpScanSpec tableSpec; + private final HttpStoragePluginConfig config; + private final List<SchemaPath> columns; + + @JsonCreator + public HttpSubScan( + @JsonProperty("config") HttpStoragePluginConfig config, + @JsonProperty("tableSpec") HttpScanSpec tableSpec, + @JsonProperty("columns") List<SchemaPath> columns) { + super("user-if-needed"); + this.config = config; + this.tableSpec = tableSpec; + this.columns = columns; + } + @JsonProperty("tableSpec") + public HttpScanSpec tableSpec() { + return tableSpec; + } + + @JsonProperty("columns") + public List<SchemaPath> columns() { + return columns; + } + + @JsonProperty("config") + public HttpStoragePluginConfig config() { + return config; + } + + @JsonIgnore + public String getURL() { + return tableSpec.getURL(); + } + + @JsonIgnore + public String getFullURL() { + String selectedConnection = tableSpec.database(); + String url = config.connections().get(selectedConnection).url(); + return url + tableSpec.tableName(); + } + + @Override + public <T, X, E extends Throwable> T accept( + PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + return new HttpSubScan(config, tableSpec, columns); + } + + @Override + @JsonIgnore + public int getOperatorType() { + return CoreOperatorType.HTTP_SUB_SCAN_VALUE; + } + + @Override + public Iterator<PhysicalOperator> iterator() { + return ImmutableSet.<PhysicalOperator>of().iterator(); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("tableSpec", tableSpec) + .field("columns", columns) + .field("config", config) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(tableSpec,columns,config); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + HttpSubScan other = (HttpSubScan) obj; + return Objects.equals(tableSpec, other.tableSpec) + && Objects.equals(columns, other.columns) + && Objects.equals(config, other.config); + } +} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java new file mode 100644 index 0000000..41f6d35 --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.util;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * HTTP proxy settings. Provides a builder to create the settings from the
+ * Drill config, from code, or from a combination of the two: for example,
+ * take most values from the config, but selectively replace bits such as
+ * the user name and password. The Drill config can, in turn, pick up
+ * values from the Linux proxy environment variables.
+ * <p>
+ * This class holds the values that are passed to the HTTP client,
+ * whichever client implementation that may be.
+ *
+ * @see <a href="https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/">
+ * Proxy Server Settings</a>
+ */
+public class HttpProxyConfig {
+  private static final Logger logger = LoggerFactory.getLogger(HttpProxyConfig.class);
+
+  public enum ProxyType {
+    NONE, HTTP, SOCKS
+  }
+
+  public static class ProxyBuilder {
+    private String url;
+    private String typeStr;
+    private ProxyType type = ProxyType.NONE;
+    private String host;
+    private int port = 80;
+    private String username;
+    private String password;
+
+    public ProxyBuilder fromHttpConfig(Config config) {
+      url(config.getString(ExecConstants.HTTP_PROXY_URL));
+      type(config.getString(ExecConstants.HTTP_PROXY_TYPE));
+      host(config.getString(ExecConstants.HTTP_PROXY_HOST));
+      port(config.getInt(ExecConstants.HTTP_PROXY_PORT));
+      username(config.getString(ExecConstants.HTTP_PROXY_USER_NAME));
+      password(config.getString(ExecConstants.HTTP_PROXY_PASSWORD));
+      return this;
+    }
+
+    public ProxyBuilder fromHttpsConfig(Config config) {
+      url(config.getString(ExecConstants.HTTPS_PROXY_URL));
+      type(config.getString(ExecConstants.HTTPS_PROXY_TYPE));
+      host(config.getString(ExecConstants.HTTPS_PROXY_HOST));
+      port(config.getInt(ExecConstants.HTTPS_PROXY_PORT));
+      username(config.getString(ExecConstants.HTTPS_PROXY_USER_NAME));
+      password(config.getString(ExecConstants.HTTPS_PROXY_PASSWORD));
+      return this;
+    }
+
+    public ProxyBuilder fromConfigForURL(Config config, String url) {
+      try {
+        URL parsed = new URL(url);
+        if (parsed.getProtocol().equals("https")) {
+          return fromHttpsConfig(config);
+        }
+      } catch (Exception e) {
+        // This is not the place to warn about a bad URL.
+        // Just assume HTTP, something later will fail.
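+        // Illustrative note from the editor, not part of the original patch:
+        // fromConfigForURL(config, "https://api.example.com/data") (a
+        // hypothetical URL) would select the https_* settings, while an
+        // http:// URL, or a URL that fails to parse, falls through to the
+        // http_* settings below.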
+      }
+      return fromHttpConfig(config);
+    }
+
+    public ProxyBuilder url(String url) {
+      this.url = url;
+      return this;
+    }
+
+    public ProxyBuilder type(ProxyType type) {
+      this.type = type;
+      this.typeStr = null;
+      return this;
+    }
+
+    public ProxyBuilder type(String type) {
+      this.typeStr = type;
+      this.type = null;
+      return this;
+    }
+
+    public ProxyBuilder host(String host) {
+      this.host = host;
+      return this;
+    }
+
+    public ProxyBuilder port(int port) {
+      this.port = port;
+      return this;
+    }
+
+    public ProxyBuilder username(String username) {
+      this.username = username;
+      return this;
+    }
+
+    public ProxyBuilder password(String password) {
+      this.password = password;
+      return this;
+    }
+
+    public HttpProxyConfig build() {
+      buildFromUrl();
+      buildType();
+
+      // Info can come from the config file. Ignore extra spaces.
+      host = host == null ? null : host.trim();
+      username = username == null ? null : username.trim();
+      password = password == null ? null : password.trim();
+      return new HttpProxyConfig(this);
+    }
+
+    private void buildFromUrl() {
+      url = url == null ? null : url.trim();
+      if (Strings.isNullOrEmpty(url)) {
+        return;
+      }
+      typeStr = null;
+      URL parsed;
+      try {
+        parsed = new URL(url);
+      } catch (MalformedURLException e) {
+        logger.warn("Invalid proxy URL: {}, assuming NONE", url);
+        type = ProxyType.NONE;
+        return;
+      }
+      type = ProxyType.HTTP;
+      host = parsed.getHost();
+      port = parsed.getPort();
+      String userInfo = parsed.getUserInfo();
+      if (userInfo != null) {
+        String[] parts = userInfo.split(":");
+        username = parts[0];
+        password = parts.length > 1 ? parts[1] : null;
+      }
+    }
+
+    private void buildType() {
+
+      // If a type string was given, validate it to a type
+      if (typeStr != null) {
+        typeStr = typeStr.trim().toUpperCase();
+        if (!Strings.isNullOrEmpty(typeStr)) {
+          try {
+            type = ProxyType.valueOf(typeStr);
+          } catch (IllegalArgumentException e) {
+            logger.warn("Invalid proxy type: {}, assuming NONE", typeStr);
+            this.type = ProxyType.NONE;
+          }
+        }
+      }
+
+      // If no type, assume NONE
+      if (type == null) {
+        type = ProxyType.NONE;
+      }
+
+      // Validate host based on type
+      if (type != ProxyType.NONE) {
+        host = host == null ? null : host.trim();
+        if (Strings.isNullOrEmpty(host)) {
+          logger.warn("{} proxy type specified, but host is null. Reverting to NONE",
+              type.name());
+          type = ProxyType.NONE;
+        }
+      }
+
+      // If no proxy, ignore other settings
+      if (type == ProxyType.NONE) {
+        host = null;
+        username = null;
+        password = null;
+      }
+    }
+  }
+
+  public final ProxyType type;
+  public final String host;
+  public final int port;
+  public final String username;
+  public final String password;
+
+  private HttpProxyConfig(ProxyBuilder builder) {
+    this.type = builder.type;
+    this.host = builder.host;
+    this.port = builder.port;
+    this.username = builder.username;
+    this.password = builder.password;
+  }
+
+  public static ProxyBuilder builder() { return new ProxyBuilder(); }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
new file mode 100644
index 0000000..9bc234f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.util;
+
+import okhttp3.Authenticator;
+import okhttp3.Cache;
+import okhttp3.Credentials;
+import okhttp3.FormBody;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.Route;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpAPIConfig;
+import org.apache.drill.exec.store.http.HttpAPIConfig.HttpMethods;
+import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+
+/**
+ * Performs the actual HTTP requests for the HTTP Storage Plugin. The core
+ * method is getInputStream(), which accepts a URL and opens an
+ * InputStream with that URL's contents.
+ */
+public class SimpleHttp {
+  private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class);
+
+  private final OkHttpClient client;
+  private final HttpStoragePluginConfig config;
+  private final HttpAPIConfig apiConfig;
+  private final File tempDir;
+  private final HttpProxyConfig proxyConfig;
+  private final CustomErrorContext errorContext;
+
+  public SimpleHttp(HttpStoragePluginConfig config, File tempDir,
+      String connectionName, HttpProxyConfig proxyConfig,
+      CustomErrorContext errorContext) {
+    this.config = config;
+    this.tempDir = tempDir;
+    this.apiConfig = config.connections().get(connectionName);
+    this.proxyConfig = proxyConfig;
+    this.errorContext = errorContext;
+    this.client = setupHttpClient();
+  }
+
+  public InputStream getInputStream(String urlStr) {
+    Request.Builder requestBuilder = new Request.Builder()
+        .url(urlStr);
+
+    // The configuration does not allow request types other than GET and POST.
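+    // Editor's sketch, illustrative and not part of the original patch: a
+    // connection configured with postBody = "key1=value1\nkey2=value2" is
+    // translated by buildPostBody() below into the equivalent of
+    //
+    //   RequestBody body = new FormBody.Builder()
+    //       .add("key1", "value1")
+    //       .add("key2", "value2")
+    //       .build();   // sent URL-encoded as key1=value1&key2=value2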
+    if (apiConfig.getMethodType() == HttpMethods.POST) {
+      // Handle POST requests
+      FormBody.Builder formBodyBuilder = buildPostBody();
+      requestBuilder.post(formBodyBuilder.build());
+    }
+
+    // Add headers to request
+    if (apiConfig.headers() != null) {
+      for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
+        requestBuilder.addHeader(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // Build the request object
+    Request request = requestBuilder.build();
+
+    try {
+      // Execute the request
+      Response response = client
+          .newCall(request)
+          .execute();
+
+      // If the request is unsuccessful, throw a UserException
+      if (!response.isSuccessful()) {
+        throw UserException
+          .dataReadError()
+          .message("Error retrieving data from HTTP Storage Plugin: %d %s",
+              response.code(), response.message())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      logger.debug("HTTP Request for {} successful.", urlStr);
+      logger.debug("Response Headers: {}", response.headers());
+
+      // Return the InputStream of the response
+      return Objects.requireNonNull(response.body()).byteStream();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error retrieving data from HTTP Storage Plugin: %s", e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Configures the OkHttp3 client object with configuration info from the user.
+   *
+   * @return the configured OkHttpClient
+   */
+  private OkHttpClient setupHttpClient() {
+    Builder builder = new OkHttpClient.Builder();
+
+    // Set up the HTTP cache. Future possibilities include making the cache size
+    // and retention configurable, but right now it is simply on or off. The cache
+    // is written to the Drill temp directory if it is accessible, and the query
+    // fails with a clear error if it is not.
+    if (config.cacheResults()) {
+      setupCache(builder);
+    }
+
+    // If the API uses basic authentication, add the authentication interceptor.
+    if ("basic".equalsIgnoreCase(apiConfig.authType())) {
+      logger.debug("Adding basic authentication interceptor");
+      builder.addInterceptor(new BasicAuthInterceptor(apiConfig.userName(), apiConfig.password()));
+    }
+
+    // Set timeouts
+    builder.connectTimeout(config.timeout(), TimeUnit.SECONDS);
+    builder.writeTimeout(config.timeout(), TimeUnit.SECONDS);
+    builder.readTimeout(config.timeout(), TimeUnit.SECONDS);
+
+    // Set the proxy configuration
+
+    Proxy.Type proxyType;
+    switch (proxyConfig.type) {
+      case SOCKS:
+        proxyType = Proxy.Type.SOCKS;
+        break;
+      case HTTP:
+        proxyType = Proxy.Type.HTTP;
+        break;
+      default:
+        proxyType = Proxy.Type.DIRECT;
+    }
+    if (proxyType != Proxy.Type.DIRECT) {
+      builder.proxy(new Proxy(proxyType,
+          new InetSocketAddress(proxyConfig.host, proxyConfig.port)));
+      if (proxyConfig.username != null) {
+        builder.proxyAuthenticator(new Authenticator() {
+          @Override public Request authenticate(Route route, Response response) {
+            String credential = Credentials.basic(proxyConfig.username, proxyConfig.password);
+            return response.request().newBuilder()
+                .header("Proxy-Authorization", credential)
+                .build();
+          }
+        });
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Configures response caching using a provided temp directory.
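+   * <p>
+   * (Editor's note, summarizing the code below:) the cache is currently
+   * fixed at 10 MB and lives in an "http-cache" subdirectory of the Drill
+   * temp directory; only the on/off switch is exposed in the plugin config.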
+   *
+   * @param builder the OkHttp client Builder to which the cache is added
+   */
+  private void setupCache(Builder builder) {
+    int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config
+    File cacheDirectory = new File(tempDir, "http-cache");
+    if (!cacheDirectory.exists() && !cacheDirectory.mkdirs()) {
+      throw UserException.dataWriteError()
+        .message("Could not create the HTTP cache directory")
+        .addContext("Path", cacheDirectory.getAbsolutePath())
+        .addContext("Please check the temp directory or disable HTTP caching.")
+        .addContext(errorContext)
+        .build(logger);
+    }
+    try {
+      Cache cache = new Cache(cacheDirectory, cacheSize);
+      logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);
+      builder.cache(cache);
+    } catch (Exception e) {
+      throw UserException.dataWriteError(e)
+        .message("Could not create the HTTP cache")
+        .addContext("Path", cacheDirectory.getAbsolutePath())
+        .addContext("Please check the temp directory or disable HTTP caching.")
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Accepts text from a post body in the format:<br>
+   * {@code key1=value1}<br>
+   * {@code key2=value2}
+   * <p>
+   * and creates the corresponding form-body fields.
+   *
+   * @return the populated FormBody.Builder
+   */
+  private FormBody.Builder buildPostBody() {
+    final Pattern postBodyPattern = Pattern.compile("^.+=.+$");
+
+    FormBody.Builder formBodyBuilder = new FormBody.Builder();
+    String[] lines = apiConfig.postBody().split("\\r?\\n");
+    for (String line : lines) {
+
+      // If the string is in the format key=value, split it;
+      // otherwise ignore the line.
+      if (postBodyPattern.matcher(line).find()) {
+        // Split into key/value; limit 2 so values may themselves contain '='.
+        String[] parts = line.split("=", 2);
+        formBodyBuilder.add(parts[0], parts[1]);
+      }
+    }
+    return formBodyBuilder;
+  }
+
+  /**
+   * Intercepts requests and adds authentication headers to the request.
+   */
+  public static class BasicAuthInterceptor implements Interceptor {
+    private final String credentials;
+
+    public BasicAuthInterceptor(String user, String password) {
+      credentials = Credentials.basic(user, password);
+    }
+
+    @NotNull
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+      // Get the existing request
+      Request request = chain.request();
+
+      // Replace with a new request containing the authorization headers and previous headers
+      Request authenticatedRequest = request.newBuilder().header("Authorization", credentials).build();
+      return chain.proceed(authenticatedRequest);
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..9afcaee
--- /dev/null
+++ b/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+  "storage":{
+    "http" : {
+      "type":"http",
+      "connections": {},
+      "enabled": false
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/resources/drill-module.conf b/contrib/storage-http/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..c0f2a93
--- /dev/null
+++ b/contrib/storage-http/src/main/resources/drill-module.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file tells Drill to consider this module when class path scanning. +# This file can also include any supplementary configuration information. +# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. + +drill: { + classpath.scanning: { + packages += "org.apache.drill.exec.store.http" + } +} diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java new file mode 100644 index 0000000..f99df7c --- /dev/null +++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.http; + +import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.util.DrillFileUtils; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetBuilder; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; +import org.apache.drill.shaded.guava.com.google.common.io.Files; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import okio.Buffer; +import okio.Okio; + +/** + * Tests the HTTP Storage plugin. 
Since the plugin issues REST requests,
+ * this test class uses the okhttp3 MockWebServer to simulate a remote
+ * web server. Two of the unit tests make actual remote REST calls; these
+ * tests are ignored by default.
+ * <p>
+ * The HTTP reader uses Drill's existing JSON reader class, so the unit tests
+ * focus on testing the plugin configuration rather than on how well the
+ * JSON is parsed, as that is tested elsewhere.
+ */
+public class TestHttpPlugin extends ClusterTest {
+
+  private static final int MOCK_SERVER_PORT = 8091;
+  private static String TEST_JSON_RESPONSE;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.json"), Charsets.UTF_8).read();
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+
+    Map<String, String> headers = new HashMap<>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    HttpAPIConfig mockConfig = new HttpAPIConfig("http://localhost:8091/", "GET", headers, "basic", "user", "pass", null);
+
+    HttpAPIConfig sunriseConfig = new HttpAPIConfig("https://api.sunrise-sunset.org/", "GET", null, null, null, null, null);
+
+    HttpAPIConfig stockConfig = new HttpAPIConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
+      ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null);
+
+    HttpAPIConfig mockPostConfig = new HttpAPIConfig("http://localhost:8091/", "POST", headers, null, null, null, "key1=value1\nkey2=value2");
+
+    Map<String, HttpAPIConfig> configs = new HashMap<>();
+    configs.put("stock", stockConfig);
+    configs.put("sunrise", sunriseConfig);
+    configs.put("mock", mockConfig);
+    configs.put("mockpost", mockPostConfig);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "");
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("api", mockStorageConfigWithWorkspace);
+  }
+
+  @Test
+  public void verifyPluginConfig() throws Exception {
+    String sql = "SELECT SCHEMA_NAME, TYPE FROM INFORMATION_SCHEMA.`SCHEMATA` WHERE TYPE='http'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("SCHEMA_NAME", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("TYPE", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("api.mock", "http")
+      .addRow("api.mockpost", "http")
+      .addRow("api.stock", "http")
+      .addRow("api.sunrise", "http")
+      .addRow("api", "http")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /**
+   * Evaluates the HTTP plugin with the results from an API that returns the
+   * sunrise/sunset times for a given lat/long and date. API documentation is
API documentation is + * available here: https://sunrise-sunset.org/api + * + * The API returns results in the following format: + * <pre><code> + * { + * "results": + * { + * "sunrise":"7:27:02 AM", + * "sunset":"5:05:55 PM", + * "solar_noon":"12:16:28 PM", + * "day_length":"9:38:53", + * "civil_twilight_begin":"6:58:14 AM", + * "civil_twilight_end":"5:34:43 PM", + * "nautical_twilight_begin":"6:25:47 AM", + * "nautical_twilight_end":"6:07:10 PM", + * "astronomical_twilight_begin":"5:54:14 AM", + * "astronomical_twilight_end":"6:38:43 PM" + * }, + * "status":"OK" + * } + * }</code></pre> + * + * @throws Exception + * Throws exception if something goes awry + */ + @Test + @Ignore("Requires Remote Server") + public void simpleStarQuery() throws Exception { + String sql = "SELECT * FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addMap("results") + .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .resumeSchema() + .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .build(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow( mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK") + .build(); + + int resultCount = results.rowCount(); + new RowSetComparison(expected).verifyAndClearAll(results); + + assertEquals(1, resultCount); + } + + @Test + @Ignore("Requires Remote Server") + public void simpleSpecificQuery() throws Exception { + String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("6:13:58 AM", "5:59:55 PM") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSerDe() throws Exception { + try (MockWebServer server = startServer()) { + + server.enqueue( + new MockResponse().setResponseCode(200) + .setBody(TEST_JSON_RESPONSE) + ); + + String sql = "SELECT COUNT(*) FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`"; + String plan = queryBuilder().sql(sql).explainJson(); + long cnt = queryBuilder().physical(plan).singletonLong(); + 
assertEquals("Counts should match",1L, cnt); + } + } + + @Test + public void simpleTestWithMockServer() throws Exception { + try (MockWebServer server = startServer()) { + + server.enqueue( + new MockResponse().setResponseCode(200) + .setBody(TEST_JSON_RESPONSE) + ); + + String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`"; + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addMap("results") + .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .resumeSchema() + .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .build(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK") + .build(); + + int resultCount = results.rowCount(); + new RowSetComparison(expected).verifyAndClearAll(results); + + assertEquals(1, resultCount); + } + } + + @Test + public void testPostWithMockServer() throws Exception { + try (MockWebServer server = startServer()) { + + server.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody(TEST_JSON_RESPONSE) + ); + + String sql = "SELECT * FROM api.mockPost.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`"; + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addMap("results") + .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .resumeSchema() + .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .build(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), 
"OK") + .build(); + + int resultCount = results.rowCount(); + new RowSetComparison(expected).verifyAndClearAll(results); + + RecordedRequest recordedRequest = server.takeRequest(); + assertEquals("POST", recordedRequest.getMethod()); + assertEquals(recordedRequest.getHeader("header1"), "value1"); + assertEquals(recordedRequest.getHeader("header2"), "value2"); + assertEquals(1, resultCount); + } + } + + @Test + public void specificTestWithMockServer() throws Exception { + try (MockWebServer server = startServer()) { + + server.enqueue( + new MockResponse().setResponseCode(200) + .setBody(TEST_JSON_RESPONSE) + ); + + String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("6:13:58 AM", "5:59:55 PM") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + } + + @Test + public void testSlowResponse() throws Exception { + try (MockWebServer server = startServer()) { + + server.enqueue( + new MockResponse().setResponseCode(200) + .setBody(TEST_JSON_RESPONSE) + .throttleBody(64, 4, TimeUnit.SECONDS) + ); + + String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1"; + + try { + client.queryBuilder().sql(sql).rowSet(); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().contains("DATA_READ ERROR: timeout")); + } + } + } + + @Test + public void testZeroByteResponse() throws Exception { + try (MockWebServer server = startServer()) { + + server.enqueue( + new MockResponse().setResponseCode(200) + .setBody("") + ); + + String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + assertNull(results); + } + } + + // Note that, in this test, the response is not empty. Instead, the + // response has a single row with no columns. 
+  @Test
+  public void testEmptyJSONObjectResponse() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody("{}")
+      );
+
+      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow()
+        .build();
+
+      new RowSetComparison(expected).verifyAndClearAll(results);
+    }
+  }
+
+  @Test
+  public void testErrorResponse() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(404)
+          .setBody("{}")
+      );
+
+      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      try {
+        client.queryBuilder().sql(sql).rowSet();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e.getMessage().contains("DATA_READ ERROR: Error retrieving data from HTTP Storage Plugin: 404 Client Error"));
+      }
+    }
+  }
+
+  @Test
+  public void testHeaders() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .resumeSchema()
+        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+        .build();
+
+      int resultCount = results.rowCount();
+      new RowSetComparison(expected).verifyAndClearAll(results);
+
+      assertEquals(1, resultCount);
+
+      RecordedRequest request = server.takeRequest();
+      assertEquals("value1", request.getHeader("header1"));
+      assertEquals("value2", request.getHeader("header2"));
+      assertEquals("Basic dXNlcjpwYXNz", request.getHeader("Authorization"));
+    }
+  }
+
+  /**
+   * Helper function to convert a file to a readable input stream.
+   * @param file The input file to be read
+   * @return a Buffer holding the file contents
+   * @throws IOException If the file is unreadable, throws an IOException
+   */
+  private Buffer fileToBytes(File file) throws IOException {
+    Buffer result = new Buffer();
+    result.writeAll(Okio.source(file));
+    return result;
+  }
+
+  /**
+   * Helper function to start the MockWebServer
+   * @return the started mock web server
+   * @throws IOException If the server cannot start, throws IOException
+   */
+  private MockWebServer startServer() throws IOException {
+    MockWebServer server = new MockWebServer();
+    server.start(MOCK_SERVER_PORT);
+    return server;
+  }
+}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java
new file mode 100644
index 0000000..f3eca88
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyType;
+import org.apache.drill.test.BaseTest;
+import org.apache.drill.test.ConfigBuilder;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+
+public class TestHttpProxy extends BaseTest {
+
+  @Test
+  public void testBasics() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type("socks")
+        .host(" foo.com ")
+        .port(1234)
+        .username(" bob ")
+        .password(" secret ")
+        .build();
+    assertEquals(ProxyType.SOCKS, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testURL() {
+    // See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .url("http://bob:secret@foo.com:1234")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testURLAndConfig() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .url("http://foo.com:1234")
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testNone() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type("")
+        .host("foo.com")
+        .port(1234)
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.NONE, proxy.type);
+    assertNull(proxy.host);
+    assertNull(proxy.username);
+    assertNull(proxy.password);
+  }
+
+  @Test
+  public void testBlankType() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type(" ")
+        .host("foo.com")
+        .port(1234)
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.NONE, proxy.type);
+    assertNull(proxy.host);
+    assertNull(proxy.username);
+    assertNull(proxy.password);
+  }
+
+  @Test
+  public void testBadType() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type("bogus")
+        .host("foo.com")
+        .port(1234)
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.NONE, proxy.type);
+    assertNull(proxy.host);
+    assertNull(proxy.username);
+    assertNull(proxy.password);
+  }
+
+  @Test
+  public void testHttpConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTP_PROXY_URL, "http://bob:secret@foo.com:1234")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpConfig(config)
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testHttpUrlConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTP_PROXY_URL, "")
+        .put(ExecConstants.HTTP_PROXY_TYPE, "socks")
+        .put(ExecConstants.HTTP_PROXY_HOST, "foo.com")
+        .put(ExecConstants.HTTP_PROXY_PORT, 1234)
+        .put(ExecConstants.HTTP_PROXY_USER_NAME, "bob")
+        .put(ExecConstants.HTTP_PROXY_PASSWORD, "secret")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpConfig(config)
+        .build();
+    assertEquals(ProxyType.SOCKS, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testHttpsUrlConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTPS_PROXY_URL, "https://bob:secret@foo.com:1234")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpsConfig(config)
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testHttpsConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTPS_PROXY_URL, "")
+        .put(ExecConstants.HTTPS_PROXY_TYPE, "socks")
+        .put(ExecConstants.HTTPS_PROXY_HOST, "foo.com")
+        .put(ExecConstants.HTTPS_PROXY_PORT, 1234)
+        .put(ExecConstants.HTTPS_PROXY_USER_NAME, "bob")
+        .put(ExecConstants.HTTPS_PROXY_PASSWORD, "secret")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpsConfig(config)
+        .build();
+    assertEquals(ProxyType.SOCKS, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testConfigForUrl() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTP_PROXY_URL, "http://bob:secret@foo.com:1234")
+        .put(ExecConstants.HTTPS_PROXY_URL, "http://alice:s3cr3t@bar.com:2345")
+        .build();
+    doTestConfigForUrl(config);
+  }
+
+  private void doTestConfigForUrl(Config config) {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromConfigForURL(config, "http://google.com")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+
+    proxy = HttpProxyConfig.builder()
+        .fromConfigForURL(config, "https://google.com")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("bar.com", proxy.host);
+    assertEquals(2345, proxy.port);
+    assertEquals("alice", proxy.username);
+    assertEquals("s3cr3t", proxy.password);
+  }
+
+  // To run this test, set two env vars in your run/debug
+  // configuration, then comment out the @Ignore:
+  // http_proxy=http://bob:secret@foo.com:1234
+  // https_proxy=http://alice:s3cr3t@bar.com:2345
+  @Test
+  @Ignore("Requires manual setup")
+  public void testEnvVar() {
+    Config config = new ConfigBuilder()
+        .build();
+    doTestConfigForUrl(config);
+  }
+}
diff --git a/contrib/storage-http/src/test/resources/data/response.json b/contrib/storage-http/src/test/resources/data/response.json
new file mode 100644
index 0000000..bde2d5b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response.json
@@ -0,0 +1,14 @@
+{"results":
+  {"sunrise":"6:13:58 AM",
+  "sunset":"5:59:55 PM",
+  "solar_noon":"12:06:56 PM",
+  "day_length":"11:45:57",
+  "civil_twilight_begin":"5:48:14 AM",
+  "civil_twilight_end":"6:25:38 PM",
+  "nautical_twilight_begin":"5:18:16 AM",
+  "nautical_twilight_end":"6:55:36 PM",
+  "astronomical_twilight_begin":"4:48:07 AM",
+  "astronomical_twilight_end":"7:25:45 PM"
+  },
+  "status":"OK"
+}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java index 35c974d..fea8197 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java @@ -37,7 +37,7 @@ public class KuduStoragePlugin extends AbstractStoragePlugin { private final KuduClient client; public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name) - throws IOException { + throws IOException { super(context, name); this.schemaFactory = new KuduSchemaFactory(this, name); this.engineConfig = configuration; diff --git a/distribution/pom.xml b/distribution/pom.xml index 420e14e..24c14a3 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -288,6 +288,11 @@ </dependency> <dependency> <groupId>org.apache.drill.contrib</groupId> + <artifactId>drill-storage-http</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.drill.contrib</groupId> <artifactId>drill-opentsdb-storage</artifactId> <version>${project.version}</version> </dependency> diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml index 2eb50fd..fefbac4 100644 --- a/distribution/src/assemble/component.xml +++ b/distribution/src/assemble/component.xml @@ -49,6 +49,7 @@ <include>org.apache.drill.contrib:drill-jdbc-storage:jar</include> <include>org.apache.drill.contrib:drill-kudu-storage:jar</include> <include>org.apache.drill.contrib:drill-storage-kafka:jar</include> + <include>org.apache.drill.contrib:drill-storage-http:jar</include> <include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include> <include>org.apache.drill.contrib:drill-udfs:jar</include> </includes> diff --git a/distribution/src/main/resources/drill-override-example.conf b/distribution/src/main/resources/drill-override-example.conf index 6fcca37..034307b 100644 --- a/distribution/src/main/resources/drill-override-example.conf +++ b/distribution/src/main/resources/drill-override-example.conf @@ -342,6 +342,37 @@ drill.exec: { #ssl provider. May be "JDK" or "OPENSSL". Default is "JDK" provider: "JDK" } + + # HTTP client proxy configuration + net_proxy: { + + # HTTP URL. Omit if from a Linux env var + # See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/ + http_url: "", + + # Explicit HTTP setup, used if URL is not set + http: { + type: "none", # none, http, socks. Blank same as none. + host: "", + port: 80, + user_name: "", + password: "" + }, + + # HTTPS URL. Omit if from a Linux env var + https_url: "", + + # Explicit HTTPS setup, used if URL is not set + https: { + type: "none", # none, http, socks. Blank same as none. 
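+      # Editor's example with hypothetical values: to route HTTPS requests
+      # through a local SOCKS proxy, one might set
+      #   type: "socks", host: "localhost", port: 1080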
+ host: "", + port: 80, + user_name: "", + password: "" + } + } +} + }, drill.metrics : { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 3a16353..027ebb5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -1211,4 +1211,21 @@ public final class ExecConstants { ENABLE_DYNAMIC_CREDIT_BASED_FC, new OptionDescription("Enable dynamic credit based flow control.This feature allows " + "the sender to send out its data more rapidly, but you should know that it has a risk to OOM when the system is solving parallel " + "large queries until we have a more accurate resource manager.")); + + // HTTP proxy configuration (Drill config) + public static final String NET_PROXY_BASE = "drill.exec.net_proxy"; + // HTTP proxy config + public static final String HTTP_PROXY_URL = NET_PROXY_BASE + ".http_url"; + public static final String HTTP_PROXY_TYPE = NET_PROXY_BASE + ".http.type"; + public static final String HTTP_PROXY_HOST = NET_PROXY_BASE + ".http.host"; + public static final String HTTP_PROXY_PORT = NET_PROXY_BASE + ".http.port"; + public static final String HTTP_PROXY_USER_NAME = NET_PROXY_BASE + ".http.user_name"; + public static final String HTTP_PROXY_PASSWORD = NET_PROXY_BASE + ".http.password"; + // HTTPS proxy config + public static final String HTTPS_PROXY_URL = NET_PROXY_BASE + ".https_url"; + public static final String HTTPS_PROXY_TYPE = NET_PROXY_BASE + ".https.type"; + public static final String HTTPS_PROXY_HOST = NET_PROXY_BASE + ".https.host"; + public static final String HTTPS_PROXY_PORT = NET_PROXY_BASE + ".https.port"; + public static final String HTTPS_PROXY_USER_NAME = NET_PROXY_BASE + ".https.user_name"; + public static final String HTTPS_PROXY_PASSWORD = NET_PROXY_BASE + ".https.password"; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 0ab4181..731bf2d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -70,6 +70,7 @@ public class JSONRecordReader extends AbstractRecordReader { private final boolean skipMalformedJSONRecords; private final boolean printSkippedMalformedJSONRecordLineNumber; private ReadState write; + private InputStream inputStream; /** * Create a JSON Record Reader that uses a file based input stream. 
@@ -81,7 +82,7 @@
    */
   public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, DrillFileSystem fileSystem,
       List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, inputPath, null, fileSystem, columns);
+    this(fragmentContext, inputPath, null, fileSystem, columns, false);
   }
 
   /**
@@ -94,16 +95,28 @@
    */
   public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem,
       List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, null, embeddedContent, fileSystem, columns);
+    this(fragmentContext, null, embeddedContent, fileSystem, columns, false);
+  }
+
+  /**
+   * Create a JSON Record Reader that reads from an InputStream supplied
+   * after construction via setInputStream().
+   * @param fragmentContext the Drill fragment context
+   * @param columns pathnames of columns/subfields to read
+   * @throws OutOfMemoryException
+   */
+  public JSONRecordReader(FragmentContext fragmentContext, List<SchemaPath> columns) throws OutOfMemoryException {
+    this(fragmentContext, null, null, null, columns, true);
   }
 
   private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent,
-      DrillFileSystem fileSystem, List<SchemaPath> columns) {
+      DrillFileSystem fileSystem, List<SchemaPath> columns, boolean hasInputStream) {
     Preconditions.checkArgument(
-        (inputPath == null && embeddedContent != null) ||
-        (inputPath != null && embeddedContent == null),
-        "One of inputPath or embeddedContent must be set but not both."
+        (inputPath == null && embeddedContent != null && !hasInputStream) ||
+        (inputPath != null && embeddedContent == null && !hasInputStream) ||
+        (inputPath == null && embeddedContent == null && hasInputStream),
+        "Exactly one of inputPath, embeddedContent or an input stream must be set."
); if (inputPath != null) { @@ -170,6 +183,8 @@ public class JSONRecordReader extends AbstractRecordReader { private void setupParser() throws IOException { if (hadoopPath != null) { jsonReader.setSource(stream); + } else if (inputStream!= null) { + jsonReader.setSource(inputStream); } else { jsonReader.setSource(embeddedContent); } @@ -253,11 +268,20 @@ public class JSONRecordReader extends AbstractRecordReader { runningRecordCount += recordCount; } + public void setInputStream(InputStream in) { + this.inputStream = in; + } + @Override public void close() throws Exception { if (stream != null) { stream.close(); stream = null; } + + if (inputStream != null) { + inputStream.close(); + inputStream = null; + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 4dd146e..1ce89fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -63,7 +63,7 @@ public class JsonReader extends BaseJsonReader { private final FieldSelection selection; - private JsonReader(Builder builder) { + public JsonReader(Builder builder) { super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar, builder.skipOuterList); selection = FieldSelection.getFieldSelection(builder.columns); workingBuffer = builder.workingBuffer; diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 46ae53a..01f454d 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -458,10 +458,49 @@ drill.exec: { # to have a grace period that is atleast twice the amount of zookeeper # refresh time. grace_period_ms : 0, + # port hunting for drillbits. Enabled only for testing purposes. port_hunt : false, + # Allow drillbit to bind to loopback address in distributed mode. Enabled only for testing purposes. allow_loopback_address_binding : false + + # HTTP client proxy configuration + net_proxy: { + + # HTTP URL, usually from a Linux env var + # See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/ + http_url: "", + http_url: ${?HTTP_PROXY}, + http_url: ${?http_proxy}, + http_url: ${?all_proxy}, + http_url: ${?ALL_PROXY}, + + # Explicit HTTP setup, used if URL is not set + http: { + type: "none", # none, http, socks. Blank same as none. + host: "", + port: 80, + user_name: "", + password: "" + }, + + # HTTPS URL, usually from a Linux env var + https_url: "", + https_url: ${?HTTPS_PROXY}, + https_url: ${?https_proxy}, + https_url: ${?all_proxy}, + https_url: ${?ALL_PROXY}, + + # Explicit HTTPS setup, used if URL is not set + https: { + type: "none", # none, http, socks. Blank same as none. 
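+      # Editor's note with hypothetical values: because https_url above also
+      # reads ${?HTTPS_PROXY}, ${?https_proxy}, ${?all_proxy} and ${?ALL_PROXY},
+      # exporting https_proxy=http://bob:secret@proxy.example.com:3128 fills
+      # in this proxy configuration without editing the file.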
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
index d51e1a8..f8006cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
@@ -109,7 +109,7 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testExtendedFloat() {
     final String json =
-        "{a: NaN} {a: Infinity} {a: -Infinity}";
+      "{a: NaN} {a: Infinity} {a: -Infinity}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.options.allowNanInf = true;
     fixture.open(json);
@@ -158,8 +158,8 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   public void testRootTuple() {
     final String json =
       "{id: 1, name: \"Fred\", balance: 100.0}\n" +
-        "{id: 2, name: \"Barney\"}\n" +
-        "{id: 3, name: \"Wilma\", balance: 500.00}";
+      "{id: 2, name: \"Barney\"}\n" +
+      "{id: 3, name: \"Wilma\", balance: 500.00}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.open(json);
     assertEquals(3, fixture.read());
@@ -235,7 +235,7 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testProjection() {
     final String json =
-        "{a: 1, b: [[{x: [[{y: []}]]}]]}\n" +
+      "{a: 1, b: [[{x: [[{y: []}]]}]]}\n" +
       "{a: 2}\n" +
       "{b: \"bar\"}";
     JsonParserFixture fixture = new JsonParserFixture();
@@ -254,13 +254,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testAllTextMode() {
     final String json =
-        "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+      "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.options.allTextMode = true;
     fixture.open(json);
     fixture.expect("a",
-        new Object[] {"1", "foo", "true", "20.5", null});
+      new Object[] {"1", "foo", "true", "20.5", null});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -268,13 +268,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testColumnTextMode() {
     final String json =
-        "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+      "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.TEXT;
     fixture.open(json);
     fixture.expect("a",
-        new Object[] {"1", "foo", "true", "20.5", null});
+      new Object[] {"1", "foo", "true", "20.5", null});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -282,13 +282,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testJsonModeScalars() {
     final String json =
-        "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+      "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.JSON;
     fixture.open(json);
     fixture.expect("a",
-        new Object[] {"1", "\"foo\"", "true", "20.5", "null"});
+      new Object[] {"1", "\"foo\"", "true", "20.5", "null"});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -296,15 +296,15 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testJsonModeArrays() {
     final String json =
-        "{a: []} {a: [null]} {a: [null, null]} {a: [[]]}\n" +
+      "{a: []} {a: [null]} {a: [null, null]} {a: [[]]}\n" +
       "{a: [1, \"foo\", true]} {a: [[1, 2], [3, 4]]}\n";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.JSON;
     fixture.open(json);
     fixture.expect("a",
-        new Object[] {"[]", "[null]", "[null, null]", "[[]]",
-            "[1, \"foo\", true]", "[[1, 2], [3, 4]]"});
+      new Object[] {"[]", "[null]", "[null, null]", "[[]]",
+        "[1, \"foo\", true]", "[[1, 2], [3, 4]]"});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -312,15 +312,15 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testJsonModeObjects() {
     final String json =
-        "{a: {}} {a: {b: null}} {a: {b: null, b: null}}\n" +
+      "{a: {}} {a: {b: null}} {a: {b: null, b: null}}\n" +
       "{a: {b: {c: {d: [{e: 10}, null, 20], f: \"foo\"}, g:30}, h: 40}}\n";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.JSON;
    fixture.open(json);
     fixture.expect("a",
-        new Object[] {"{}", "{\"b\": null}", "{\"b\": null, \"b\": null}",
-            "{\"b\": {\"c\": {\"d\": [{\"e\": 10}, null, 20], \"f\": \"foo\"}, \"g\": 30}, \"h\": 40}"});
+      new Object[] {"{}", "{\"b\": null}", "{\"b\": null, \"b\": null}",
+        "{\"b\": {\"c\": {\"d\": [{\"e\": 10}, null, 20], \"f\": \"foo\"}, \"g\": 30}, \"h\": 40}"});
     assertFalse(fixture.next());
     fixture.close();
   }
diff --git a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 0b5292d..1f4cfee 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -322,6 +322,27 @@ public class SchemaPath extends LogicalExpressionBase {
     return rootSegment;
   }
 
+  public String getAsUnescapedPath() {
+    StringBuilder sb = new StringBuilder();
+    PathSegment seg = getRootSegment();
+    if (seg.isArray()) {
+      throw new IllegalStateException("Drill doesn't currently support top level arrays");
+    }
+    sb.append(seg.getNameSegment().getPath());
+
+    while ((seg = seg.getChild()) != null) {
+      if (seg.isNamed()) {
+        sb.append('.');
+        sb.append(seg.getNameSegment().getPath());
+      } else {
+        sb.append('[');
+        sb.append(seg.getArraySegment().getIndex());
+        sb.append(']');
+      }
+    }
+    return sb.toString();
+  }
+
   @Override
   public MajorType getMajorType() {
     return Types.LATE_BIND_TYPE;
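For illustration, here is what the new accessor produces for a made-up path; getCompoundPath() is SchemaPath's existing factory for name segments:

    // Builds the path `employee`.`address`.`city` and renders it without
    // backtick escaping. A child array segment would render as [index],
    // e.g. address[2], per the else branch above.
    SchemaPath path = SchemaPath.getCompoundPath("employee", "address", "city");
    String text = path.getAsUnescapedPath();   // "employee.address.city"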
"\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" + "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" + - "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper" + + "\032\n\026CANCELLATION_REQUESTED\020\006*\367\n\n\020CoreOper" + "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" + "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" + "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" + @@ -27959,11 +27968,11 @@ public final class UserBitShared { "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S" + "CAN\020?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" + "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" + - "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN" + - "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002" + - "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o" + - "rg.apache.drill.exec.protoB\rUserBitShare" + - "dH\001" + "NTROLLER\020C\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslSta" + + "tus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n" + + "\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n" + + "\013SASL_FAILED\020\004B.\n\033org.apache.drill.exec." + + "protoB\rUserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 55cfde5..c51cc66 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -379,6 +379,7 @@ enum CoreOperatorType { SHP_SUB_SCAN = 65; METADATA_HANDLER = 66; METADATA_CONTROLLER = 67; + HTTP_SUB_SCAN = 70; } /* Registry that contains list of jars, each jar contains its name and list of function signatures.