This is an automated email from the ASF dual-hosted git repository. cgivre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 981aa64aa6 DRILL-8437: Add Header Index Pagination (#2806) 981aa64aa6 is described below commit 981aa64aa600496456706f0da0e036a04064923f Author: Charles S. Givre <cgi...@apache.org> AuthorDate: Sun May 21 16:28:59 2023 -0400 DRILL-8437: Add Header Index Pagination (#2806) --- contrib/storage-http/.gitignore | 2 +- contrib/storage-http/Pagination.md | 12 +++ .../drill/exec/store/http/HttpBatchReader.java | 4 +- .../drill/exec/store/http/HttpCSVBatchReader.java | 1 + .../drill/exec/store/http/HttpPaginatorConfig.java | 13 ++- .../exec/store/http/HttpScanBatchCreator.java | 23 ++-- .../store/http/paginator/HeaderIndexPaginator.java | 116 +++++++++++++++++++++ .../exec/store/http/paginator/PagePaginator.java | 2 +- .../drill/exec/store/http/util/SimpleHttp.java | 9 +- .../drill/exec/store/http/TestPagination.java | 37 +++++++ 10 files changed, 204 insertions(+), 15 deletions(-) diff --git a/contrib/storage-http/.gitignore b/contrib/storage-http/.gitignore index 710368b187..05c3e7ac9a 100644 --- a/contrib/storage-http/.gitignore +++ b/contrib/storage-http/.gitignore @@ -1 +1 @@ -./src/test/resources/logback-test.xml +/src/test/resources/logback-test.xml diff --git a/contrib/storage-http/Pagination.md b/contrib/storage-http/Pagination.md index f555060f03..0647dbc3bb 100644 --- a/contrib/storage-http/Pagination.md +++ b/contrib/storage-http/Pagination.md @@ -76,3 +76,15 @@ There are three possible parameters: ** Note: Index / Keyset Pagination is only implemented for APIs that return JSON ** + +## Header Index Pagination +Header index pagination is used when the API in question returns a link to the next page in the response header. Shopify is one such example of an API that does this. + +The only configuration option is the `nextPageParam` which is the parameter that Drill should look for in the response header. 
+ +```json + "paginator": { + "nextPageParam": "page", + "method": "HEADER_INDEX" + } +``` \ No newline at end of file diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java index e6661ad043..85d8073ddf 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java @@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.InputStream; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -118,6 +119,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> { .scanDefn(subScan) .url(url) .tempDir(new File(tempDirPath)) + .paginator(paginator) .proxyConfig(proxySettings(negotiator.drillConfig(), url)) .errorContext(errorContext) .build(); @@ -225,7 +227,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> { protected Map<String, Object> generatePaginationFieldMap() { if (paginator == null || paginator.getMode() != PaginatorMethod.INDEX) { - return null; + return Collections.emptyMap(); } Map<String, Object> fieldMap = new HashMap<>(); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java index df55433678..ab710e9463 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java @@ -99,6 +99,7 @@ public class HttpCSVBatchReader extends HttpBatchReader { .scanDefn(subScan) .url(url) .tempDir(new File(tempDirPath)) + .paginator(paginator) .proxyConfig(proxySettings(negotiator.drillConfig(), url)) 
.errorContext(errorContext) .build(); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java index e6b226f8c6..9eb2f3d7e6 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java @@ -133,10 +133,18 @@ public class HttpPaginatorConfig { .build(logger); } break; + case HEADER_INDEX: + if (StringUtils.isEmpty(this.nextPageParam)) { + throw UserException + .validationError() + .message("Invalid paginator configuration. For HEADER_INDEX pagination, the nextPageParam must be defined.") + .build(logger); + } + break; default: throw UserException .validationError() - .message("Invalid paginator method: %s. Drill supports 'OFFSET', 'INDEX' and 'PAGE'", method) + .message("Invalid paginator method: %s. Drill supports 'OFFSET', 'INDEX', 'HEADER_INDEX' and 'PAGE'", method) .build(logger); } } @@ -230,7 +238,8 @@ public class HttpPaginatorConfig { public enum PaginatorMethod { OFFSET, PAGE, - INDEX + INDEX, + HEADER_INDEX } @JsonIgnore diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java index 0da06a4c43..d5f277fe5f 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java @@ -34,6 +34,7 @@ import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod; +import 
org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator; import org.apache.drill.exec.store.http.paginator.IndexPaginator; import org.apache.drill.exec.store.http.paginator.OffsetPaginator; import org.apache.drill.exec.store.http.paginator.PagePaginator; @@ -92,7 +93,7 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> { private static class HttpReaderFactory implements ReaderFactory { private final HttpSubScan subScan; private final HttpPaginatorConfig paginatorConfig; - private Paginator paginator; + private final Paginator paginator; private int count; @@ -105,6 +106,8 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> { // Initialize the paginator and generate the base URLs this.paginator = getPaginator(); + } else { + this.paginator = null; } } @@ -119,8 +122,6 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> { rawUrl = HttpUrl.parse(subScan.tableSpec().connectionConfig().url()); } - - // If the URL is not parsable or otherwise invalid if (rawUrl == null) { throw UserException.validationError() @@ -130,28 +131,33 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> { urlBuilder = rawUrl.newBuilder(); - Paginator paginator = null; if (paginatorConfig.getMethodType() == PaginatorMethod.OFFSET) { - paginator = new OffsetPaginator(urlBuilder, + return new OffsetPaginator(urlBuilder, subScan.maxRecords(), paginatorConfig.pageSize(), paginatorConfig.limitParam(), paginatorConfig.offsetParam()); } else if (paginatorConfig.getMethodType() == PaginatorMethod.PAGE) { - paginator = new PagePaginator(urlBuilder, + return new PagePaginator(urlBuilder, subScan.maxRecords(), paginatorConfig.pageSize(), paginatorConfig.pageParam(), paginatorConfig.pageSizeParam()); } else if (paginatorConfig.getMethodType() == PaginatorMethod.INDEX) { - paginator = new IndexPaginator(urlBuilder, + return new IndexPaginator(urlBuilder, 0, // Page size not used for Index/Keyset pagination 
subScan.maxRecords(), paginatorConfig.hasMoreParam(), paginatorConfig.indexParam(), paginatorConfig.nextPageParam()); + } else if (paginatorConfig.getMethodType() == PaginatorMethod.HEADER_INDEX) { + return new HeaderIndexPaginator(urlBuilder, + subScan.maxRecords(), + paginatorConfig.pageSize(), + paginatorConfig.nextPageParam(), + subScan.tableSpec().connectionConfig().url()); } - return paginator; + return null; } @Override @@ -181,6 +187,7 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> { * the group scan such that the calls could be sent to different drillbits. */ if (!paginator.hasNext()) { + logger.debug("Ending Batch Generation."); return null; } diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java new file mode 100644 index 0000000000..8bf388fc11 --- /dev/null +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.http.paginator; + +import okhttp3.Headers; +import okhttp3.HttpUrl.Builder; +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * The Header Index Paginator is used when the API in question sends a link in the HTTP header + * containing the URL for the next page. + */ +public class HeaderIndexPaginator extends Paginator { + + private static final Logger logger = LoggerFactory.getLogger(HeaderIndexPaginator.class); + private static final Pattern URL_REGEX = Pattern.compile("(https?:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*))"); + + private final String nextPageParam; + private final String firstPageURL; + private Headers headers; + private boolean firstPage; + private int pageCount; + + public HeaderIndexPaginator(Builder builder, int pageSize, int limit, String nextPageParam, String firstPageURL) { + super(builder, PaginatorMethod.HEADER_INDEX, pageSize, limit); + this.nextPageParam = nextPageParam; + this.firstPageURL = firstPageURL; + this.firstPage = true; + this.pageCount = 0; + } + + @Override + public boolean hasNext() { + // If the headers are null and it isn't the first page, end pagination + if ( !firstPage && + (headers == null || StringUtils.isEmpty(headers.get(nextPageParam))) + ) { + notifyPartialPage(); + logger.debug("Ending pagination. No additional info in headers."); + return false; + } + + return !partialPageReceived; + } + + /** + * This method sets the headers for the Header Index Paginator. This must be called with updated headers + * before the {@link #next()} method is called. 
+ * @param headers A {@link Headers} object containing the response headers from the previous API call. + */ + public void setResponseHeaders(Headers headers) { + logger.debug("Setting response headers. "); + this.headers = headers; + + // If the next page URL is empty or otherwise undefined, halt pagination. + if (StringUtils.isEmpty(headers.get(nextPageParam))) { + notifyPartialPage(); + } + } + + @Override + public String next() { + pageCount++; + if (firstPage) { + firstPage = false; + return firstPageURL; + } + + if (headers == null) { + throw UserException.dataReadError() + .message("Headers are empty. HeaderIndex Pagination requires parameters that are passed in the HTTP header." + pageCount) + .build(logger); + } + // Now attempt to retrieve the field from the response headers. + String nextPage = headers.get(nextPageParam); + + // If the next page value is null or empty, halt pagination + if (StringUtils.isEmpty(nextPage)) { + super.notifyPartialPage(); + return null; + } + + logger.debug("Found next page URL: {}", nextPage); + + // Clean up any extraneous garbage from the header field. 
+ Matcher urlMatcher = URL_REGEX.matcher(nextPage); + if (urlMatcher.find()) { + return urlMatcher.group(1); + } + + return nextPage; + } +} diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java index b640d3a3dd..a7bd7e5598 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java @@ -28,7 +28,7 @@ import java.util.NoSuchElementException; public class PagePaginator extends Paginator { - private static final Logger logger = LoggerFactory.getLogger(OffsetPaginator.class); + private static final Logger logger = LoggerFactory.getLogger(PagePaginator.class); private final int limit; private final String pageParam; diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index 3568fe9213..6ba9918f6d 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -50,9 +50,11 @@ import org.apache.drill.exec.store.StoragePluginRegistry.PluginException; import org.apache.drill.exec.store.http.HttpApiConfig; import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod; import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation; +import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod; import org.apache.drill.exec.store.http.HttpStoragePlugin; import org.apache.drill.exec.store.http.HttpStoragePluginConfig; import org.apache.drill.exec.store.http.HttpSubScan; +import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator; import 
org.apache.drill.exec.store.http.paginator.Paginator; import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator; import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor; @@ -110,8 +112,6 @@ public class SimpleHttp implements AutoCloseable { .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS) .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS) .build(); - - private final OkHttpClient client; private final File tempDir; private final HttpProxyConfig proxyConfig; @@ -436,6 +436,11 @@ public class SimpleHttp implements AutoCloseable { logger.debug("HTTP Request for {} successful.", url()); logger.debug("Response Headers: {} ", response.headers()); + // In the case of Header Index Pagination, send the header(s) to the paginator + if (paginator != null && paginator.getMode() == PaginatorMethod.HEADER_INDEX) { + ((HeaderIndexPaginator)paginator).setResponseHeaders(response.headers()); + } + // Return the InputStream of the response. Note that it is necessary and // and sufficient that the caller invokes close() on the returned stream. 
return Objects.requireNonNull(response.body()).byteStream(); diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java index afa5fe3581..e5294a9e00 100644 --- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java +++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java @@ -164,6 +164,20 @@ public class TestPagination extends ClusterTest { headers.put("header1", "value1"); headers.put("header2", "value2"); + HttpPaginatorConfig headerIndexPaginator = HttpPaginatorConfig.builder() + .nextPageParam("link") + .pageSize(10) + .method("header_index") + .build(); + + HttpApiConfig mockJsonConfigWithHeaderIndex = HttpApiConfig.builder() + .url("http://localhost:8092/json") + .method("get") + .requireTail(false) + .paginator(headerIndexPaginator) + .inputType("json") + .build(); + HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder() .limitParam("limit") @@ -329,6 +343,7 @@ public class TestPagination extends ClusterTest { configs.put("json_tail", mockJsonConfigWithPaginatorAndTail); configs.put("xml_paginator", mockXmlConfigWithPaginator); configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams); + configs.put("customers", mockJsonConfigWithHeaderIndex); HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2,1000, null, null, "", 80, "", "", "", null, @@ -338,6 +353,28 @@ public class TestPagination extends ClusterTest { } + @Test + public void testPagePaginationWithHeaderIndex() throws Exception { + String sql = "SELECT col1, _response_url FROM `local`.`customers`"; + try (MockWebServer server = startServer()) { + server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1).setHeader("link", "http://localhost:8092/json?page=2")); + server.enqueue(new 
MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2).setHeader("link", "http://localhost:8092/json?page=3")); + server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3)); + + List<QueryDataBatch> results = client.queryBuilder() + .sql(sql) + .results(); + + int count = 0; + for(QueryDataBatch b : results){ + count += b.getHeader().getRowCount(); + b.release(); + } + + assertEquals(3, results.size()); + } + } + @Test @Ignore("Requires Live Connection to Github") public void testPagePaginationWithURLParameters() throws Exception {