This is an automated email from the ASF dual-hosted git repository.
progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 842ee554de Refinements to input-source specific table functions
(#13780)
842ee554de is described below
commit 842ee554de0556f190ed4bdc042d3ee6f54a210d
Author: Paul Rogers <[email protected]>
AuthorDate: Mon Feb 13 16:21:27 2023 -0800
Refinements to input-source specific table functions (#13780)
Refinements to table functions
Fixes various bugs
Improves the structure of the table function classes
Adds unit and integration tests
---
.../catalog/guice/CatalogCoordinatorModule.java | 11 +-
.../druid/testsEx/msq/ITMultiStageQuery.java | 80 +++++++--
.../catalog/model/table/BaseInputSourceDefn.java | 4 +
.../catalog/model/table/BaseTableFunction.java | 13 +-
.../model/table/FormattedInputSourceDefn.java | 11 +-
.../catalog/model/table/HttpInputSourceDefn.java | 18 ++-
.../catalog/model/table/LocalInputSourceDefn.java | 5 +-
.../model/table/HttpInputSourceDefnTest.java | 76 +++++++--
.../CatalogExternalTableOperatorConversion.java | 178 ---------------------
.../sql/calcite/external/DruidTableMacro.java | 101 ++++++++++++
.../external/DruidUserDefinedTableMacro.java | 70 ++++++++
.../DruidUserDefinedTableMacroConversion.java | 81 ++++++++++
.../druid/sql/calcite/external/ExtendOperator.java | 37 ++++-
.../external/ExternalOperatorConversion.java | 10 +-
.../calcite/external/HttpOperatorConversion.java | 2 +-
.../calcite/external/InlineOperatorConversion.java | 6 +-
.../calcite/external/LocalOperatorConversion.java | 6 +-
....java => SchemaAwareUserDefinedTableMacro.java} | 51 ++++--
.../calcite/planner/SqlParameterizerShuttle.java | 7 +-
.../druid/sql/calcite/CalciteInsertDmlTest.java | 45 ++++++
.../druid/sql/calcite/IngestTableFunctionTest.java | 24 +--
21 files changed, 570 insertions(+), 266 deletions(-)
diff --git
a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
index 649f3515dc..d80fb32462 100644
---
a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
+++
b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
@@ -36,8 +36,8 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.metadata.input.InputSourceModule;
-import java.util.Collections;
import java.util.List;
/**
@@ -82,12 +82,17 @@ public class CatalogCoordinatorModule implements DruidModule
// Public REST API and private cache sync API.
Jerseys.addResource(binder, CatalogResource.class);
+
+ // The HTTP input source requires a HttpInputSourceConfig instance
+ // which is defined by the InputSourceModule. Note that MSQ also includes
+ // this module, but MSQ is not included in the Coordinator.
+ binder.install(new InputSourceModule());
}
@Override
public List<? extends Module> getJacksonModules()
{
- return Collections.emptyList();
+ // We want this module to bring input sources along for the ride.
+ return new InputSourceModule().getJacksonModules();
}
}
-
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
index 94d6a479cf..8084b0e9bf 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
@@ -19,13 +19,10 @@
package org.apache.druid.testsEx.msq;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.sql.SqlTaskStatus;
-import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.categories.MultiStageQuery;
@@ -42,15 +39,6 @@ public class ITMultiStageQuery
@Inject
private MsqTestQueryHelper msqHelper;
- @Inject
- private SqlResourceTestClient msqClient;
-
- @Inject
- private IntegrationTestingConfig config;
-
- @Inject
- private ObjectMapper jsonMapper;
-
@Inject
private DataLoaderHelper dataLoaderHelper;
@@ -102,8 +90,72 @@ public class ITMultiStageQuery
+ "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
+ " )\n"
+ ")\n"
- + "PARTITIONED BY DAY\n"
- + "CLUSTERED BY \"__time\"",
+ + "PARTITIONED BY DAY\n",
+ datasource
+ );
+
+ // Submit the task and wait for the datasource to get loaded
+ SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(queryLocal);
+
+ if (sqlTaskStatus.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ sqlTaskStatus.getError()
+ ));
+ }
+
+ msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
+ dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+ msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+ }
+
+ @Test
+ public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
+ {
+ String datasource = "dst";
+
+ // Clear up the datasource from the previous runs
+ coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+ String queryLocal =
+ StringUtils.format(
+ "INSERT INTO %s\n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"timestamp\") AS __time,\n"
+ + " isRobot,\n"
+ + " diffUrl,\n"
+ + " added,\n"
+ + " countryIsoCode,\n"
+ + " regionName,\n"
+ + " channel,\n"
+ + " flags,\n"
+ + " delta,\n"
+ + " isUnpatrolled,\n"
+ + " isNew,\n"
+ + " deltaBucket,\n"
+ + " isMinor,\n"
+ + " isAnonymous,\n"
+ + " deleted,\n"
+ + " cityName,\n"
+ + " metroCode,\n"
+ + " namespace,\n"
+ + " comment,\n"
+ + " page,\n"
+ + " commentLength,\n"
+ + " countryName,\n"
+ + " user,\n"
+ + " regionIsoCode\n"
+ + "FROM TABLE(\n"
+ + " LOCALFILES(\n"
+ + " files =>
ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
+ + " format => 'json'\n"
+ + " ))\n"
+ + " (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR,
added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
+ + " channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled
VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
+ + " isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT,
cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
+ + " comment VARCHAR, page VARCHAR, commentLength BIGINT,
countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
+ + "PARTITIONED BY DAY\n",
datasource
);
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
index 30d3a05ada..9ec432b71c 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
@@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CollectionUtils;
import java.util.HashMap;
@@ -40,6 +41,8 @@ import java.util.Map;
*/
public abstract class BaseInputSourceDefn implements InputSourceDefn
{
+ private static final Logger LOG = new Logger(BaseInputSourceDefn.class);
+
/**
* The "from-scratch" table function for this input source. The parameters
* are those defined by the subclass, and the apply simply turns around and
@@ -238,6 +241,7 @@ public abstract class BaseInputSourceDefn implements
InputSourceDefn
return jsonMapper.convertValue(jsonMap, inputSourceClass());
}
catch (Exception e) {
+ LOG.debug(e, "Invalid input source specification");
throw new IAE(e, "Invalid input source specification");
}
}
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
index ff88b7726f..0f4c7315c7 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
@@ -61,6 +61,15 @@ public abstract class BaseTableFunction implements
TableFunction
{
return optional;
}
+
+ @Override
+ public String toString()
+ {
+ return "Parameter{name=" + name
+ + ", type=" + type
+ + ", optional=" + optional
+ + "}";
+ }
}
private final List<ParameterDefn> parameters;
@@ -76,11 +85,11 @@ public abstract class BaseTableFunction implements
TableFunction
return parameters;
}
- protected void requireSchema(String fnName, List<ColumnSpec> columns)
+ protected static void requireSchema(String fnName, List<ColumnSpec> columns)
{
if (columns == null) {
throw new IAE(
- "The %s table function requires an EXTEND clause with a schema",
+ "Function requires a schema: TABLE(%s(...)) (<col> <type>...)",
fnName
);
}
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
index c2c8fe0a23..7cd16ee7ca 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
@@ -95,7 +95,10 @@ public abstract class FormattedInputSourceDefn extends
BaseInputSourceDefn
)
{
final List<ParameterDefn> toAdd = new ArrayList<>();
- final ParameterDefn formatProp = new Parameter(FORMAT_PARAMETER,
ParameterType.VARCHAR, false);
+ // While the format parameter is required, we mark it as optional. Else
+ // if the source defines optional parameters, they will still be ignored
+ // as Calcite treats (optional, optional, required) as (required,
required, required)
+ final ParameterDefn formatProp = new Parameter(FORMAT_PARAMETER,
ParameterType.VARCHAR, true);
toAdd.add(formatProp);
final Map<String, ParameterDefn> formatProps = new HashMap<>();
for (InputFormatDefn format : formats.values()) {
@@ -122,7 +125,7 @@ public abstract class FormattedInputSourceDefn extends
BaseInputSourceDefn
{
final String formatTag = CatalogUtils.getString(table.inputFormatMap,
InputFormat.TYPE_PROPERTY);
if (formatTag == null) {
- throw new IAE("%s property must be set", InputFormat.TYPE_PROPERTY);
+ throw new IAE("[%s] property must be provided",
InputFormat.TYPE_PROPERTY);
}
final InputFormatDefn formatDefn = formats.get(formatTag);
if (formatDefn == null) {
@@ -140,12 +143,12 @@ public abstract class FormattedInputSourceDefn extends
BaseInputSourceDefn
{
final String formatTag = CatalogUtils.getString(args, FORMAT_PARAMETER);
if (formatTag == null) {
- throw new IAE("%s parameter must be set", FORMAT_PARAMETER);
+ throw new IAE("Must provide a value for the [%s] parameter",
FORMAT_PARAMETER);
}
final InputFormatDefn formatDefn = formats.get(formatTag);
if (formatDefn == null) {
throw new IAE(
- "Format type [%s] for property %s is not valid",
+ "Format type [%s] for property [%s] is not valid",
formatTag,
FORMAT_PARAMETER
);
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
index 08178a08cc..b4a3cf3425 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
@@ -104,9 +104,9 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
);
// Field names in the HttpInputSource
- private static final String URIS_FIELD = "uris";
- private static final String PASSWORD_FIELD = "httpAuthenticationPassword";
- private static final String USERNAME_FIELD = "httpAuthenticationUsername";
+ protected static final String URIS_FIELD = "uris";
+ protected static final String PASSWORD_FIELD = "httpAuthenticationPassword";
+ protected static final String USERNAME_FIELD = "httpAuthenticationUsername";
@Override
public String typeValue()
@@ -172,6 +172,18 @@ public class HttpInputSourceDefn extends
FormattedInputSourceDefn
super.validate(table);
}
+ @Override
+ protected void auditInputSource(Map<String, Object> jsonMap)
+ {
+ // A partial table may not include the URI parameter. For example, we might
+ // define an HTTP input source "with URIs to be named later." Even though
the
+ // input source is partial, we still want to validate the other parameters.
+ // The HttpInputSource will fail if the URIs are not set. So, we have to
make
+ // up a fake one just so we can validate the other fields by asking the
+ // input source to deserialize itself from the jsonMap.
+ jsonMap.putIfAbsent(URIS_PARAMETER, "http://bogus.com");
+ }
+
private Matcher templateMatcher(String uriTemplate)
{
Pattern p = Pattern.compile("\\{}");
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
index e011fd9e44..71ed700d52 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
@@ -181,10 +181,11 @@ public class LocalInputSourceDefn extends
FormattedInputSourceDefn
public TableFunction partialTableFn(ResolvedExternalTable table)
{
final Map<String, Object> sourceMap = new HashMap<>(table.inputSourceMap);
+ final boolean hasBaseDir =
!Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, BASE_DIR_FIELD));
final boolean hasFiles =
!CollectionUtils.isNullOrEmpty(CatalogUtils.safeGet(sourceMap, FILES_FIELD,
List.class));
final boolean hasFilter =
!Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, FILTER_FIELD));
List<ParameterDefn> params = new ArrayList<>();
- if (!hasFiles && !hasFilter) {
+ if (hasBaseDir && !hasFiles && !hasFilter) {
params.add(FILES_PARAM_DEFN);
params.add(FILTER_PARAM_DEFN);
}
@@ -219,8 +220,8 @@ public class LocalInputSourceDefn extends
FormattedInputSourceDefn
}
final boolean hasFilter =
!Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, FILTER_FIELD));
final List<String> filesParam = CatalogUtils.getStringArray(args,
FILES_PARAMETER);
- final String filterParam = CatalogUtils.getString(args,
FILTER_PARAMETER);
final boolean hasFilesParam = !CollectionUtils.isNullOrEmpty(filesParam);
+ final String filterParam = CatalogUtils.getString(args,
FILTER_PARAMETER);
final boolean hasFilterParam = !Strings.isNullOrEmpty(filterParam);
if (!hasFilter && !hasFilesParam && !hasFilterParam) {
throw new IAE(
diff --git
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 8485705b65..0be1f951bf 100644
---
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -199,7 +199,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
args.put(HttpInputSourceDefn.PASSWORD_PARAMETER, "secret");
args.put(FormattedInputSourceDefn.FORMAT_PARAMETER,
CsvFormatDefn.TYPE_KEY);
ExternalTableSpec externSpec = fn.apply("x", args, COLUMNS, mapper);
- validateHappyPath(externSpec);
+ validateHappyPath(externSpec, true);
// But, it fails if there are no columns.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(),
mapper));
@@ -231,7 +231,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
// Convert to an external spec
ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
ExternalTableSpec externSpec = externDefn.convert(resolved);
- validateHappyPath(externSpec);
+ validateHappyPath(externSpec, true);
// Get the partial table function
TableFunction fn = externDefn.tableFn(resolved);
@@ -239,23 +239,17 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
// Convert to an external table.
externSpec = fn.apply("x", Collections.emptyMap(),
Collections.emptyList(), mapper);
- validateHappyPath(externSpec);
+ validateHappyPath(externSpec, true);
// But, it fails columns are provided since the table already has them.
assertThrows(IAE.class, () -> fn.apply("x", Collections.emptyMap(),
COLUMNS, mapper));
}
@Test
- public void testTemplateSpecWithFormatHappyPath() throws URISyntaxException
+ public void testTemplateSpecWithFormatHappyPath()
{
- HttpInputSource inputSource = new HttpInputSource(
- Collections.singletonList(new URI("http://foo.com/my.csv")), // removed
- "bob",
- new DefaultPasswordProvider("secret"),
- new HttpInputSourceConfig(null)
- );
TableMetadata table = TableBuilder.external("foo")
- .inputSource(httpToMap(inputSource))
+ .inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY,
"http://foo.com/{}")
.column("x", Columns.VARCHAR)
@@ -264,8 +258,54 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
// Check validation
table.validate();
+ ResolvedTable resolved = registry.resolve(table.spec());
+ assertNotNull(resolved);
- // Check registry
+ // Not a full table, can't directly convert
+ // Convert to an external spec
+ ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
+ assertThrows(IAE.class, () -> externDefn.convert(resolved));
+
+ // Get the partial table function
+ TableFunction fn = externDefn.tableFn(resolved);
+ assertEquals(4, fn.parameters().size());
+ assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
+ assertTrue(hasParam(fn, HttpInputSourceDefn.USER_PARAMETER));
+ assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_PARAMETER));
+ assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_ENV_VAR_PARAMETER));
+
+ // Convert to an external table.
+ ExternalTableSpec externSpec = fn.apply(
+ "x",
+ ImmutableMap.of(
+ HttpInputSourceDefn.URIS_PARAMETER,
+ Collections.singletonList("my.csv")
+ ),
+ Collections.emptyList(),
+ mapper
+ );
+ validateHappyPath(externSpec, false);
+ }
+
+ @Test
+ public void testTemplateSpecWithFormatAndPassword()
+ {
+ TableMetadata table = TableBuilder.external("foo")
+ .inputSource(ImmutableMap.of(
+ "type", HttpInputSource.TYPE_KEY,
+ HttpInputSourceDefn.USERNAME_FIELD, "bob",
+ HttpInputSourceDefn.PASSWORD_FIELD, ImmutableMap.of(
+ "type", "default",
+ "password", "secret"
+ )
+ ))
+ .inputFormat(CSV_FORMAT)
+ .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY,
"http://foo.com/{}")
+ .column("x", Columns.VARCHAR)
+ .column("y", Columns.BIGINT)
+ .build();
+
+ table.validate();
ResolvedTable resolved = registry.resolve(table.spec());
assertNotNull(resolved);
@@ -289,7 +329,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
Collections.emptyList(),
mapper
);
- validateHappyPath(externSpec);
+ validateHappyPath(externSpec, true);
}
@Test
@@ -326,7 +366,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
args.put(HttpInputSourceDefn.URIS_PARAMETER,
Collections.singletonList("my.csv"));
args.put(FormattedInputSourceDefn.FORMAT_PARAMETER,
CsvFormatDefn.TYPE_KEY);
ExternalTableSpec externSpec = fn.apply("x", args, COLUMNS, mapper);
- validateHappyPath(externSpec);
+ validateHappyPath(externSpec, true);
}
@Test
@@ -455,11 +495,13 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
assertEquals("SECRET", ((EnvironmentVariablePasswordProvider)
sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable());
}
- private void validateHappyPath(ExternalTableSpec externSpec)
+ private void validateHappyPath(ExternalTableSpec externSpec, boolean
withUser)
{
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
- assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
- assertEquals("secret", ((DefaultPasswordProvider)
sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
+ if (withUser) {
+ assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
+ assertEquals("secret", ((DefaultPasswordProvider)
sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
+ }
assertEquals("http://foo.com/my.csv",
sourceSpec.getUris().get(0).toString());
// Just a sanity check: details of CSV conversion are tested elsewhere.
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/CatalogExternalTableOperatorConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/CatalogExternalTableOperatorConversion.java
deleted file mode 100644
index d3d0f02047..0000000000
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/CatalogExternalTableOperatorConversion.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.calcite.external;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.FunctionParameter;
-import org.apache.calcite.schema.TranslatableTable;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
-import org.apache.druid.catalog.model.TableDefnRegistry;
-import org.apache.druid.catalog.model.table.ExternalTableSpec;
-import org.apache.druid.catalog.model.table.InputSourceDefn;
-import org.apache.druid.catalog.model.table.TableFunction;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
-import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
-import
org.apache.druid.sql.calcite.external.UserDefinedTableMacroFunction.ExtendedTableMacro;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Base class for input-source-specific table functions with arguments derived
from
- * a catalog external table definition. Such functions work in conjunction
with the
- * EXTERN key word to provide a schema. Example of the HTTP form:
- * <code><pre>
- * INSERT INTO myTable SELECT ...
- * FROM TABLE(http(
- * userName => 'bob',
- * password => 'secret',
- * uris => ARRAY['http:foo.com/bar.csv'],
- * format => 'csv'))
- * EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
- * PARTITIONED BY ...
- * </pre></code>
- */
-public abstract class CatalogExternalTableOperatorConversion implements
SqlOperatorConversion
-{
- private final SqlUserDefinedTableMacro operator;
-
- public CatalogExternalTableOperatorConversion(
- final String name,
- final TableDefnRegistry registry,
- final String tableType,
- final ObjectMapper jsonMapper
- )
- {
- this(
- name,
- ((InputSourceDefn)
registry.inputSourceDefnFor(tableType)).adHocTableFn(),
- jsonMapper
- );
- }
-
- public CatalogExternalTableOperatorConversion(
- final String name,
- final TableFunction fn,
- final ObjectMapper jsonMapper
- )
- {
- this.operator = new CatalogExternalTableOperator(
- new CatalogTableMacro(name, fn, jsonMapper)
- );
- }
-
- @Override
- public SqlOperator calciteOperator()
- {
- return operator;
- }
-
- @Nullable
- @Override
- public DruidExpression toDruidExpression(PlannerContext plannerContext,
RowSignature rowSignature, RexNode rexNode)
- {
- return null;
- }
-
- public static class CatalogExternalTableOperator extends
UserDefinedTableMacroFunction implements AuthorizableOperator
- {
- public CatalogExternalTableOperator(final CatalogTableMacro macro)
- {
- super(
- new SqlIdentifier(macro.name, SqlParserPos.ZERO),
- ReturnTypes.CURSOR,
- null,
- // Use our own definition of variadic since Calcite's doesn't allow
- // optional parameters.
- Externals.variadic(macro.parameters),
- Externals.dataTypes(macro.parameters),
- macro
- );
- }
-
- @Override
- public Set<ResourceAction> computeResources(final SqlCall call)
- {
- return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
- }
- }
-
- public static class CatalogTableMacro implements ExtendedTableMacro
- {
- private final String name;
- private final List<FunctionParameter> parameters;
- private final TableFunction fn;
- private final ObjectMapper jsonMapper;
-
- public CatalogTableMacro(
- final String name,
- final TableFunction fn,
- final ObjectMapper jsonMapper
- )
- {
- this.name = name;
- this.jsonMapper = jsonMapper;
- this.fn = fn;
- this.parameters = Externals.convertParameters(fn);
- }
-
- /**
- * Called when the function is used without an {@code EXTEND} clause.
- * {@code EXTERN} allows this, most others do not.
- */
- @Override
- public TranslatableTable apply(final List<Object> arguments)
- {
- return apply(arguments, null);
- }
-
- @Override
- public TranslatableTable apply(List<Object> arguments, SqlNodeList schema)
- {
- final ExternalTableSpec externSpec = fn.apply(
- name,
- Externals.convertArguments(fn, arguments),
- schema == null ? null : Externals.convertColumns(schema),
- jsonMapper
- );
- return Externals.buildExternalTable(externSpec, jsonMapper);
- }
-
- @Override
- public List<FunctionParameter> getParameters()
- {
- return parameters;
- }
- }
-}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
new file mode 100644
index 0000000000..5b4069f1c0
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.table.ExternalTableDefn;
+import org.apache.druid.catalog.model.table.ExternalTableSpec;
+import org.apache.druid.catalog.model.table.TableFunction;
+import
org.apache.druid.sql.calcite.external.SchemaAwareUserDefinedTableMacro.ExtendedTableMacro;
+
+import java.util.List;
+
+/**
+ * Table macro which wraps a catalog table function and which accepts
+ * a schema from an EXTEND clause. This macro is wrapped by the
+ * {@link DruidUserDefinedTableMacro} operator that itself extends
+ * {@link SchemaAwareUserDefinedTableMacro} which interfaces with the
+ * extend operator to pass the schema via a "back channel." The plumbing
+ * is complex because we're adding functionality a bit outside the SQL
+ * standard, and we have to fit our logic into the Calcite stack.
+ */
+public class DruidTableMacro implements ExtendedTableMacro
+{
+ protected final String name;
+ final List<FunctionParameter> parameters;
+ private final TableFunction fn;
+ private final ObjectMapper jsonMapper;
+
+ public DruidTableMacro(
+ final String name,
+ final TableFunction fn,
+ final ObjectMapper jsonMapper
+ )
+ {
+ this.name = name;
+ this.jsonMapper = jsonMapper;
+ this.fn = fn;
+ this.parameters = Externals.convertParameters(fn);
+ }
+
+ public DruidTableMacro(
+ final String tableName,
+ final ResolvedTable externalTable
+ )
+ {
+ this.name = tableName;
+ ExternalTableDefn tableDefn = (ExternalTableDefn) externalTable.defn();
+ this.fn = tableDefn.tableFn(externalTable);
+ this.parameters = Externals.convertParameters(fn);
+ this.jsonMapper = externalTable.jsonMapper();
+ }
+
+ /**
+ * Called when the function is used without an {@code EXTEND} clause.
+ * {@code EXTERN} allows this, most others do not.
+ */
+ @Override
+ public TranslatableTable apply(final List<Object> arguments)
+ {
+ return apply(arguments, null);
+ }
+
+ @Override
+ public TranslatableTable apply(List<Object> arguments, SqlNodeList schema)
+ {
+ final ExternalTableSpec externSpec = fn.apply(
+ name,
+ Externals.convertArguments(fn, arguments),
+ schema == null ? null : Externals.convertColumns(schema),
+ jsonMapper
+ );
+ return Externals.buildExternalTable(externSpec, jsonMapper);
+ }
+
+ @Override
+ public List<FunctionParameter> getParameters()
+ {
+ return parameters;
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
new file mode 100644
index 0000000000..09f495fa19
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Base class for input-source-specific table functions with arguments derived from
+ * a catalog external table definition. Such functions work in conjunction with the
+ * EXTEND key word to provide a schema. Example of the HTTP form:
+ * <code><pre>
+ * INSERT INTO myTable SELECT ...
+ * FROM TABLE(http(
+ * userName => 'bob',
+ * password => 'secret',
+ * uris => ARRAY['http:foo.com/bar.csv'],
+ * format => 'csv'))
+ * EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
+ * PARTITIONED BY ...
+ * </pre></code>
+ * <p>
+ * This version requires {@code Externals.EXTERNAL_RESOURCE_ACTION} to authorize each call.
+ */
+public class DruidUserDefinedTableMacro extends
SchemaAwareUserDefinedTableMacro implements AuthorizableOperator
+{
+ public DruidUserDefinedTableMacro(final DruidTableMacro macro)
+ {
+ super(
+ new SqlIdentifier(macro.name, SqlParserPos.ZERO),
+ ReturnTypes.CURSOR,
+ null,
+ // Use our own definition of variadic since Calcite's doesn't allow
+ // optional parameters.
+ Externals.variadic(macro.parameters),
+ Externals.dataTypes(macro.parameters),
+ macro
+ );
+ }
+
+ @Override
+ public Set<ResourceAction> computeResources(final SqlCall call)
+ {
+ return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacroConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacroConversion.java
new file mode 100644
index 0000000000..e532b871a1
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacroConversion.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.table.InputSourceDefn;
+import org.apache.druid.catalog.model.table.TableFunction;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Operator conversion for user-defined table macros (functions) based on the
+ * {@link TableFunction} abstraction defined by the catalog.
+ */
+public abstract class DruidUserDefinedTableMacroConversion implements
SqlOperatorConversion
+{
+ private final SqlUserDefinedTableMacro operator;
+
+ public DruidUserDefinedTableMacroConversion(
+ final String name,
+ final TableDefnRegistry registry,
+ final String tableType,
+ final ObjectMapper jsonMapper
+ )
+ {
+ this(
+ name,
+ ((InputSourceDefn)
registry.inputSourceDefnFor(tableType)).adHocTableFn(),
+ jsonMapper
+ );
+ }
+
+ public DruidUserDefinedTableMacroConversion(
+ final String name,
+ final TableFunction fn,
+ final ObjectMapper jsonMapper
+ )
+ {
+ this.operator = new DruidUserDefinedTableMacro(
+ new DruidTableMacro(name, fn, jsonMapper)
+ );
+ }
+
+ @Override
+ public SqlOperator calciteOperator()
+ {
+ return operator;
+ }
+
+ @Nullable
+ @Override
+ public DruidExpression toDruidExpression(PlannerContext plannerContext,
RowSignature rowSignature, RexNode rexNode)
+ {
+ return null;
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java
index 14f4d7f1b9..c2162491e5 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.fun.SqlCollectionTableOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
/**
@@ -35,7 +36,7 @@ import org.apache.druid.java.util.common.ISE;
* is said to have been added for Apache Phoenix, and which we repurpose to
* supply a schema for an ingest input table.
*
- * @see {@link UserDefinedTableMacroFunction} for details
+ * @see {@link SchemaAwareUserDefinedTableMacro} for details
*/
public class ExtendOperator extends SqlInternalOperator
{
@@ -52,23 +53,47 @@ public class ExtendOperator extends SqlInternalOperator
* squirreled away in an ad-hoc instance of the macro. We must do it
* this way because we can't change Calcite to define a new node type
* that holds onto the schema.
+ * <p>
+ * The node structure is:<pre>{@code
+ * EXTEND(
+ * TABLE(
+ * <table fn>(
+ * <table macro>
+ * )
+ * ),
+ * <schema>
+ * )}</pre>
+ * <p>
+ * Note that the table macro is not an operand: it is an implicit
+ * member of the table macro function operator.
*/
@Override
public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
{
SqlBasicCall tableOpCall = (SqlBasicCall) call.operand(0);
if (!(tableOpCall.getOperator() instanceof SqlCollectionTableOperator)) {
- throw new ISE("First argument to EXTEND must be a table function");
+ throw new ISE("First argument to EXTEND must be TABLE");
}
+
+ // The table function must be a Druid-defined table macro function
+ // which is aware of the EXTEND schema.
SqlBasicCall tableFnCall = (SqlBasicCall) tableOpCall.operand(0);
- if (!(tableFnCall.getOperator() instanceof UserDefinedTableMacroFunction))
{
+ if (!(tableFnCall.getOperator() instanceof
SchemaAwareUserDefinedTableMacro)) {
// May be an unresolved function.
- return call;
+ throw new IAE(
+ "Function %s does not accept an EXTEND clause (or a schema list)",
+ tableFnCall.getOperator().getName()
+ );
}
- UserDefinedTableMacroFunction macro = (UserDefinedTableMacroFunction)
tableFnCall.getOperator();
+ // Move the schema from the second operand of EXTEND into a member
+ // field of a shim table macro function.
+ SchemaAwareUserDefinedTableMacro tableFn =
(SchemaAwareUserDefinedTableMacro) tableFnCall.getOperator();
SqlNodeList schema = (SqlNodeList) call.operand(1);
- SqlCall newCall = macro.rewriteCall(tableFnCall, schema);
+ SqlCall newCall = tableFn.rewriteCall(tableFnCall, schema);
+
+ // Create a new TABLE(table_fn) node to replace the EXTEND node. After this,
+ // the table macro function acts just like a standard Calcite version.
return
SqlStdOperatorTable.COLLECTION_TABLE.createCall(call.getParserPosition(),
newCall);
}
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
index 6ef3aa1f5e..1d5a5c4fef 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
@@ -54,7 +54,7 @@ import java.util.Map;
* a Druid JSON signature, or an SQL {@code EXTEND} list of columns.
* As with all table functions, the {@code EXTEND} is optional.
*/
-public class ExternalOperatorConversion extends
CatalogExternalTableOperatorConversion
+public class ExternalOperatorConversion extends
DruidUserDefinedTableMacroConversion
{
public static final String FUNCTION_NAME = "EXTERN";
@@ -72,12 +72,12 @@ public class ExternalOperatorConversion extends
CatalogExternalTableOperatorConv
public ExternFunction()
{
super(Arrays.asList(
- new Parameter(INPUT_SOURCE_PARAM, ParameterType.VARCHAR, true),
- new Parameter(INPUT_FORMAT_PARAM, ParameterType.VARCHAR, true),
+ new Parameter(INPUT_SOURCE_PARAM, ParameterType.VARCHAR, false),
+ new Parameter(INPUT_FORMAT_PARAM, ParameterType.VARCHAR, false),
- // Not required: the user can either provide the signature OR
+ // Optional: the user can either provide the signature OR
// an EXTEND clause. Checked in the implementation.
- new Parameter(SIGNATURE_PARAM, ParameterType.VARCHAR, false)
+ new Parameter(SIGNATURE_PARAM, ParameterType.VARCHAR, true)
));
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
index 35a24fd390..6b49bde969 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
@@ -23,7 +23,7 @@ import com.google.inject.Inject;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.HttpInputSourceDefn;
-public class HttpOperatorConversion extends
CatalogExternalTableOperatorConversion
+public class HttpOperatorConversion extends
DruidUserDefinedTableMacroConversion
{
public static final String FUNCTION_NAME = "http";
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
index ee58db2fd0..c684428a9c 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
@@ -23,14 +23,12 @@ import com.google.inject.Inject;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.InlineInputSourceDefn;
-public class InlineOperatorConversion extends
CatalogExternalTableOperatorConversion
+public class InlineOperatorConversion extends
DruidUserDefinedTableMacroConversion
{
public static final String FUNCTION_NAME = "inline";
@Inject
- public InlineOperatorConversion(
- final TableDefnRegistry registry
- )
+ public InlineOperatorConversion(final TableDefnRegistry registry)
{
super(FUNCTION_NAME, registry, InlineInputSourceDefn.TYPE_KEY,
registry.jsonMapper());
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
index 676216e9ae..50c0dca927 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
@@ -23,16 +23,14 @@ import com.google.inject.Inject;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.LocalInputSourceDefn;
-public class LocalOperatorConversion extends
CatalogExternalTableOperatorConversion
+public class LocalOperatorConversion extends
DruidUserDefinedTableMacroConversion
{
// Cannot use "local" because it is a SQL keyword and the user would
// be required to quote the name.
public static final String FUNCTION_NAME = "localfiles";
@Inject
- public LocalOperatorConversion(
- final TableDefnRegistry registry
- )
+ public LocalOperatorConversion(final TableDefnRegistry registry)
{
super(FUNCTION_NAME, registry, LocalInputSourceDefn.TYPE_KEY,
registry.jsonMapper());
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/UserDefinedTableMacroFunction.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
similarity index 81%
rename from
sql/src/main/java/org/apache/druid/sql/calcite/external/UserDefinedTableMacroFunction.java
rename to
sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
index 82855336ba..855b9ceebf 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/UserDefinedTableMacroFunction.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.external;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.schema.TableMacro;
import org.apache.calcite.schema.TranslatableTable;
@@ -36,6 +37,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlOperandTypeInference;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.druid.java.util.common.UOE;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
@@ -63,14 +65,12 @@ import java.util.Set;
* SELECT ..
* FROM myTable EXTEND (x VARCHAR, ...)
* </pre></code>
- * Though, oddly, a search of Apache Phoenix itself does not find a hit for
- * EXTEND, so perhaps the feature was never completed?
* <p>
* For Druid, we want the above form: extend a table function, not a
* literal table. Since we can't change the Calcite parser, we instead use
* tricks within the constraints of the parser.
* <ul>
- * <li>First, use use a Python script to modify the parser to add the
+ * <li>The Calcite parser is revised to add the
* EXTEND rule for a table function.</li>
* <li>Calcite expects the EXTEND operator to have two arguments: an identifier
* and the column list. Since our case has a function call as the first
argument,
@@ -99,9 +99,10 @@ import java.util.Set;
* </pre></code>
* Since we seldom use unparse, we can perhaps live with this limitation for
now.
*/
-public abstract class UserDefinedTableMacroFunction extends
BaseUserDefinedTableMacro implements AuthorizableOperator
+public abstract class SchemaAwareUserDefinedTableMacro
+ extends BaseUserDefinedTableMacro implements AuthorizableOperator
{
- public UserDefinedTableMacroFunction(
+ public SchemaAwareUserDefinedTableMacro(
SqlIdentifier opName,
SqlReturnTypeInference returnTypeInference,
SqlOperandTypeInference operandTypeInference,
@@ -119,15 +120,20 @@ public abstract class UserDefinedTableMacroFunction
extends BaseUserDefinedTable
*/
public SqlBasicCall rewriteCall(SqlBasicCall oldCall, SqlNodeList schema)
{
- return new ExtendedCall(oldCall, new ShimTableMacroFunction(this, schema));
+ return new ExtendedCall(oldCall, new ShimUserDefinedTableMacro(this,
schema));
}
- private static class ShimTableMacroFunction extends
BaseUserDefinedTableMacro implements AuthorizableOperator
+ // Note the confusing use of "table macro". A TableMacro is a non-SqlNode that does the
+ // actual translation to a table. A *UserDefinedTableMacro is a function that wraps
+ // a table macro. The result is that "macro" by itself is ambiguous: it can be the
+ // implementation (TableMacro) or the function that wraps the implementation.
+ private static class ShimUserDefinedTableMacro extends
BaseUserDefinedTableMacro implements AuthorizableOperator
{
- protected final UserDefinedTableMacroFunction base;
+ protected final SchemaAwareUserDefinedTableMacro base;
protected final SqlNodeList schema;
+ private TranslatableTable table;
- public ShimTableMacroFunction(final UserDefinedTableMacroFunction base,
final SqlNodeList schema)
+ public ShimUserDefinedTableMacro(final SchemaAwareUserDefinedTableMacro
base, final SqlNodeList schema)
{
super(
base.getNameAsId(),
@@ -141,6 +147,21 @@ public abstract class UserDefinedTableMacroFunction
extends BaseUserDefinedTable
this.schema = schema;
}
+ @Override
+ public TranslatableTable getTable(
+ RelDataTypeFactory typeFactory,
+ List<SqlNode> operandList
+ )
+ {
+ if (table == null) {
+ // Cache the table to avoid multiple conversions.
+ // This is possible because each call has a distinct instance
+ // of this operator.
+ table = super.getTable(typeFactory, operandList);
+ }
+ return table;
+ }
+
@Override
public Set<ResourceAction> computeResources(final SqlCall call)
{
@@ -155,7 +176,7 @@ public abstract class UserDefinedTableMacroFunction extends
BaseUserDefinedTable
{
private final SqlNodeList schema;
- public ExtendedCall(SqlBasicCall oldCall, ShimTableMacroFunction macro)
+ public ExtendedCall(SqlBasicCall oldCall, ShimUserDefinedTableMacro macro)
{
super(
macro,
@@ -208,6 +229,16 @@ public abstract class UserDefinedTableMacroFunction
extends BaseUserDefinedTable
schema.unparse(writer, leftPrec, rightPrec);
writer.endList(frame);
}
+
+ /**
+ * Required by GHA CodeQL even though Calcite doesn't use this
+ * particular method.
+ */
+ @Override
+ public Object clone()
+ {
+ throw new UOE("Not supported");
+ }
}
public interface ExtendedTableMacro extends TableMacro
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
index 98363d9eb2..a0c9fcd728 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
@@ -114,7 +114,12 @@ public class SqlParameterizerShuttle extends SqlShuttle
*/
private SqlNode createArrayLiteral(Object value)
{
- List<?> list = Arrays.asList((Object[]) value);
+ List<?> list;
+ if (value instanceof List) {
+ list = (List<?>) value;
+ } else {
+ list = Arrays.asList((Object[]) value);
+ }
List<SqlNode> args = new ArrayList<>(list.size());
for (Object element : list) {
if (element == null) {
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 2a92a182d3..93e519091f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -44,6 +44,7 @@ import
org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.Externals;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.IngestHandler;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -306,6 +307,50 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
.verify();
}
+ @Test
+ public void testInsertFromExternalWithSchema()
+ {
+ String extern;
+ ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+ try {
+ extern = StringUtils.format(
+ "TABLE(extern(%s, %s))",
+ Calcites.escapeStringLiteral(
+ queryJsonMapper.writeValueAsString(
+ new InlineInputSource("a,b,1\nc,d,2\n")
+ )
+ ),
+ Calcites.escapeStringLiteral(
+ queryJsonMapper.writeValueAsString(
+ new CsvInputFormat(ImmutableList.of("x", "y", "z"), null,
false, false, 0)
+ )
+ )
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ testIngestionQuery()
+ .sql("INSERT INTO dst SELECT * FROM %s\n" +
+ " (x VARCHAR, y VARCHAR, z BIGINT)\n" +
+ "PARTITIONED BY ALL TIME",
+ extern
+ )
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("dst", externalDataSource.getSignature())
+ .expectResources(dataSourceWrite("dst"),
Externals.EXTERNAL_RESOURCE_ACTION)
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("x", "y", "z")
+ .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .expectLogicalPlanFrom("insertFromExternal")
+ .verify();
+ }
+
@Test
public void testInsertWithPartitionedBy()
{
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
index ed2349af8e..f23a135359 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
@@ -101,7 +101,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("httpExtern")
@@ -143,7 +143,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("httpExtern")
@@ -173,7 +173,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("httpExtern")
@@ -200,7 +200,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("httpExtern")
@@ -223,7 +223,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("insertFromExternal")
@@ -291,7 +291,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("insertFromExternal")
@@ -319,7 +319,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("insertFromExternal")
@@ -356,7 +356,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
@@ -384,7 +384,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
@@ -412,7 +412,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
@@ -442,7 +442,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
@@ -472,7 +472,7 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
-
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]