This is an automated email from the ASF dual-hosted git repository.

progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 842ee554de Refinements to input-source specific table functions 
(#13780)
842ee554de is described below

commit 842ee554de0556f190ed4bdc042d3ee6f54a210d
Author: Paul Rogers <[email protected]>
AuthorDate: Mon Feb 13 16:21:27 2023 -0800

    Refinements to input-source specific table functions (#13780)
    
    Refinements to table functions
    
    Fixes various bugs
    Improves the structure of the table function classes
    Adds unit and integration tests
---
 .../catalog/guice/CatalogCoordinatorModule.java    |  11 +-
 .../druid/testsEx/msq/ITMultiStageQuery.java       |  80 +++++++--
 .../catalog/model/table/BaseInputSourceDefn.java   |   4 +
 .../catalog/model/table/BaseTableFunction.java     |  13 +-
 .../model/table/FormattedInputSourceDefn.java      |  11 +-
 .../catalog/model/table/HttpInputSourceDefn.java   |  18 ++-
 .../catalog/model/table/LocalInputSourceDefn.java  |   5 +-
 .../model/table/HttpInputSourceDefnTest.java       |  76 +++++++--
 .../CatalogExternalTableOperatorConversion.java    | 178 ---------------------
 .../sql/calcite/external/DruidTableMacro.java      | 101 ++++++++++++
 .../external/DruidUserDefinedTableMacro.java       |  70 ++++++++
 .../DruidUserDefinedTableMacroConversion.java      |  81 ++++++++++
 .../druid/sql/calcite/external/ExtendOperator.java |  37 ++++-
 .../external/ExternalOperatorConversion.java       |  10 +-
 .../calcite/external/HttpOperatorConversion.java   |   2 +-
 .../calcite/external/InlineOperatorConversion.java |   6 +-
 .../calcite/external/LocalOperatorConversion.java  |   6 +-
 ....java => SchemaAwareUserDefinedTableMacro.java} |  51 ++++--
 .../calcite/planner/SqlParameterizerShuttle.java   |   7 +-
 .../druid/sql/calcite/CalciteInsertDmlTest.java    |  45 ++++++
 .../druid/sql/calcite/IngestTableFunctionTest.java |  24 +--
 21 files changed, 570 insertions(+), 266 deletions(-)

diff --git 
a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
 
b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
index 649f3515dc..d80fb32462 100644
--- 
a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
+++ 
b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogCoordinatorModule.java
@@ -36,8 +36,8 @@ import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.LoadScope;
 import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.metadata.input.InputSourceModule;
 
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -82,12 +82,17 @@ public class CatalogCoordinatorModule implements DruidModule
 
     // Public REST API and private cache sync API.
     Jerseys.addResource(binder, CatalogResource.class);
+
+    // The HTTP input source requires a HttpInputSourceConfig instance
+    // which is defined by the InputSourceModule. Note that MSQ also includes
+    // this module, but MSQ is not included in the Coordinator.
+    binder.install(new InputSourceModule());
   }
 
   @Override
   public List<? extends Module> getJacksonModules()
   {
-    return Collections.emptyList();
+    // We want this module to bring input sources along for the ride.
+    return new InputSourceModule().getJacksonModules();
   }
 }
-
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
index 94d6a479cf..8084b0e9bf 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
@@ -19,13 +19,10 @@
 
 package org.apache.druid.testsEx.msq;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.msq.sql.SqlTaskStatus;
-import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.clients.SqlResourceTestClient;
 import org.apache.druid.testing.utils.DataLoaderHelper;
 import org.apache.druid.testing.utils.MsqTestQueryHelper;
 import org.apache.druid.testsEx.categories.MultiStageQuery;
@@ -42,15 +39,6 @@ public class ITMultiStageQuery
   @Inject
   private MsqTestQueryHelper msqHelper;
 
-  @Inject
-  private SqlResourceTestClient msqClient;
-
-  @Inject
-  private IntegrationTestingConfig config;
-
-  @Inject
-  private ObjectMapper jsonMapper;
-
   @Inject
   private DataLoaderHelper dataLoaderHelper;
 
@@ -102,8 +90,72 @@ public class ITMultiStageQuery
             + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
             + "  )\n"
             + ")\n"
-            + "PARTITIONED BY DAY\n"
-            + "CLUSTERED BY \"__time\"",
+            + "PARTITIONED BY DAY\n",
+            datasource
+        );
+
+    // Submit the task and wait for the datasource to get loaded
+    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(queryLocal);
+
+    if (sqlTaskStatus.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          sqlTaskStatus.getError()
+      ));
+    }
+
+    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+  }
+
+  @Test
+  public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
+  {
+    String datasource = "dst";
+
+    // Clear up the datasource from the previous runs
+    coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+    String queryLocal =
+        StringUtils.format(
+            "INSERT INTO %s\n"
+            + "SELECT\n"
+            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+            + "  isRobot,\n"
+            + "  diffUrl,\n"
+            + "  added,\n"
+            + "  countryIsoCode,\n"
+            + "  regionName,\n"
+            + "  channel,\n"
+            + "  flags,\n"
+            + "  delta,\n"
+            + "  isUnpatrolled,\n"
+            + "  isNew,\n"
+            + "  deltaBucket,\n"
+            + "  isMinor,\n"
+            + "  isAnonymous,\n"
+            + "  deleted,\n"
+            + "  cityName,\n"
+            + "  metroCode,\n"
+            + "  namespace,\n"
+            + "  comment,\n"
+            + "  page,\n"
+            + "  commentLength,\n"
+            + "  countryName,\n"
+            + "  user,\n"
+            + "  regionIsoCode\n"
+            + "FROM TABLE(\n"
+            + "  LOCALFILES(\n"
+            + "    files => 
ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
+            + "    format => 'json'\n"
+            + "  ))\n"
+            + "  (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR, 
added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
+            + "   channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled 
VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
+            + "   isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT, 
cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
+            + "   comment VARCHAR, page VARCHAR, commentLength BIGINT, 
countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
+            + "PARTITIONED BY DAY\n",
             datasource
         );
 
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
index 30d3a05ada..9ec432b71c 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
@@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.TableDefnRegistry;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.utils.CollectionUtils;
 
 import java.util.HashMap;
@@ -40,6 +41,8 @@ import java.util.Map;
  */
 public abstract class BaseInputSourceDefn implements InputSourceDefn
 {
+  private static final Logger LOG = new Logger(BaseInputSourceDefn.class);
+
   /**
    * The "from-scratch" table function for this input source. The parameters
    * are those defined by the subclass, and the apply simply turns around and
@@ -238,6 +241,7 @@ public abstract class BaseInputSourceDefn implements 
InputSourceDefn
       return jsonMapper.convertValue(jsonMap, inputSourceClass());
     }
     catch (Exception e) {
+      LOG.debug(e, "Invalid input source specification");
       throw new IAE(e, "Invalid input source specification");
     }
   }
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
index ff88b7726f..0f4c7315c7 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseTableFunction.java
@@ -61,6 +61,15 @@ public abstract class BaseTableFunction implements 
TableFunction
     {
       return optional;
     }
+
+    @Override
+    public String toString()
+    {
+      return "Parameter{name=" + name
+          + ", type=" + type
+          + ", optional=" + optional
+          + "}";
+    }
   }
 
   private final List<ParameterDefn> parameters;
@@ -76,11 +85,11 @@ public abstract class BaseTableFunction implements 
TableFunction
     return parameters;
   }
 
-  protected void requireSchema(String fnName, List<ColumnSpec> columns)
+  protected static void requireSchema(String fnName, List<ColumnSpec> columns)
   {
     if (columns == null) {
       throw new IAE(
-          "The %s table function requires an EXTEND clause with a schema",
+          "Function requires a schema: TABLE(%s(...)) (<col> <type>...)",
           fnName
       );
     }
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
index c2c8fe0a23..7cd16ee7ca 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
@@ -95,7 +95,10 @@ public abstract class FormattedInputSourceDefn extends 
BaseInputSourceDefn
   )
   {
     final List<ParameterDefn> toAdd = new ArrayList<>();
-    final ParameterDefn formatProp = new Parameter(FORMAT_PARAMETER, 
ParameterType.VARCHAR, false);
+    // While the format parameter is required, we mark it as optional. Else
+    // if the source defines optional parameters, they will still be ignored
+    // as Calcite treats (optional, optional, required) as (required, 
required, required)
+    final ParameterDefn formatProp = new Parameter(FORMAT_PARAMETER, 
ParameterType.VARCHAR, true);
     toAdd.add(formatProp);
     final Map<String, ParameterDefn> formatProps = new HashMap<>();
     for (InputFormatDefn format : formats.values()) {
@@ -122,7 +125,7 @@ public abstract class FormattedInputSourceDefn extends 
BaseInputSourceDefn
   {
     final String formatTag = CatalogUtils.getString(table.inputFormatMap, 
InputFormat.TYPE_PROPERTY);
     if (formatTag == null) {
-      throw new IAE("%s property must be set", InputFormat.TYPE_PROPERTY);
+      throw new IAE("[%s] property must be provided", 
InputFormat.TYPE_PROPERTY);
     }
     final InputFormatDefn formatDefn = formats.get(formatTag);
     if (formatDefn == null) {
@@ -140,12 +143,12 @@ public abstract class FormattedInputSourceDefn extends 
BaseInputSourceDefn
   {
     final String formatTag = CatalogUtils.getString(args, FORMAT_PARAMETER);
     if (formatTag == null) {
-      throw new IAE("%s parameter must be set", FORMAT_PARAMETER);
+      throw new IAE("Must provide a value for the [%s] parameter", 
FORMAT_PARAMETER);
     }
     final InputFormatDefn formatDefn = formats.get(formatTag);
     if (formatDefn == null) {
       throw new IAE(
-          "Format type [%s] for property %s is not valid",
+          "Format type [%s] for property [%s] is not valid",
           formatTag,
           FORMAT_PARAMETER
       );
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
index 08178a08cc..b4a3cf3425 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/HttpInputSourceDefn.java
@@ -104,9 +104,9 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
   );
 
   // Field names in the HttpInputSource
-  private static final String URIS_FIELD = "uris";
-  private static final String PASSWORD_FIELD = "httpAuthenticationPassword";
-  private static final String USERNAME_FIELD = "httpAuthenticationUsername";
+  protected static final String URIS_FIELD = "uris";
+  protected static final String PASSWORD_FIELD = "httpAuthenticationPassword";
+  protected static final String USERNAME_FIELD = "httpAuthenticationUsername";
 
   @Override
   public String typeValue()
@@ -172,6 +172,18 @@ public class HttpInputSourceDefn extends 
FormattedInputSourceDefn
     super.validate(table);
   }
 
+  @Override
+  protected void auditInputSource(Map<String, Object> jsonMap)
+  {
+    // A partial table may not include the URI parameter. For example, we might
+    // define an HTTP input source "with URIs to be named later." Even though 
the
+    // input source is partial, we still want to validate the other parameters.
+    // The HttpInputSource will fail if the URIs are not set. So, we have to 
make
+    // up a fake one just so we can validate the other fields by asking the
+    // input source to deserialize itself from the jsonMap.
+    jsonMap.putIfAbsent(URIS_PARAMETER, "http://bogus.com");
+  }
+
   private Matcher templateMatcher(String uriTemplate)
   {
     Pattern p = Pattern.compile("\\{}");
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
index e011fd9e44..71ed700d52 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/LocalInputSourceDefn.java
@@ -181,10 +181,11 @@ public class LocalInputSourceDefn extends 
FormattedInputSourceDefn
   public TableFunction partialTableFn(ResolvedExternalTable table)
   {
     final Map<String, Object> sourceMap = new HashMap<>(table.inputSourceMap);
+    final boolean hasBaseDir = 
!Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, BASE_DIR_FIELD));
     final boolean hasFiles = 
!CollectionUtils.isNullOrEmpty(CatalogUtils.safeGet(sourceMap, FILES_FIELD, 
List.class));
     final boolean hasFilter = 
!Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, FILTER_FIELD));
     List<ParameterDefn> params = new ArrayList<>();
-    if (!hasFiles && !hasFilter) {
+    if (hasBaseDir && !hasFiles && !hasFilter) {
       params.add(FILES_PARAM_DEFN);
       params.add(FILTER_PARAM_DEFN);
     }
@@ -219,8 +220,8 @@ public class LocalInputSourceDefn extends 
FormattedInputSourceDefn
       }
       final boolean hasFilter = 
!Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, FILTER_FIELD));
       final List<String> filesParam = CatalogUtils.getStringArray(args, 
FILES_PARAMETER);
-      final String filterParam = CatalogUtils.getString(args, 
FILTER_PARAMETER);
       final boolean hasFilesParam = !CollectionUtils.isNullOrEmpty(filesParam);
+      final String filterParam = CatalogUtils.getString(args, 
FILTER_PARAMETER);
       final boolean hasFilterParam = !Strings.isNullOrEmpty(filterParam);
       if (!hasFilter && !hasFilesParam && !hasFilterParam) {
         throw new IAE(
diff --git 
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
 
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 8485705b65..0be1f951bf 100644
--- 
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++ 
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -199,7 +199,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
     args.put(HttpInputSourceDefn.PASSWORD_PARAMETER, "secret");
     args.put(FormattedInputSourceDefn.FORMAT_PARAMETER, 
CsvFormatDefn.TYPE_KEY);
     ExternalTableSpec externSpec = fn.apply("x", args, COLUMNS, mapper);
-    validateHappyPath(externSpec);
+    validateHappyPath(externSpec, true);
 
     // But, it fails if there are no columns.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), 
mapper));
@@ -231,7 +231,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
     // Convert to an external spec
     ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
     ExternalTableSpec externSpec = externDefn.convert(resolved);
-    validateHappyPath(externSpec);
+    validateHappyPath(externSpec, true);
 
     // Get the partial table function
     TableFunction fn = externDefn.tableFn(resolved);
@@ -239,23 +239,17 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
 
     // Convert to an external table.
     externSpec = fn.apply("x", Collections.emptyMap(), 
Collections.emptyList(), mapper);
-    validateHappyPath(externSpec);
+    validateHappyPath(externSpec, true);
 
+    // But, it fails if columns are provided since the table already has them.
     assertThrows(IAE.class, () -> fn.apply("x", Collections.emptyMap(), 
COLUMNS, mapper));
   }
 
   @Test
-  public void testTemplateSpecWithFormatHappyPath() throws URISyntaxException
+  public void testTemplateSpecWithFormatHappyPath()
   {
-    HttpInputSource inputSource = new HttpInputSource(
-        Collections.singletonList(new URI("http://foo.com/my.csv")), // removed
-        "bob",
-        new DefaultPasswordProvider("secret"),
-        new HttpInputSourceConfig(null)
-    );
     TableMetadata table = TableBuilder.external("foo")
-        .inputSource(httpToMap(inputSource))
+        .inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
         .inputFormat(CSV_FORMAT)
         .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, 
"http://foo.com/{}")
         .column("x", Columns.VARCHAR)
@@ -264,8 +258,54 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
 
     // Check validation
     table.validate();
+    ResolvedTable resolved = registry.resolve(table.spec());
+    assertNotNull(resolved);
 
-    // Check registry
+    // Not a full table, can't directly convert
+    // Convert to an external spec
+    ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
+    assertThrows(IAE.class, () -> externDefn.convert(resolved));
+
+    // Get the partial table function
+    TableFunction fn = externDefn.tableFn(resolved);
+    assertEquals(4, fn.parameters().size());
+    assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
+    assertTrue(hasParam(fn, HttpInputSourceDefn.USER_PARAMETER));
+    assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_PARAMETER));
+    assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_ENV_VAR_PARAMETER));
+
+    // Convert to an external table.
+    ExternalTableSpec externSpec = fn.apply(
+        "x",
+        ImmutableMap.of(
+            HttpInputSourceDefn.URIS_PARAMETER,
+            Collections.singletonList("my.csv")
+        ),
+        Collections.emptyList(),
+        mapper
+    );
+    validateHappyPath(externSpec, false);
+  }
+
+  @Test
+  public void testTemplateSpecWithFormatAndPassword()
+  {
+    TableMetadata table = TableBuilder.external("foo")
+        .inputSource(ImmutableMap.of(
+            "type", HttpInputSource.TYPE_KEY,
+            HttpInputSourceDefn.USERNAME_FIELD, "bob",
+            HttpInputSourceDefn.PASSWORD_FIELD, ImmutableMap.of(
+                "type", "default",
+                "password", "secret"
+            )
+         ))
+        .inputFormat(CSV_FORMAT)
+        .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, 
"http://foo.com/{}")
+        .column("x", Columns.VARCHAR)
+        .column("y", Columns.BIGINT)
+        .build();
+
+    table.validate();
     ResolvedTable resolved = registry.resolve(table.spec());
     assertNotNull(resolved);
 
@@ -289,7 +329,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         Collections.emptyList(),
         mapper
     );
-    validateHappyPath(externSpec);
+    validateHappyPath(externSpec, true);
   }
 
   @Test
@@ -326,7 +366,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
     args.put(HttpInputSourceDefn.URIS_PARAMETER, 
Collections.singletonList("my.csv"));
     args.put(FormattedInputSourceDefn.FORMAT_PARAMETER, 
CsvFormatDefn.TYPE_KEY);
     ExternalTableSpec externSpec = fn.apply("x", args, COLUMNS, mapper);
-    validateHappyPath(externSpec);
+    validateHappyPath(externSpec, true);
   }
 
   @Test
@@ -455,11 +495,13 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
     assertEquals("SECRET", ((EnvironmentVariablePasswordProvider) 
sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable());
   }
 
-  private void validateHappyPath(ExternalTableSpec externSpec)
+  private void validateHappyPath(ExternalTableSpec externSpec, boolean 
withUser)
   {
     HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
-    assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
-    assertEquals("secret", ((DefaultPasswordProvider) 
sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
+    if (withUser) {
+      assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
+      assertEquals("secret", ((DefaultPasswordProvider) 
sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
+    }
    assertEquals("http://foo.com/my.csv", 
sourceSpec.getUris().get(0).toString());
 
     // Just a sanity check: details of CSV conversion are tested elsewhere.
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/CatalogExternalTableOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/CatalogExternalTableOperatorConversion.java
deleted file mode 100644
index d3d0f02047..0000000000
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/CatalogExternalTableOperatorConversion.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.calcite.external;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.FunctionParameter;
-import org.apache.calcite.schema.TranslatableTable;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
-import org.apache.druid.catalog.model.TableDefnRegistry;
-import org.apache.druid.catalog.model.table.ExternalTableSpec;
-import org.apache.druid.catalog.model.table.InputSourceDefn;
-import org.apache.druid.catalog.model.table.TableFunction;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
-import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
-import 
org.apache.druid.sql.calcite.external.UserDefinedTableMacroFunction.ExtendedTableMacro;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Base class for input-source-specific table functions with arguments derived 
from
- * a catalog external table definition. Such functions work in conjunction 
with the
- * EXTERN key word to provide a schema. Example of the HTTP form:
- * <code><pre>
- * INSERT INTO myTable SELECT ...
- * FROM TABLE(http(
- *     userName => 'bob',
- *     password => 'secret',
- *     uris => ARRAY['http:foo.com/bar.csv'],
- *     format => 'csv'))
- *   EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
- * PARTITIONED BY ...
- * </pre></code>
- */
-public abstract class CatalogExternalTableOperatorConversion implements 
SqlOperatorConversion
-{
-  private final SqlUserDefinedTableMacro operator;
-
-  public CatalogExternalTableOperatorConversion(
-      final String name,
-      final TableDefnRegistry registry,
-      final String tableType,
-      final ObjectMapper jsonMapper
-  )
-  {
-    this(
-        name,
-        ((InputSourceDefn) 
registry.inputSourceDefnFor(tableType)).adHocTableFn(),
-        jsonMapper
-    );
-  }
-
-  public CatalogExternalTableOperatorConversion(
-      final String name,
-      final TableFunction fn,
-      final ObjectMapper jsonMapper
-  )
-  {
-    this.operator = new CatalogExternalTableOperator(
-        new CatalogTableMacro(name, fn, jsonMapper)
-    );
-  }
-
-  @Override
-  public SqlOperator calciteOperator()
-  {
-    return operator;
-  }
-
-  @Nullable
-  @Override
-  public DruidExpression toDruidExpression(PlannerContext plannerContext, 
RowSignature rowSignature, RexNode rexNode)
-  {
-    return null;
-  }
-
-  public static class CatalogExternalTableOperator extends 
UserDefinedTableMacroFunction implements AuthorizableOperator
-  {
-    public CatalogExternalTableOperator(final CatalogTableMacro macro)
-    {
-      super(
-          new SqlIdentifier(macro.name, SqlParserPos.ZERO),
-          ReturnTypes.CURSOR,
-          null,
-          // Use our own definition of variadic since Calcite's doesn't allow
-          // optional parameters.
-          Externals.variadic(macro.parameters),
-          Externals.dataTypes(macro.parameters),
-          macro
-      );
-    }
-
-    @Override
-    public Set<ResourceAction> computeResources(final SqlCall call)
-    {
-      return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
-    }
-  }
-
-  public static class CatalogTableMacro implements ExtendedTableMacro
-  {
-    private final String name;
-    private final List<FunctionParameter> parameters;
-    private final TableFunction fn;
-    private final ObjectMapper jsonMapper;
-
-    public CatalogTableMacro(
-        final String name,
-        final TableFunction fn,
-        final ObjectMapper jsonMapper
-    )
-    {
-      this.name = name;
-      this.jsonMapper = jsonMapper;
-      this.fn = fn;
-      this.parameters = Externals.convertParameters(fn);
-    }
-
-    /**
-     * Called when the function is used without an {@code EXTEND} clause.
-     * {@code EXTERN} allows this, most others do not.
-     */
-    @Override
-    public TranslatableTable apply(final List<Object> arguments)
-    {
-      return apply(arguments, null);
-    }
-
-    @Override
-    public TranslatableTable apply(List<Object> arguments, SqlNodeList schema)
-    {
-      final ExternalTableSpec externSpec = fn.apply(
-          name,
-          Externals.convertArguments(fn, arguments),
-          schema == null ? null : Externals.convertColumns(schema),
-          jsonMapper
-      );
-      return Externals.buildExternalTable(externSpec, jsonMapper);
-    }
-
-    @Override
-    public List<FunctionParameter> getParameters()
-    {
-      return parameters;
-    }
-  }
-}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
new file mode 100644
index 0000000000..5b4069f1c0
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.table.ExternalTableDefn;
+import org.apache.druid.catalog.model.table.ExternalTableSpec;
+import org.apache.druid.catalog.model.table.TableFunction;
+import 
org.apache.druid.sql.calcite.external.SchemaAwareUserDefinedTableMacro.ExtendedTableMacro;
+
+import java.util.List;
+
+/**
+ * Table macro which wraps a catalog table function and which accepts
+ * a schema from an EXTENDS clause. This macro is wrapped by the
+ * {@link DruidUserDefinedTableMacro} operator that itself extends
+ * {@link SchemaAwareUserDefinedTableMacro} which interfaces with the
+ * extend operator to pass the schema via a "back channel." The plumbing
+ * is complex because we're adding functionality a bit outside the SQL
+ * standard, and we have to fit our logic into the Calcite stack.
+ */
+public class DruidTableMacro implements ExtendedTableMacro
+{
+  protected final String name;
+  final List<FunctionParameter> parameters;
+  private final TableFunction fn;
+  private final ObjectMapper jsonMapper;
+
+  public DruidTableMacro(
+      final String name,
+      final TableFunction fn,
+      final ObjectMapper jsonMapper
+  )
+  {
+    this.name = name;
+    this.jsonMapper = jsonMapper;
+    this.fn = fn;
+    this.parameters = Externals.convertParameters(fn);
+  }
+
+  public DruidTableMacro(
+      final String tableName,
+      final ResolvedTable externalTable
+  )
+  {
+    this.name = tableName;
+    ExternalTableDefn tableDefn = (ExternalTableDefn) externalTable.defn();
+    this.fn = tableDefn.tableFn(externalTable);
+    this.parameters = Externals.convertParameters(fn);
+    this.jsonMapper = externalTable.jsonMapper();
+  }
+
+  /**
+   * Called when the function is used without an {@code EXTEND} clause.
+   * {@code EXTERN} allows this, most others do not.
+   */
+  @Override
+  public TranslatableTable apply(final List<Object> arguments)
+  {
+    return apply(arguments, null);
+  }
+
+  @Override
+  public TranslatableTable apply(List<Object> arguments, SqlNodeList schema)
+  {
+    final ExternalTableSpec externSpec = fn.apply(
+        name,
+        Externals.convertArguments(fn, arguments),
+        schema == null ? null : Externals.convertColumns(schema),
+        jsonMapper
+    );
+    return Externals.buildExternalTable(externSpec, jsonMapper);
+  }
+
+  @Override
+  public List<FunctionParameter> getParameters()
+  {
+    return parameters;
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
new file mode 100644
index 0000000000..09f495fa19
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Base class for input-source-specific table functions with arguments derived 
from
+ * a catalog external table definition. Such functions work in conjunction 
with the
+ * EXTEND key word to provide a schema. Example of the HTTP form:
+ * <code><pre>
+ * INSERT INTO myTable SELECT ...
+ * FROM TABLE(http(
+ *     userName => 'bob',
+ *     password => 'secret',
+ *     uris => ARRAY['http://foo.com/bar.csv'],
+ *     format => 'csv'))
+ *   EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
+ * PARTITIONED BY ...
+ * </pre></code>
+ * <p>
+ * This version wraps a {@link DruidTableMacro}, which implements the
+ * actual conversion from function arguments to an external table.
+ */
+public class DruidUserDefinedTableMacro extends 
SchemaAwareUserDefinedTableMacro implements AuthorizableOperator
+{
+  public DruidUserDefinedTableMacro(final DruidTableMacro macro)
+  {
+    super(
+        new SqlIdentifier(macro.name, SqlParserPos.ZERO),
+        ReturnTypes.CURSOR,
+        null,
+        // Use our own definition of variadic since Calcite's doesn't allow
+        // optional parameters.
+        Externals.variadic(macro.parameters),
+        Externals.dataTypes(macro.parameters),
+        macro
+    );
+  }
+
+  @Override
+  public Set<ResourceAction> computeResources(final SqlCall call)
+  {
+    return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacroConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacroConversion.java
new file mode 100644
index 0000000000..e532b871a1
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacroConversion.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.table.InputSourceDefn;
+import org.apache.druid.catalog.model.table.TableFunction;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Operator conversion for user-defined table macros (functions) based on the
+ * {@link TableFunction} abstraction defined by the catalog.
+ */
+public abstract class DruidUserDefinedTableMacroConversion implements 
SqlOperatorConversion
+{
+  private final SqlUserDefinedTableMacro operator;
+
+  public DruidUserDefinedTableMacroConversion(
+      final String name,
+      final TableDefnRegistry registry,
+      final String tableType,
+      final ObjectMapper jsonMapper
+  )
+  {
+    this(
+        name,
+        ((InputSourceDefn) 
registry.inputSourceDefnFor(tableType)).adHocTableFn(),
+        jsonMapper
+    );
+  }
+
+  public DruidUserDefinedTableMacroConversion(
+      final String name,
+      final TableFunction fn,
+      final ObjectMapper jsonMapper
+  )
+  {
+    this.operator = new DruidUserDefinedTableMacro(
+        new DruidTableMacro(name, fn, jsonMapper)
+    );
+  }
+
+  @Override
+  public SqlOperator calciteOperator()
+  {
+    return operator;
+  }
+
+  @Nullable
+  @Override
+  public DruidExpression toDruidExpression(PlannerContext plannerContext, 
RowSignature rowSignature, RexNode rexNode)
+  {
+    return null;
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java
index 14f4d7f1b9..c2162491e5 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.fun.SqlCollectionTableOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 
 /**
@@ -35,7 +36,7 @@ import org.apache.druid.java.util.common.ISE;
  * is said to have been added for Apache Phoenix, and which we repurpose to
  * supply a schema for an ingest input table.
  *
- * @see {@link UserDefinedTableMacroFunction} for details
+ * @see {@link SchemaAwareUserDefinedTableMacro} for details
  */
 public class ExtendOperator extends SqlInternalOperator
 {
@@ -52,23 +53,47 @@ public class ExtendOperator extends SqlInternalOperator
    * squirreled away in an ad-hoc instance of the macro. We must do it
    * this way because we can't change Calcite to define a new node type
    * that holds onto the schema.
+   * <p>
+   * The node structure is:<pre>{@code
+   * EXTEND(
+   *   TABLE(
+   *     <table fn>(
+   *       <table macro>
+   *     )
+   *   ),
+   *   <schema>
+   * )}</pre>
+   * <p>
+   * Note that the table macro is not an operand: it is an implicit
+   * member of the table macro function operator.
    */
   @Override
   public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
   {
     SqlBasicCall tableOpCall = (SqlBasicCall) call.operand(0);
     if (!(tableOpCall.getOperator() instanceof SqlCollectionTableOperator)) {
-      throw new ISE("First argument to EXTEND must be a table function");
+      throw new ISE("First argument to EXTEND must be TABLE");
     }
+
+    // The table function must be a Druid-defined table macro function
+    // which is aware of the EXTEND schema.
     SqlBasicCall tableFnCall = (SqlBasicCall) tableOpCall.operand(0);
-    if (!(tableFnCall.getOperator() instanceof UserDefinedTableMacroFunction)) 
{
+    if (!(tableFnCall.getOperator() instanceof 
SchemaAwareUserDefinedTableMacro)) {
       // May be an unresolved function.
-      return call;
+      throw new IAE(
+          "Function %s does not accept an EXTEND clause (or a schema list)",
+          tableFnCall.getOperator().getName()
+      );
     }
-    UserDefinedTableMacroFunction macro = (UserDefinedTableMacroFunction) 
tableFnCall.getOperator();
 
+    // Move the schema from the second operand of EXTEND into a member
+    // function of a shim table macro.
+    SchemaAwareUserDefinedTableMacro tableFn = 
(SchemaAwareUserDefinedTableMacro) tableFnCall.getOperator();
     SqlNodeList schema = (SqlNodeList) call.operand(1);
-    SqlCall newCall = macro.rewriteCall(tableFnCall, schema);
+    SqlCall newCall = tableFn.rewriteCall(tableFnCall, schema);
+
+    // Create a new TABLE(table_fn) node to replace the EXTEND node. After 
this,
+    // the table macro function acts just like a standard Calcite version.
     return 
SqlStdOperatorTable.COLLECTION_TABLE.createCall(call.getParserPosition(), 
newCall);
   }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
index 6ef3aa1f5e..1d5a5c4fef 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
@@ -54,7 +54,7 @@ import java.util.Map;
  * a Druid JSON signature, or an SQL {@code EXTEND} list of columns.
  * As with all table functions, the {@code EXTEND} is optional.
  */
-public class ExternalOperatorConversion extends 
CatalogExternalTableOperatorConversion
+public class ExternalOperatorConversion extends 
DruidUserDefinedTableMacroConversion
 {
   public static final String FUNCTION_NAME = "EXTERN";
 
@@ -72,12 +72,12 @@ public class ExternalOperatorConversion extends 
CatalogExternalTableOperatorConv
     public ExternFunction()
     {
       super(Arrays.asList(
-          new Parameter(INPUT_SOURCE_PARAM, ParameterType.VARCHAR, true),
-          new Parameter(INPUT_FORMAT_PARAM, ParameterType.VARCHAR, true),
+          new Parameter(INPUT_SOURCE_PARAM, ParameterType.VARCHAR, false),
+          new Parameter(INPUT_FORMAT_PARAM, ParameterType.VARCHAR, false),
 
-          // Not required: the user can either provide the signature OR
+          // Optional: the user can either provide the signature OR
           // an EXTEND clause. Checked in the implementation.
-          new Parameter(SIGNATURE_PARAM, ParameterType.VARCHAR, false)
+          new Parameter(SIGNATURE_PARAM, ParameterType.VARCHAR, true)
       ));
     }
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
index 35a24fd390..6b49bde969 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/HttpOperatorConversion.java
@@ -23,7 +23,7 @@ import com.google.inject.Inject;
 import org.apache.druid.catalog.model.TableDefnRegistry;
 import org.apache.druid.catalog.model.table.HttpInputSourceDefn;
 
-public class HttpOperatorConversion extends 
CatalogExternalTableOperatorConversion
+public class HttpOperatorConversion extends 
DruidUserDefinedTableMacroConversion
 {
   public static final String FUNCTION_NAME = "http";
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
index ee58db2fd0..c684428a9c 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/InlineOperatorConversion.java
@@ -23,14 +23,12 @@ import com.google.inject.Inject;
 import org.apache.druid.catalog.model.TableDefnRegistry;
 import org.apache.druid.catalog.model.table.InlineInputSourceDefn;
 
-public class InlineOperatorConversion extends 
CatalogExternalTableOperatorConversion
+public class InlineOperatorConversion extends 
DruidUserDefinedTableMacroConversion
 {
   public static final String FUNCTION_NAME = "inline";
 
   @Inject
-  public InlineOperatorConversion(
-      final TableDefnRegistry registry
-  )
+  public InlineOperatorConversion(final TableDefnRegistry registry)
   {
     super(FUNCTION_NAME, registry, InlineInputSourceDefn.TYPE_KEY, 
registry.jsonMapper());
   }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
index 676216e9ae..50c0dca927 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/LocalOperatorConversion.java
@@ -23,16 +23,14 @@ import com.google.inject.Inject;
 import org.apache.druid.catalog.model.TableDefnRegistry;
 import org.apache.druid.catalog.model.table.LocalInputSourceDefn;
 
-public class LocalOperatorConversion extends 
CatalogExternalTableOperatorConversion
+public class LocalOperatorConversion extends 
DruidUserDefinedTableMacroConversion
 {
   // Cannot use "local" because it is a SQL keyword and the user would
   // be required to quote the name.
   public static final String FUNCTION_NAME = "localfiles";
 
   @Inject
-  public LocalOperatorConversion(
-      final TableDefnRegistry registry
-  )
+  public LocalOperatorConversion(final TableDefnRegistry registry)
   {
     super(FUNCTION_NAME, registry, LocalInputSourceDefn.TYPE_KEY, 
registry.jsonMapper());
   }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/UserDefinedTableMacroFunction.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
similarity index 81%
rename from 
sql/src/main/java/org/apache/druid/sql/calcite/external/UserDefinedTableMacroFunction.java
rename to 
sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
index 82855336ba..855b9ceebf 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/UserDefinedTableMacroFunction.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
@@ -20,6 +20,7 @@
 package org.apache.druid.sql.calcite.external;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.FunctionParameter;
 import org.apache.calcite.schema.TableMacro;
 import org.apache.calcite.schema.TranslatableTable;
@@ -36,6 +37,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandTypeChecker;
 import org.apache.calcite.sql.type.SqlOperandTypeInference;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
 
@@ -63,14 +65,12 @@ import java.util.Set;
  * SELECT ..
  * FROM myTable EXTEND (x VARCHAR, ...)
  * </pre></code>
- * Though, oddly, a search of Apache Phoenix itself does not find a hit for
- * EXTEND, so perhaps the feature was never completed?
  * <p>
  * For Druid, we want the above form: extend a table function, not a
  * literal table. Since we can't change the Calcite parser, we instead use
  * tricks within the constraints of the parser.
  * <ul>
- * <li>First, use use a Python script to modify the parser to add the
+ * <li>The Calcite parser is revised to add the
  * EXTEND rule for a table function.</li>
  * <li>Calcite expects the EXTEND operator to have two arguments: an identifier
  * and the column list. Since our case has a function call as the first 
argument,
@@ -99,9 +99,10 @@ import java.util.Set;
  * </pre></code>
  * Since we seldom use unparse, we can perhaps live with this limitation for 
now.
  */
-public abstract class UserDefinedTableMacroFunction extends 
BaseUserDefinedTableMacro implements AuthorizableOperator
+public abstract class SchemaAwareUserDefinedTableMacro
+    extends BaseUserDefinedTableMacro implements AuthorizableOperator
 {
-  public UserDefinedTableMacroFunction(
+  public SchemaAwareUserDefinedTableMacro(
       SqlIdentifier opName,
       SqlReturnTypeInference returnTypeInference,
       SqlOperandTypeInference operandTypeInference,
@@ -119,15 +120,20 @@ public abstract class UserDefinedTableMacroFunction 
extends BaseUserDefinedTable
    */
   public SqlBasicCall rewriteCall(SqlBasicCall oldCall, SqlNodeList schema)
   {
-    return new ExtendedCall(oldCall, new ShimTableMacroFunction(this, schema));
+    return new ExtendedCall(oldCall, new ShimUserDefinedTableMacro(this, 
schema));
   }
 
-  private static class ShimTableMacroFunction extends 
BaseUserDefinedTableMacro implements AuthorizableOperator
+  // Note the confusing use of "table macro". A TableMacro is a non-SqlNode 
that does the
+  // actual translation to a table. A *UserDefinedTableMacro is a function 
that wraps
+  // a table macro. The result is that "macro" by itself is ambiguous: it can 
be the
+  // implementation (TableMacro) or the function that wraps the implementation.
+  private static class ShimUserDefinedTableMacro extends 
BaseUserDefinedTableMacro implements AuthorizableOperator
   {
-    protected final UserDefinedTableMacroFunction base;
+    protected final SchemaAwareUserDefinedTableMacro base;
     protected final SqlNodeList schema;
+    private TranslatableTable table;
 
-    public ShimTableMacroFunction(final UserDefinedTableMacroFunction base, 
final SqlNodeList schema)
+    public ShimUserDefinedTableMacro(final SchemaAwareUserDefinedTableMacro 
base, final SqlNodeList schema)
     {
       super(
           base.getNameAsId(),
@@ -141,6 +147,21 @@ public abstract class UserDefinedTableMacroFunction 
extends BaseUserDefinedTable
       this.schema = schema;
     }
 
+    @Override
+    public TranslatableTable getTable(
+        RelDataTypeFactory typeFactory,
+        List<SqlNode> operandList
+    )
+    {
+      if (table == null) {
+        // Cache the table to avoid multiple conversions
+        // Possible because each call has a distinct instance
+        // of this operator.
+        table = super.getTable(typeFactory, operandList);
+      }
+      return table;
+    }
+
     @Override
     public Set<ResourceAction> computeResources(final SqlCall call)
     {
@@ -155,7 +176,7 @@ public abstract class UserDefinedTableMacroFunction extends 
BaseUserDefinedTable
   {
     private final SqlNodeList schema;
 
-    public ExtendedCall(SqlBasicCall oldCall, ShimTableMacroFunction macro)
+    public ExtendedCall(SqlBasicCall oldCall, ShimUserDefinedTableMacro macro)
     {
       super(
           macro,
@@ -208,6 +229,16 @@ public abstract class UserDefinedTableMacroFunction 
extends BaseUserDefinedTable
       schema.unparse(writer, leftPrec, rightPrec);
       writer.endList(frame);
     }
+
+    /**
+     * Required by GHA CodeQL even though Calcite doesn't use this
+     * particular method.
+     */
+    @Override
+    public Object clone()
+    {
+      throw new UOE("Not supported");
+    }
   }
 
   public interface ExtendedTableMacro extends TableMacro
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
index 98363d9eb2..a0c9fcd728 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
@@ -114,7 +114,12 @@ public class SqlParameterizerShuttle extends SqlShuttle
    */
   private SqlNode createArrayLiteral(Object value)
   {
-    List<?> list = Arrays.asList((Object[]) value);
+    List<?> list;
+    if (value instanceof List) {
+      list = (List<?>) value;
+    } else {
+      list = Arrays.asList((Object[]) value);
+    }
     List<SqlNode> args = new ArrayList<>(list.size());
     for (Object element : list) {
       if (element == null) {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 2a92a182d3..93e519091f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -44,6 +44,7 @@ import 
org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.Externals;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.IngestHandler;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -306,6 +307,50 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void testInsertFromExternalWithSchema()
+  {
+    String extern;
+    ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+    try {
+      extern = StringUtils.format(
+          "TABLE(extern(%s, %s))",
+          Calcites.escapeStringLiteral(
+              queryJsonMapper.writeValueAsString(
+                  new InlineInputSource("a,b,1\nc,d,2\n")
+              )
+          ),
+          Calcites.escapeStringLiteral(
+              queryJsonMapper.writeValueAsString(
+                  new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, 
false, false, 0)
+              )
+          )
+      );
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s\n" +
+             "  (x VARCHAR, y VARCHAR, z BIGINT)\n" +
+             "PARTITIONED BY ALL TIME",
+             extern
+         )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("dst", externalDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), 
Externals.EXTERNAL_RESOURCE_ACTION)
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .expectLogicalPlanFrom("insertFromExternal")
+        .verify();
+  }
+
   @Test
   public void testInsertWithPartitionedBy()
   {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
index ed2349af8e..f23a135359 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
@@ -101,7 +101,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(httpDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("httpExtern")
@@ -143,7 +143,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(httpDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("httpExtern")
@@ -173,7 +173,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(httpDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("httpExtern")
@@ -200,7 +200,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(httpDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("httpExtern")
@@ -223,7 +223,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(externalDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("insertFromExternal")
@@ -291,7 +291,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(externalDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("insertFromExternal")
@@ -319,7 +319,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(externalDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("insertFromExternal")
@@ -356,7 +356,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(localDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("localExtern")
@@ -384,7 +384,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(localDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("localExtern")
@@ -412,7 +412,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(localDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("localExtern")
@@ -442,7 +442,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(localDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("localExtern")
@@ -472,7 +472,7 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
                 .dataSource(localDataSource)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("x", "y", "z")
-                
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
                 .build()
          )
         .expectLogicalPlanFrom("localExtern")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to