>From Ian Maxon <[email protected]>:

Ian Maxon has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20492?usp=email )


Change subject: [ASTERIXDB-3636][API] Migrate UDF API to domain sockets
......................................................................

[ASTERIXDB-3636][API] Migrate UDF API to domain sockets

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:
Migrate the UDF API to use a domain socket instead of
being restricted to localhost on the normal NC servlet.

Change-Id: I5f8ac2170fd6b2beef14d99c38b9141af0f12ba3
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
M asterixdb/asterix-app/pom.xml
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
M asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
M asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
M asterixdb/asterix-doc/src/main/user-defined_function/udf.md
M 
asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
19 files changed, 178 insertions(+), 132 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/92/20492/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 524af43..12619a9 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.translator;

-import java.net.InetSocketAddress;
 import java.util.UUID;

 import org.apache.asterix.common.api.IClientRequest;
@@ -31,18 +30,15 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
-import org.apache.hyracks.util.NetworkUtil;

 public class Receptionist implements IReceptionist {

     @Override
     public IRequestReference welcome(IServletRequest request) {
         final String uuid = UUID.randomUUID().toString();
-        final InetSocketAddress localAddress = request.getLocalAddress();
-        final RequestReference ref =
-                RequestReference.of(uuid, 
NetworkUtil.toHostPort(localAddress), System.currentTimeMillis());
+        final RequestReference ref = RequestReference.of(uuid, 
request.getHostPort(), System.currentTimeMillis());
         ref.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT));
-        ref.setRemoteAddr(NetworkUtil.toHostPort(request.getRemoteAddress()));
+        ref.setRemoteAddr(request.getRemotePort());
         return ref;
     }

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 41311d9..31886e2 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -843,6 +843,18 @@
       <artifactId>httpclient</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.httpcomponents.client5</groupId>
+      <artifactId>httpclient5</artifactId>
+      <version>5.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.projectreactor.netty</groupId>
+      <artifactId>reactor-netty</artifactId>
+      <version>1.2.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-lang-common</artifactId>
       <version>${project.version}</version>
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index 02a99f6..3abbab5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -28,7 +28,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.net.InetAddress;
 import java.net.URI;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
@@ -349,29 +348,9 @@
     }
 
     protected boolean isRequestPermitted(IServletRequest request, 
IServletResponse response) throws IOException {
-        if (!isRequestOnLoopback(request)) {
-            rejectForbidden(response);
-            return false;
-        }
         return true;
     }

-    protected boolean isRequestOnLoopback(IServletRequest request) {
-        if (request.getLocalAddress() != null && request.getRemoteAddress() != 
null) {
-            InetAddress local = request.getLocalAddress().getAddress();
-            InetAddress remote = request.getRemoteAddress().getAddress();
-            return remote.isLoopbackAddress() && local.isLoopbackAddress();
-        } else {
-            return false;
-        }
-    }
-
-    protected void rejectForbidden(IServletResponse response) throws 
IOException {
-        // TODO: why this JSON format, do we use this anywhere else?
-        sendError(response, HttpUtil.ContentType.APPLICATION_JSON, 
HttpResponseStatus.FORBIDDEN,
-                "{ \"error\": \"Forbidden\" }");
-    }
-
     @Override
     protected void post(IServletRequest request, IServletResponse response) 
throws IOException {
         if (isRequestPermitted(request, response)) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0e6a04b..49dc03f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -30,6 +30,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -97,6 +98,7 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -124,6 +126,7 @@
     protected boolean startupCompleted;
     protected WebManager webManager;
     private HttpServer apiServer;
+    private HttpServer udfServer;

     @Override
     public void registerConfig(IConfigManager configManager) {
@@ -250,19 +253,28 @@
         apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP,
                 parseCredentialMap(((NodeControllerService) 
ncServiceCtx.getControllerService()).getConfiguration()
                         .getCredentialFilePath()));
-        Pair<Map<String, String>, Map<String, String>> auth = 
BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
-        apiServer
-                .addServlet(new BasicAuthServlet(apiServer.ctx(),
-                        new NCUdfApiServlet(apiServer.ctx(), new String[] { 
UDF }, getApplicationContext(),
-                                apiServer.getScheme(), 
apiServer.getAddress().getPort()),
-                        auth.getFirst(), auth.getSecond()));
-        apiServer.addServlet(new BasicAuthServlet(
-                apiServer.ctx(), new NCUdfRecoveryServlet(apiServer.ctx(), new 
String[] { UDF_RECOVERY },
-                        getApplicationContext(), apiServer.getScheme(), 
apiServer.getAddress().getPort()),
-                auth.getFirst(), auth.getSecond()));
+        if (!getApplicationContext().isCloudDeployment()) {
+            Pair<Map<String, String>, Map<String, String>> auth =
+                    BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
+            apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
+                    new NCUdfRecoveryServlet(apiServer.ctx(), new String[] { 
UDF_RECOVERY }, getApplicationContext(),
+                            apiServer.getScheme(), 
externalProperties.getAPIServerPort()),
+                    auth.getFirst(), auth.getSecond()));
+        }
         apiServer.addServlet(new QueryStatusApiServlet(apiServer.ctx(), 
getApplicationContext(), QUERY_STATUS));
         apiServer.addServlet(new QueryResultApiServlet(apiServer.ctx(), 
getApplicationContext(), QUERY_RESULT));
         webManager.add(apiServer);
+
+        //UDF deployment server
+        IApplicationConfig appCfg = 
getApplicationContext().getServiceContext().getAppConfig();
+        String socketPath = appCfg.getString(NCConfig.Option.UDF_API_DS_PATH);
+        udfServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), Path.of(socketPath), config);
+        Pair<Map<String, String>, Map<String, String>> auth = 
BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
+        udfServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
+                new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, 
getApplicationContext(),
+                        apiServer.getScheme(), 
externalProperties.getAPIServerPort()),
+                auth.getFirst(), auth.getSecond()));
+        webManager.add(udfServer);
     }

     protected List<AsterixExtension> getExtensions() throws Exception {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index b86730b..4cac409 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -28,14 +28,14 @@
 public class CloudStorageIntegrationUtil extends AsterixHyracksIntegrationUtil 
{

     public static final String RESOURCES_PATH = 
joinPath(getProjectPath().toString(), "src", "test", "resources");
-    public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, 
"cc-cloud-storage-main.conf");
-    public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH, 
"cc-cloud-storage-main.ftl");
+    public static final String TARGET_PATH = 
joinPath(getProjectPath().toString(), "target");
+    public static final String CONFIG_FILE = joinPath(TARGET_PATH, 
"cc-cloud-storage.conf");
+    public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH, 
"cc-cloud-storage.conf.ftl");

     public static void main(String[] args) throws Exception {
-        boolean cleanStart = Boolean.getBoolean("cleanup.start");
-        LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart);
-        final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
+        boolean cleanStart = true;
         try (S3MockContainer s3Mock = 
LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart)) {
+            final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
             fillConfigTemplate(MOCK_SERVER_HOSTNAME_FRAGMENT + 
s3Mock.getHttpServerPort(), CONFIG_FILE_TEMPLATE,
                     CONFIG_FILE);
             integrationUtil.run(cleanStart, 
Boolean.getBoolean("cleanup.shutdown"),
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
index e32f8ea..49633d2 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
@@ -18,89 +18,75 @@
  */
 package org.apache.asterix.app.external;

+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;

 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.AuthCache;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.client.utils.URIUtils;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.mime.HttpMultipartMode;
-import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.auth.BasicScheme;
-import org.apache.http.impl.client.BasicAuthCache;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.hyracks.algebricks.common.utils.Pair;

 import com.fasterxml.jackson.databind.ObjectMapper;

+import io.netty.channel.unix.DomainSocketAddress;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+
 @SuppressWarnings("squid:S134")
 public class ExternalUDFLibrarian implements IExternalUDFLibrarian {

-    private HttpClient hc;
-
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

-    public ExternalUDFLibrarian() {
-        hc = new DefaultHttpClient();
+    /** Builds an HTTP Basic {@code Authorization} value: "Basic " + Base64 of {@code first:second} (UTF-8). */
+    private static String createAuthHeader(Pair<String, String> credentials) {
+        String auth = credentials.first + ":" + credentials.second;
+        return "Basic " + Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+    }
+
+    public void setCredentials(Pair<String, String> credentials) {
     }

     @Override
-    public void install(URI path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
-        HttpClientContext hcCtx = createHttpClientContext(path, credentials);
-        HttpPost post = new HttpPost(path);
+    public void install(String path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
         File lib = new File(libPath);
-        MultipartEntityBuilder entity = 
MultipartEntityBuilder.create().setMode(HttpMultipartMode.STRICT);
-        entity.addTextBody("type", type);
-        entity.addBinaryBody("data", lib, ContentType.DEFAULT_BINARY, 
lib.getName()).build();
-        post.setEntity(entity.build());
-        HttpResponse response = hc.execute(post, hcCtx);
-        handleResponse(response);
+        HttpClient hc = HttpClient.create().headers(h -> 
h.set("Authorization", createAuthHeader(credentials)))
+                .remoteAddress(() -> new 
DomainSocketAddress(joinPath("target", "tmp", "asterix_nc1", "udf.sock")));
+        hc.post().uri(path)
+                .sendForm((req, form) -> form.multipart(true).attr("type", 
type).file("data", lib,
+                        "application/octet-stream"))
+                .responseSingle((response, content) -> 
handleResponse(response, content)).block();
     }

     @Override
-    public void uninstall(URI path, Pair<String, String> credentials) throws 
IOException, AsterixException {
-        HttpClientContext hcCtx = createHttpClientContext(path, credentials);
-        HttpDelete del = new HttpDelete(path);
-        HttpResponse response = hc.execute(del, hcCtx);
-        handleResponse(response);
+    public void uninstall(String path, Pair<String, String> credentials) 
throws IOException, AsterixException {
+        HttpClient hc = HttpClient.create().headers(h -> 
h.set("Authorization", createAuthHeader(credentials)))
+                .remoteAddress(() -> new 
DomainSocketAddress(joinPath("target", "tmp", "asterix_nc1", "udf.sock")));
+        hc.delete().uri(path).responseSingle((response, content) -> 
handleResponse(response, content)).block();
     }

-    private HttpClientContext createHttpClientContext(URI path, Pair<String, 
String> credentials) {
-        HttpClientContext hcCtx = HttpClientContext.create();
-        HttpHost h = URIUtils.extractHost(path);
-        CredentialsProvider cp = new BasicCredentialsProvider();
-        cp.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(credentials.first, credentials.second));
-        hcCtx.setCredentialsProvider(cp);
-        AuthCache ac = new BasicAuthCache();
-        ac.put(h, new BasicScheme());
-        hcCtx.setAuthCache(ac);
-        return hcCtx;
-    }
+    private Mono<Void> handleResponse(HttpClientResponse response, 
reactor.netty.ByteBufMono content) {
+        return content.asString().defaultIfEmpty("").flatMap(body -> {
+            int respCode = response.status().code();
+            if (respCode == HttpResponseStatus.OK.code()) {
+                return Mono.empty();
+            }

-    private void handleResponse(HttpResponse response) throws IOException, 
AsterixException {
-        String resp = null;
-        int respCode = response.getStatusLine().getStatusCode();
-        if (respCode == 500 || respCode == 400) {
-            resp = 
OBJECT_MAPPER.readTree(response.getEntity().getContent()).get("error").asText();
-        }
-        response.getEntity().consumeContent();
-        if (resp == null && respCode != 200) {
-            resp = response.getStatusLine().toString();
-        }
-        if (resp != null) {
-            throw new AsterixException(resp);
-        }
+            String errorMessage;
+            if (respCode == HttpResponseStatus.INTERNAL_SERVER_ERROR.code()
+                    || respCode == HttpResponseStatus.BAD_REQUEST.code()) {
+                try {
+                    errorMessage = 
OBJECT_MAPPER.readTree(body).get("error").asText();
+                } catch (IOException e) {
+                    errorMessage = "Failed to parse error response: " + body;
+                }
+            } else {
+                errorMessage = response.status().toString();
+            }
+            return Mono.error(new AsterixException(errorMessage));
+        });
     }
 }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
index 998fa78..4efcca0 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
@@ -19,13 +19,12 @@
 package org.apache.asterix.app.external;

 import java.io.IOException;
-import java.net.URI;

 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.hyracks.algebricks.common.utils.Pair;

 public interface IExternalUDFLibrarian {
-    void install(URI path, String type, String libPath, Pair<String, String> 
credentials) throws Exception;
+    void install(String path, String type, String libPath, Pair<String, 
String> credentials) throws Exception;

-    void uninstall(URI path, Pair<String, String> credentials) throws 
IOException, AsterixException;
+    void uninstall(String path, Pair<String, String> credentials) throws 
IOException, AsterixException;
 }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 32d6e17..e3f5dc6 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1462,7 +1462,7 @@
                     //TODO: this is not right. URLEncoder does not properly 
encode paths.
                     String dataverse = URLEncoder.encode(command[1], 
StandardCharsets.US_ASCII.name());
                     String library = URLEncoder.encode(command[2], 
StandardCharsets.US_ASCII.name());
-                    URI path = createEndpointURI("/admin/udf/" + dataverse + 
"/" + library);
+                    String path = "/admin/udf/" + dataverse + "/" + library;
                     if (command.length < 2) {
                         throw new Exception("invalid library command: " + 
line);
                     }
@@ -2721,6 +2721,11 @@
         return uri;
     }

+    /** Builds a unix:// URI for the asterix_nc1 UDF socket path with {@code pathAndQuery} appended. */
+    protected URI createUDFDomainSockURI(String pathAndQuery) {
+        return URI.create("unix://target/tmp/asterix_nc1/udf.sock" + pathAndQuery);
+    }
+
     public URI getEndpoint(String servlet) throws URISyntaxException {
         return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)));
     }
diff --git 
a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf 
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
index 8745d9d..9efabaa 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
@@ -21,6 +21,7 @@
 iodevices=target/tmp/asterix_nc1/iodevice1
 iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
 nc.api.port=19004
+udf.api.ds.path=target/tmp/asterix_nc1/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

 [nc/asterix_nc2]
@@ -29,6 +30,7 @@
 core.dump.dir=target/tmp/asterix_nc2/coredump
 
iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
 nc.api.port=19005
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007

 [nc]
@@ -68,7 +70,7 @@
 cloud.storage.scheme=s3
 cloud.storage.bucket=cloud-storage-container
 cloud.storage.region=us-west-2
-cloud.storage.endpoint=http://127.0.0.1:8001
+cloud.storage.endpoint=http://127.0.0.1:46677
 cloud.storage.anonymous.auth=true
 cloud.storage.cache.policy=selective
 cloud.max.write.requests.per.second=2000
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl 
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
index 8271bfd..4a33d4e 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
@@ -21,6 +21,7 @@
 iodevices=target/tmp/asterix_nc1/iodevice1
 iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
 nc.api.port=19004
+udf.api.ds.path=target/tmp/asterix_nc1/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

 [nc/asterix_nc2]
@@ -29,6 +30,7 @@
 core.dump.dir=target/tmp/asterix_nc2/coredump
 
iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
 nc.api.port=19005
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007

 [nc]
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl 
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
index 74c557a..02e9356 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
@@ -21,6 +21,7 @@
 iodevices=target/tmp/asterix_nc1/iodevice1
 iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
 nc.api.port=19004
+udf.api.ds.path=target/tmp/asterix_nc1/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

 [nc/asterix_nc2]
@@ -29,6 +30,7 @@
 core.dump.dir=target/tmp/asterix_nc2/coredump
 
iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
 nc.api.port=19005
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007

 [nc]
diff --git a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md 
b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
index 71e0fa2..9768c81 100644
--- a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
+++ b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
@@ -52,7 +52,7 @@
 ## <a name="installingUDF">Installing a Java UDF Library</a>

 To install a UDF package to the cluster, we need to send a Multipart Form-data 
HTTP request to the `/admin/udf` endpoint
-of the CC at the normal API port (`19004` by default). Any suitable tool will 
do, but for the example here I will use
+of the cluster through the special UDF domain socket. Any suitable tool will 
do, but for the example here I will use
 `curl` which is widely available.

 For example, to install a library with the following criteria:
@@ -65,7 +65,7 @@

 we would execute

-    curl -v -u admin:admin -X POST -F 'data=@./lib.zip' -F 'type=java' 
localhost:19004/admin/udf/udfs/testlib
+    curl -v -u admin:admin --unix-socket udf.sock -F 'data=@./lib.zip' -F 
'type=java' localhost/admin/udf/udfs/testlib

 Any response other than `200` indicates an error in deployment.

diff --git 
a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
 
b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
index 025f607..3c0e56c 100644
--- 
a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
+++ 
b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.podman;

 import java.io.IOException;
-import java.net.URI;

 import org.apache.asterix.app.external.IExternalUDFLibrarian;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -42,7 +41,7 @@
     }

     @Override
-    public void install(URI path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
+    public void install(String path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
         Container.ExecResult curlResult = null;
         int retryCt = 0;
         while (retryCt < 10) {
@@ -50,7 +49,7 @@
                 curlResult = asterix.execInContainer("curl", 
"--no-progress-meter", "-X", "POST", "-u",
                         credentials.first + ":" + credentials.second, "-F",
                         "data=@" + "/var/tmp/asterix-app/" + libPath, "-F", 
"type=" + type,
-                        "http://localhost:19004" + path.getRawPath());
+                        "http://localhost:19004" + path);
                 handleResponse(curlResult);
                 return;
             } catch (RuntimeException e) {
@@ -62,10 +61,10 @@
     }
 
     @Override
-    public void uninstall(URI path, Pair<String, String> credentials) throws 
IOException, AsterixException {
+    public void uninstall(String path, Pair<String, String> credentials) 
throws IOException, AsterixException {
         try {
             Container.ExecResult curlResult = asterix.execInContainer("curl", 
"-X", "DELETE", "-u",
-                    credentials.first + ":" + credentials.second, 
"http://localhost:19004" + path.getPath());
+                    credentials.first + ":" + credentials.second, 
"http://localhost:19004" + path);
             handleResponse(curlResult);
         } catch (InterruptedException e) {
             throw new IOException(e);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index f55dd59..a7b0e76 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -103,6 +103,10 @@
         PYTHON_ARGS(STRING_ARRAY, (String[]) null),
         PYTHON_ENV(STRING_ARRAY, (String[]) null),
         PYTHON_DS_PATH(STRING, (String) null),
+        UDF_API_DS_PATH(
+                STRING,
+                appConfig -> 
FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), 
"udf.sock"),
+                "<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + 
">/udf.sock"),
         LIBRARY_MAX_FILE_SIZE(POSITIVE_LONG_BYTE_UNIT, 250L * 1024 * 1024), 
//250MB
         LIBRARY_MAX_EXTRACTED_SIZE(POSITIVE_LONG_BYTE_UNIT, 1000L * 1024 * 
1024), //1GB
         LIBRARY_MAX_ARCHIVE_ENTRIES(INTEGER, 4096),
@@ -244,6 +248,8 @@
                     return "Number of threads per partition used to write and 
read from storage";
                 case IO_QUEUE_SIZE:
                     return "Length of the queue used for requests to write and 
read";
+                case UDF_API_DS_PATH:
+                    return "Path and name for domain socket used to deploy 
UDFs to the cluster. Defaults to default.dir/udf.sock";
                 case PYTHON_CMD:
                     return "Absolute path to python interpreter";
                 case PYTHON_ADDITIONAL_PACKAGES:
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
index 59c0ae0..faab5d3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
@@ -114,5 +114,9 @@
      */
     InetSocketAddress getLocalAddress();

+    String getHostPort();
+
+    String getRemotePort();
+
     Channel getChannel();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
index 4a72d53..0aaf8fe 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.http.server;

 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +29,7 @@
 import java.util.Set;

 import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.util.NetworkUtil;

 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -115,6 +117,23 @@
     }

     @Override
+    public String getHostPort() {
+        return getPort(channel.localAddress());
+    }
+
+    @Override
+    public String getRemotePort() {
+        return getPort(channel.remoteAddress());
+    }
+
+    /**
+     * Formats the supplied socket address as a host/port string.
+     *
+     * @param s the address to format (the channel's local or remote address)
+     * @return the result of {@link NetworkUtil#toHostPort} when {@code s} is an
+     *         {@link InetSocketAddress}; otherwise {@code "0"}
+     */
+    private String getPort(SocketAddress s) {
+        if (s instanceof InetSocketAddress) {
+            // Format the address that was passed in, not always the local one;
+            // otherwise getRemotePort() would report the local endpoint.
+            return NetworkUtil.toHostPort((InetSocketAddress) s);
+        }
+        // Non-inet addresses (presumably the domain-socket case) carry no port.
+        return "0";
+    }
+
+    @Override
     public Channel getChannel() {
         return channel;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
index 6476ab9..16a29aa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
@@ -30,6 +30,8 @@
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDomainSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpContent;
@@ -71,7 +73,11 @@
         if (msg instanceof HttpRequest) {
             HttpRequest req = (HttpRequest) msg;
             try {
-                clientIp = ((NioSocketChannel) 
ctx.channel()).remoteAddress().getAddress().toString().substring(1);
+                if (ctx.channel() instanceof SocketChannel) {
+                    clientIp = ((NioSocketChannel) 
ctx.channel()).remoteAddress().getAddress().toString().substring(1);
+                } else if (ctx.channel() instanceof NioDomainSocketChannel) {
+                    clientIp = ctx.channel().remoteAddress().toString();
+                }
             } catch (Exception e) {
                 LOGGER.debug("ignoring {} obtaining client ip for {}", e, 
ctx.channel());
                 clientIp = "-";
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 1198945..8ea6d84 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -20,6 +20,9 @@

 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnixDomainSocketAddress;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -52,8 +55,10 @@
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.ServerChannel;
 import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.DuplexChannel;
+import io.netty.channel.socket.nio.NioServerDomainSocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpScheme;
@@ -85,7 +90,7 @@
     private final ServletRegistry servlets;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
-    private final Set<InetSocketAddress> addresses;
+    private final Set<SocketAddress> addresses;
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
@@ -93,6 +98,7 @@
     private final List<Channel> channels;
     private Throwable cause;
     private HttpServerConfig config;
+    private Class<? extends ServerChannel> channelImpl = 
NioServerSocketChannel.class;

     private final GenericFutureListener<Future<Void>> channelCloseListener = f 
-> {
         // This listener is invoked from within a netty IO thread. Hence, we 
can never block it
@@ -118,8 +124,13 @@
         this(bossGroup, workerGroup, Collections.singletonList(address), 
config, closeHandler);
     }

-    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
Collection<InetSocketAddress> addresses,
-            HttpServerConfig config, IChannelClosedHandler closeHandler) {
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
Path path, HttpServerConfig config) {
+        this(bossGroup, workerGroup, 
Collections.singletonList(UnixDomainSocketAddress.of(path)), config, null);
+    }
+
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+            Collection<? extends SocketAddress> addresses, HttpServerConfig 
config,
+            IChannelClosedHandler closeHandler) {
         if (addresses.isEmpty()) {
             throw new IllegalArgumentException("no addresses specified");
         }
@@ -133,13 +144,19 @@
         servlets = new ServletRegistry();
         workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
         int numExecutorThreads = config.getThreadCount();
-        int[] ports = 
this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray();
+        int[] ports = addresses.stream().filter(s -> s instanceof 
InetSocketAddress)
+                .mapToInt(s -> ((InetSocketAddress) 
s).getPort()).distinct().toArray();
         String desc;
         if (ports.length > 1) {
-            desc = this.addresses.stream().map(a -> 
a.getAddress().getHostAddress() + ":" + a.getPort())
+            desc = addresses.stream().filter(s -> s instanceof 
InetSocketAddress).map(s -> (InetSocketAddress) s)
+                    .map(a -> (a).getAddress().getHostAddress() + ":" + 
a.getPort())
                     .collect(Collectors.joining(",", "[", "]"));
-        } else {
+        } else if (ports.length == 1) {
             desc = "port:" + ports[0];
+        } else {
+            desc = "path: "
+                    + addresses.stream().findFirst().map(s -> 
((UnixDomainSocketAddress) s).getPath().toString());
+            channelImpl = NioServerDomainSocketChannel.class;
         }
         executor = new ThreadPoolExecutor(numExecutorThreads, 
numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
                 runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-" 
+ threadId.getAndIncrement()));
@@ -278,18 +295,18 @@

     private void bind() throws Exception {
         ServerBootstrap b = new ServerBootstrap();
-        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+        b.group(bossGroup, workerGroup).channel(channelImpl)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new 
FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
                 .childOption(ChannelOption.AUTO_READ, Boolean.FALSE)
                 .childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
WRITE_BUFFER_WATER_MARK)
                 .handler(new 
LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
-        List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new 
ArrayList<>();
-        for (InetSocketAddress address : addresses) {
+        List<Pair<SocketAddress, ChannelFuture>> channelFutures = new 
ArrayList<>();
+        for (SocketAddress address : addresses) {
             channelFutures.add(org.apache.commons.lang3.tuple.Pair.of(address, 
b.bind(address)));
         }
         Exception failure = null;
-        for (Pair<InetSocketAddress, ChannelFuture> addressFuture : 
channelFutures) {
+        for (Pair<SocketAddress, ChannelFuture> addressFuture : 
channelFutures) {
             try {
                 Channel channel = addressFuture.getRight().sync().channel();
                 channel.closeFuture().addListener(channelCloseListener);
@@ -394,7 +411,7 @@
         return new HttpServerHandler<>(this, chunkSize);
     }

-    protected ChannelInitializer<SocketChannel> getChannelInitializer() {
+    protected ChannelInitializer<DuplexChannel> getChannelInitializer() {
         return new HttpServerInitializer(this);
     }

@@ -429,7 +446,7 @@
     }

     @Deprecated // this returns an arbitrary (the first supplied if collection 
is ordered) address
-    public InetSocketAddress getAddress() {
+    public SocketAddress getAddress() {
         return addresses.iterator().next();
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index ad8a61f..32b1dbb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -20,11 +20,11 @@

 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.DuplexChannel;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;

-public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
+public class HttpServerInitializer extends ChannelInitializer<DuplexChannel> {

     private final HttpServer server;
     private final int maxRequestSize;
@@ -44,7 +44,7 @@
     }

     @Override
-    public void initChannel(SocketChannel ch) {
+    public void initChannel(DuplexChannel ch) {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new HttpRequestCapacityController(server));
         p.addLast(new HttpRequestDecoder(maxRequestInitialLineLength, 
maxRequestHeaderSize, maxRequestChunkSize));

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20492?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I5f8ac2170fd6b2beef14d99c38b9141af0f12ba3
Gerrit-Change-Number: 20492
Gerrit-PatchSet: 1
Gerrit-Owner: Ian Maxon <[email protected]>

Reply via email to