This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 981aa64aa6 DRILL-8437: Add Header Index Pagination (#2806)
981aa64aa6 is described below
commit 981aa64aa600496456706f0da0e036a04064923f
Author: Charles S. Givre <[email protected]>
AuthorDate: Sun May 21 16:28:59 2023 -0400
DRILL-8437: Add Header Index Pagination (#2806)
---
contrib/storage-http/.gitignore | 2 +-
contrib/storage-http/Pagination.md | 12 +++
.../drill/exec/store/http/HttpBatchReader.java | 4 +-
.../drill/exec/store/http/HttpCSVBatchReader.java | 1 +
.../drill/exec/store/http/HttpPaginatorConfig.java | 13 ++-
.../exec/store/http/HttpScanBatchCreator.java | 23 ++--
.../store/http/paginator/HeaderIndexPaginator.java | 116 +++++++++++++++++++++
.../exec/store/http/paginator/PagePaginator.java | 2 +-
.../drill/exec/store/http/util/SimpleHttp.java | 9 +-
.../drill/exec/store/http/TestPagination.java | 37 +++++++
10 files changed, 204 insertions(+), 15 deletions(-)
diff --git a/contrib/storage-http/.gitignore b/contrib/storage-http/.gitignore
index 710368b187..05c3e7ac9a 100644
--- a/contrib/storage-http/.gitignore
+++ b/contrib/storage-http/.gitignore
@@ -1 +1 @@
-./src/test/resources/logback-test.xml
+/src/test/resources/logback-test.xml
diff --git a/contrib/storage-http/Pagination.md
b/contrib/storage-http/Pagination.md
index f555060f03..0647dbc3bb 100644
--- a/contrib/storage-http/Pagination.md
+++ b/contrib/storage-http/Pagination.md
@@ -76,3 +76,15 @@ There are three possible parameters:
** Note: Index / Keyset Pagination is only implemented for APIs that return
JSON **
+
+## Header Index Pagination
+Header index pagination is used when an API returns the URL of the next page in a response header rather than in the response body. Shopify is one example of an API that does this.
+
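+The only configuration option is `nextPageParam`, which is the name of the response header that contains the URL of the next page. Drill first requests the configured URL. It then follows the URL found in that header of each response, and stops paginating when the header is missing or empty.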
+
+```json
+  "paginator": {
+    "nextPageParam": "link",
+    "method": "HEADER_INDEX"
+  }
+```
\ No newline at end of file
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index e6661ad043..85d8073ddf 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -118,6 +119,7 @@ public class HttpBatchReader implements
ManagedReader<SchemaNegotiator> {
.scanDefn(subScan)
.url(url)
.tempDir(new File(tempDirPath))
+ .paginator(paginator)
.proxyConfig(proxySettings(negotiator.drillConfig(), url))
.errorContext(errorContext)
.build();
@@ -225,7 +227,7 @@ public class HttpBatchReader implements
ManagedReader<SchemaNegotiator> {
protected Map<String, Object> generatePaginationFieldMap() {
if (paginator == null || paginator.getMode() != PaginatorMethod.INDEX) {
- return null;
+ return Collections.emptyMap();
}
Map<String, Object> fieldMap = new HashMap<>();
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index df55433678..ab710e9463 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -99,6 +99,7 @@ public class HttpCSVBatchReader extends HttpBatchReader {
.scanDefn(subScan)
.url(url)
.tempDir(new File(tempDirPath))
+ .paginator(paginator)
.proxyConfig(proxySettings(negotiator.drillConfig(), url))
.errorContext(errorContext)
.build();
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
index e6b226f8c6..9eb2f3d7e6 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
@@ -133,10 +133,18 @@ public class HttpPaginatorConfig {
.build(logger);
}
break;
+ case HEADER_INDEX:
+ if (StringUtils.isEmpty(this.nextPageParam)) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For HEADER_INDEX
pagination, the nextPageParam must be defined.")
+ .build(logger);
+ }
+ break;
default:
throw UserException
.validationError()
- .message("Invalid paginator method: %s. Drill supports 'OFFSET',
'INDEX' and 'PAGE'", method)
+ .message("Invalid paginator method: %s. Drill supports 'OFFSET',
'INDEX', 'HEADER_INDEX' and 'PAGE'", method)
.build(logger);
}
}
@@ -230,7 +238,8 @@ public class HttpPaginatorConfig {
public enum PaginatorMethod {
OFFSET,
PAGE,
- INDEX
+ INDEX,
+ HEADER_INDEX
}
@JsonIgnore
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 0da06a4c43..d5f277fe5f 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
import org.apache.drill.exec.store.http.paginator.IndexPaginator;
import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
import org.apache.drill.exec.store.http.paginator.PagePaginator;
@@ -92,7 +93,7 @@ public class HttpScanBatchCreator implements
BatchCreator<HttpSubScan> {
private static class HttpReaderFactory implements ReaderFactory {
private final HttpSubScan subScan;
private final HttpPaginatorConfig paginatorConfig;
- private Paginator paginator;
+ private final Paginator paginator;
private int count;
@@ -105,6 +106,8 @@ public class HttpScanBatchCreator implements
BatchCreator<HttpSubScan> {
// Initialize the paginator and generate the base URLs
this.paginator = getPaginator();
+ } else {
+ this.paginator = null;
}
}
@@ -119,8 +122,6 @@ public class HttpScanBatchCreator implements
BatchCreator<HttpSubScan> {
rawUrl = HttpUrl.parse(subScan.tableSpec().connectionConfig().url());
}
-
-
// If the URL is not parsable or otherwise invalid
if (rawUrl == null) {
throw UserException.validationError()
@@ -130,28 +131,33 @@ public class HttpScanBatchCreator implements
BatchCreator<HttpSubScan> {
urlBuilder = rawUrl.newBuilder();
- Paginator paginator = null;
if (paginatorConfig.getMethodType() == PaginatorMethod.OFFSET) {
- paginator = new OffsetPaginator(urlBuilder,
+ return new OffsetPaginator(urlBuilder,
subScan.maxRecords(),
paginatorConfig.pageSize(),
paginatorConfig.limitParam(),
paginatorConfig.offsetParam());
} else if (paginatorConfig.getMethodType() == PaginatorMethod.PAGE) {
- paginator = new PagePaginator(urlBuilder,
+ return new PagePaginator(urlBuilder,
subScan.maxRecords(),
paginatorConfig.pageSize(),
paginatorConfig.pageParam(),
paginatorConfig.pageSizeParam());
} else if (paginatorConfig.getMethodType() == PaginatorMethod.INDEX) {
- paginator = new IndexPaginator(urlBuilder,
+ return new IndexPaginator(urlBuilder,
0, // Page size not used for Index/Keyset pagination
subScan.maxRecords(),
paginatorConfig.hasMoreParam(),
paginatorConfig.indexParam(),
paginatorConfig.nextPageParam());
+ } else if (paginatorConfig.getMethodType() ==
PaginatorMethod.HEADER_INDEX) {
+ return new HeaderIndexPaginator(urlBuilder,
+ subScan.maxRecords(),
+ paginatorConfig.pageSize(),
+ paginatorConfig.nextPageParam(),
+ subScan.tableSpec().connectionConfig().url());
}
- return paginator;
+ return null;
}
@Override
@@ -181,6 +187,7 @@ public class HttpScanBatchCreator implements
BatchCreator<HttpSubScan> {
* the group scan such that the calls could be sent to different
drillbits.
*/
if (!paginator.hasNext()) {
+ logger.debug("Ending Batch Generation.");
return null;
}
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java
new file mode 100644
index 0000000000..8bf388fc11
--- /dev/null
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.Headers;
+import okhttp3.HttpUrl.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The Header Index Paginator is used when the API in question send a link in
the HTTP header
+ * containing the URL for the next page.
+ */
+public class HeaderIndexPaginator extends Paginator {
+
+ private static final Logger logger =
LoggerFactory.getLogger(HeaderIndexPaginator.class);
+ private static final Pattern URL_REGEX =
Pattern.compile("(https?:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*))");
+
+ private final String nextPageParam;
+ private final String firstPageURL;
+ private Headers headers;
+ private boolean firstPage;
+ private int pageCount;
+
+ public HeaderIndexPaginator(Builder builder, int pageSize, int limit, String
nextPageParam, String firstPageURL) {
+ super(builder, PaginatorMethod.HEADER_INDEX, pageSize, limit);
+ this.nextPageParam = nextPageParam;
+ this.firstPageURL = firstPageURL;
+ this.firstPage = true;
+ this.pageCount = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ // If the headers are null and it isn't the first page, end pagination
+ if ( !firstPage &&
+ (headers == null || StringUtils.isEmpty(headers.get(nextPageParam)))
+ ) {
+ notifyPartialPage();
+ logger.debug("Ending pagination. No additional info in headers.");
+ return false;
+ }
+
+ return !partialPageReceived;
+ }
+
+ /**
+ * This method sets the headers for the Header Index Paginator. This must
be called with updated headers
+ * before the {@link #next()} method is called.
+ * @param headers A {@link Headers} object containing the response headers
from the previous API call.
+ */
+ public void setResponseHeaders(Headers headers) {
+ logger.debug("Setting response headers. ");
+ this.headers = headers;
+
+ // If the next page URL is empty or otherwise undefined, halt pagination.
+ if (StringUtils.isEmpty(headers.get(nextPageParam))) {
+ notifyPartialPage();
+ }
+ }
+
+ @Override
+ public String next() {
+ pageCount++;
+ if (firstPage) {
+ firstPage = false;
+ return firstPageURL;
+ }
+
+ if (headers == null) {
+ throw UserException.dataReadError()
+ .message("Headers are empty. HeaderIndex Pagination requires
parameters that are passed in the HTTP header." + pageCount)
+ .build(logger);
+ }
+ // Now attempt to retrieve the field from the response headers.
+ String nextPage = headers.get(nextPageParam);
+
+ // If the next page value is null or empty, halt pagination
+ if (StringUtils.isEmpty(nextPage)) {
+ super.notifyPartialPage();
+ return null;
+ }
+
+ logger.debug("Found next page URL: {}", nextPage);
+
+ // Clean up any extraneous garbage from the header field.
+ Matcher urlMatcher = URL_REGEX.matcher(nextPage);
+ if (urlMatcher.find()) {
+ return urlMatcher.group(1);
+ }
+
+ return nextPage;
+ }
+}
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
index b640d3a3dd..a7bd7e5598 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
@@ -28,7 +28,7 @@ import java.util.NoSuchElementException;
public class PagePaginator extends Paginator {
- private static final Logger logger =
LoggerFactory.getLogger(OffsetPaginator.class);
+ private static final Logger logger =
LoggerFactory.getLogger(PagePaginator.class);
private final int limit;
private final String pageParam;
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 3568fe9213..6ba9918f6d 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -50,9 +50,11 @@ import
org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.http.HttpApiConfig;
import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.apache.drill.exec.store.http.HttpStoragePlugin;
import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
import org.apache.drill.exec.store.http.HttpSubScan;
+import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
import org.apache.drill.exec.store.http.paginator.Paginator;
import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator;
import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
@@ -110,8 +112,6 @@ public class SimpleHttp implements AutoCloseable {
.writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
.readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
.build();
-
-
private final OkHttpClient client;
private final File tempDir;
private final HttpProxyConfig proxyConfig;
@@ -436,6 +436,11 @@ public class SimpleHttp implements AutoCloseable {
logger.debug("HTTP Request for {} successful.", url());
logger.debug("Response Headers: {} ", response.headers());
+ // In the case of Header Index Pagination, send the header(s) to the
paginator
+ if (paginator != null && paginator.getMode() ==
PaginatorMethod.HEADER_INDEX) {
+
((HeaderIndexPaginator)paginator).setResponseHeaders(response.headers());
+ }
+
// Return the InputStream of the response. Note that it is necessary and
// and sufficient that the caller invokes close() on the returned stream.
return Objects.requireNonNull(response.body()).byteStream();
diff --git
a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index afa5fe3581..e5294a9e00 100644
---
a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++
b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -164,6 +164,20 @@ public class TestPagination extends ClusterTest {
headers.put("header1", "value1");
headers.put("header2", "value2");
+ HttpPaginatorConfig headerIndexPaginator = HttpPaginatorConfig.builder()
+ .nextPageParam("link")
+ .pageSize(10)
+ .method("header_index")
+ .build();
+
+ HttpApiConfig mockJsonConfigWithHeaderIndex = HttpApiConfig.builder()
+ .url("http://localhost:8092/json")
+ .method("get")
+ .requireTail(false)
+ .paginator(headerIndexPaginator)
+ .inputType("json")
+ .build();
+
HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder()
.limitParam("limit")
@@ -329,6 +343,7 @@ public class TestPagination extends ClusterTest {
configs.put("json_tail", mockJsonConfigWithPaginatorAndTail);
configs.put("xml_paginator", mockXmlConfigWithPaginator);
configs.put("xml_paginator_url_params",
mockXmlConfigWithPaginatorAndUrlParams);
+ configs.put("customers", mockJsonConfigWithHeaderIndex);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
new HttpStoragePluginConfig(false, configs, 2,1000, null, null, "", 80,
"", "", "", null,
@@ -338,6 +353,28 @@ public class TestPagination extends ClusterTest {
}
+  /**
+   * Verifies header index pagination: pages 1 and 2 advertise the next page URL in the
+   * {@code link} header, page 3 has no such header, so exactly three pages must be fetched.
+   */
+  @Test
+  public void testPagePaginationWithHeaderIndex() throws Exception {
+    String sql = "SELECT col1, _response_url FROM `local`.`customers`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1)
+          .setHeader("link", "http://localhost:8092/json?page=2"));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2)
+          .setHeader("link", "http://localhost:8092/json?page=3"));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      // Release each batch's buffers; only the number of batches is checked below.
+      for (QueryDataBatch batch : results) {
+        batch.release();
+      }
+
+      // One batch per page, and no request beyond the third (header-less) page.
+      assertEquals(3, results.size());
+      assertEquals(3, server.getRequestCount());
+    }
+  }
+
@Test
@Ignore("Requires Live Connection to Github")
public void testPagePaginationWithURLParameters() throws Exception {