>From Ian Maxon <[email protected]>:
Ian Maxon has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20492?usp=email )
Change subject: [ASTERIXDB-3636][API] Migrate UDF API to domain sockets
......................................................................
[ASTERIXDB-3636][API] Migrate UDF API to domain sockets
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
Serve the UDF API over a dedicated Unix domain socket, instead of exposing
it on the regular NC API server and restricting it to loopback requests.
Change-Id: I5f8ac2170fd6b2beef14d99c38b9141af0f12ba3
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
M asterixdb/asterix-app/pom.xml
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
M asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
M asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
M asterixdb/asterix-doc/src/main/user-defined_function/udf.md
M
asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
19 files changed, 178 insertions(+), 132 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/92/20492/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 524af43..12619a9 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.translator;
-import java.net.InetSocketAddress;
import java.util.UUID;
import org.apache.asterix.common.api.IClientRequest;
@@ -31,18 +30,15 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
-import org.apache.hyracks.util.NetworkUtil;
public class Receptionist implements IReceptionist {
@Override
public IRequestReference welcome(IServletRequest request) {
final String uuid = UUID.randomUUID().toString();
- final InetSocketAddress localAddress = request.getLocalAddress();
- final RequestReference ref =
- RequestReference.of(uuid,
NetworkUtil.toHostPort(localAddress), System.currentTimeMillis());
+ final RequestReference ref = RequestReference.of(uuid,
request.getHostPort(), System.currentTimeMillis());
ref.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT));
- ref.setRemoteAddr(NetworkUtil.toHostPort(request.getRemoteAddress()));
+ ref.setRemoteAddr(request.getRemotePort());
return ref;
}
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 41311d9..31886e2 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -843,6 +843,18 @@
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents.client5</groupId>
+ <artifactId>httpclient5</artifactId>
+ <version>5.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ <version>1.2.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-lang-common</artifactId>
<version>${project.version}</version>
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index 02a99f6..3abbab5 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -28,7 +28,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
-import java.net.InetAddress;
import java.net.URI;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
@@ -349,29 +348,9 @@
}
protected boolean isRequestPermitted(IServletRequest request,
IServletResponse response) throws IOException {
- if (!isRequestOnLoopback(request)) {
- rejectForbidden(response);
- return false;
- }
return true;
}
- protected boolean isRequestOnLoopback(IServletRequest request) {
- if (request.getLocalAddress() != null && request.getRemoteAddress() !=
null) {
- InetAddress local = request.getLocalAddress().getAddress();
- InetAddress remote = request.getRemoteAddress().getAddress();
- return remote.isLoopbackAddress() && local.isLoopbackAddress();
- } else {
- return false;
- }
- }
-
- protected void rejectForbidden(IServletResponse response) throws
IOException {
- // TODO: why this JSON format, do we use this anywhere else?
- sendError(response, HttpUtil.ContentType.APPLICATION_JSON,
HttpResponseStatus.FORBIDDEN,
- "{ \"error\": \"Forbidden\" }");
- }
-
@Override
protected void post(IServletRequest request, IServletResponse response)
throws IOException {
if (isRequestPermitted(request, response)) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0e6a04b..49dc03f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -30,6 +30,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -97,6 +98,7 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -124,6 +126,7 @@
protected boolean startupCompleted;
protected WebManager webManager;
private HttpServer apiServer;
+ private HttpServer udfServer;
@Override
public void registerConfig(IConfigManager configManager) {
@@ -250,19 +253,28 @@
apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP,
parseCredentialMap(((NodeControllerService)
ncServiceCtx.getControllerService()).getConfiguration()
.getCredentialFilePath()));
- Pair<Map<String, String>, Map<String, String>> auth =
BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
- apiServer
- .addServlet(new BasicAuthServlet(apiServer.ctx(),
- new NCUdfApiServlet(apiServer.ctx(), new String[] {
UDF }, getApplicationContext(),
- apiServer.getScheme(),
apiServer.getAddress().getPort()),
- auth.getFirst(), auth.getSecond()));
- apiServer.addServlet(new BasicAuthServlet(
- apiServer.ctx(), new NCUdfRecoveryServlet(apiServer.ctx(), new
String[] { UDF_RECOVERY },
- getApplicationContext(), apiServer.getScheme(),
apiServer.getAddress().getPort()),
- auth.getFirst(), auth.getSecond()));
+ if (!getApplicationContext().isCloudDeployment()) {
+ Pair<Map<String, String>, Map<String, String>> auth =
+ BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
+ apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
+ new NCUdfRecoveryServlet(apiServer.ctx(), new String[] {
UDF_RECOVERY }, getApplicationContext(),
+ apiServer.getScheme(),
externalProperties.getAPIServerPort()),
+ auth.getFirst(), auth.getSecond()));
+ }
apiServer.addServlet(new QueryStatusApiServlet(apiServer.ctx(),
getApplicationContext(), QUERY_STATUS));
apiServer.addServlet(new QueryResultApiServlet(apiServer.ctx(),
getApplicationContext(), QUERY_RESULT));
webManager.add(apiServer);
+
+ //UDF deployment server
+ IApplicationConfig appCfg =
getApplicationContext().getServiceContext().getAppConfig();
+ String socketPath = appCfg.getString(NCConfig.Option.UDF_API_DS_PATH);
+ udfServer = new HttpServer(webManager.getBosses(),
webManager.getWorkers(), Path.of(socketPath), config);
+ Pair<Map<String, String>, Map<String, String>> auth =
BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
+ udfServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
+ new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF },
getApplicationContext(),
+ apiServer.getScheme(),
externalProperties.getAPIServerPort()),
+ auth.getFirst(), auth.getSecond()));
+ webManager.add(udfServer);
}
protected List<AsterixExtension> getExtensions() throws Exception {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index b86730b..4cac409 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -28,14 +28,14 @@
public class CloudStorageIntegrationUtil extends AsterixHyracksIntegrationUtil
{
public static final String RESOURCES_PATH =
joinPath(getProjectPath().toString(), "src", "test", "resources");
- public static final String CONFIG_FILE = joinPath(RESOURCES_PATH,
"cc-cloud-storage-main.conf");
- public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH,
"cc-cloud-storage-main.ftl");
+ public static final String TARGET_PATH =
joinPath(getProjectPath().toString(), "target");
+ public static final String CONFIG_FILE = joinPath(TARGET_PATH,
"cc-cloud-storage.conf");
+ public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH,
"cc-cloud-storage.conf.ftl");
public static void main(String[] args) throws Exception {
- boolean cleanStart = Boolean.getBoolean("cleanup.start");
- LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart);
- final AsterixHyracksIntegrationUtil integrationUtil = new
AsterixHyracksIntegrationUtil();
+ boolean cleanStart = true;
try (S3MockContainer s3Mock =
LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart)) {
+ final AsterixHyracksIntegrationUtil integrationUtil = new
AsterixHyracksIntegrationUtil();
fillConfigTemplate(MOCK_SERVER_HOSTNAME_FRAGMENT +
s3Mock.getHttpServerPort(), CONFIG_FILE_TEMPLATE,
CONFIG_FILE);
integrationUtil.run(cleanStart,
Boolean.getBoolean("cleanup.shutdown"),
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
index e32f8ea..49633d2 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
@@ -18,89 +18,75 @@
*/
package org.apache.asterix.app.external;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
import java.io.File;
import java.io.IOException;
-import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.AuthCache;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.client.utils.URIUtils;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.mime.HttpMultipartMode;
-import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.auth.BasicScheme;
-import org.apache.http.impl.client.BasicAuthCache;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.hyracks.algebricks.common.utils.Pair;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.channel.unix.DomainSocketAddress;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+
@SuppressWarnings("squid:S134")
public class ExternalUDFLibrarian implements IExternalUDFLibrarian {
- private HttpClient hc;
-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- public ExternalUDFLibrarian() {
- hc = new DefaultHttpClient();
+ // Builds the HTTP Basic "Authorization" header value from a (user, password) pair.
+ private static String createAuthHeader(Pair<String, String> credentials) {
+ String auth = credentials.first + ":" + credentials.second;
+ // encodeToString avoids decoding the Base64 bytes with the platform default charset
+ return "Basic " + Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+ }
+
+ public void setCredentials(Pair<String, String> credentials) {
}
@Override
- public void install(URI path, String type, String libPath, Pair<String,
String> credentials) throws Exception {
- HttpClientContext hcCtx = createHttpClientContext(path, credentials);
- HttpPost post = new HttpPost(path);
+ public void install(String path, String type, String libPath, Pair<String,
String> credentials) throws Exception {
File lib = new File(libPath);
- MultipartEntityBuilder entity =
MultipartEntityBuilder.create().setMode(HttpMultipartMode.STRICT);
- entity.addTextBody("type", type);
- entity.addBinaryBody("data", lib, ContentType.DEFAULT_BINARY,
lib.getName()).build();
- post.setEntity(entity.build());
- HttpResponse response = hc.execute(post, hcCtx);
- handleResponse(response);
+ HttpClient hc = HttpClient.create().headers(h ->
h.set("Authorization", createAuthHeader(credentials)))
+ .remoteAddress(() -> new
DomainSocketAddress(joinPath("target", "tmp", "asterix_nc1", "udf.sock")));
+ hc.post().uri(path)
+ .sendForm((req, form) -> form.multipart(true).attr("type",
type).file("data", lib,
+ "application/octet-stream"))
+ .responseSingle((response, content) ->
handleResponse(response, content)).block();
}
@Override
- public void uninstall(URI path, Pair<String, String> credentials) throws
IOException, AsterixException {
- HttpClientContext hcCtx = createHttpClientContext(path, credentials);
- HttpDelete del = new HttpDelete(path);
- HttpResponse response = hc.execute(del, hcCtx);
- handleResponse(response);
+ public void uninstall(String path, Pair<String, String> credentials)
throws IOException, AsterixException {
+ HttpClient hc = HttpClient.create().headers(h ->
h.set("Authorization", createAuthHeader(credentials)))
+ .remoteAddress(() -> new
DomainSocketAddress(joinPath("target", "tmp", "asterix_nc1", "udf.sock")));
+ hc.delete().uri(path).responseSingle((response, content) ->
handleResponse(response, content)).block();
}
- private HttpClientContext createHttpClientContext(URI path, Pair<String,
String> credentials) {
- HttpClientContext hcCtx = HttpClientContext.create();
- HttpHost h = URIUtils.extractHost(path);
- CredentialsProvider cp = new BasicCredentialsProvider();
- cp.setCredentials(AuthScope.ANY, new
UsernamePasswordCredentials(credentials.first, credentials.second));
- hcCtx.setCredentialsProvider(cp);
- AuthCache ac = new BasicAuthCache();
- ac.put(h, new BasicScheme());
- hcCtx.setAuthCache(ac);
- return hcCtx;
- }
+ private Mono<Void> handleResponse(HttpClientResponse response,
reactor.netty.ByteBufMono content) {
+ return content.asString().defaultIfEmpty("").flatMap(body -> {
+ int respCode = response.status().code();
+ if (respCode == HttpResponseStatus.OK.code()) {
+ return Mono.empty();
+ }
- private void handleResponse(HttpResponse response) throws IOException,
AsterixException {
- String resp = null;
- int respCode = response.getStatusLine().getStatusCode();
- if (respCode == 500 || respCode == 400) {
- resp =
OBJECT_MAPPER.readTree(response.getEntity().getContent()).get("error").asText();
- }
- response.getEntity().consumeContent();
- if (resp == null && respCode != 200) {
- resp = response.getStatusLine().toString();
- }
- if (resp != null) {
- throw new AsterixException(resp);
- }
+ String errorMessage;
+ if (respCode == HttpResponseStatus.INTERNAL_SERVER_ERROR.code()
+ || respCode == HttpResponseStatus.BAD_REQUEST.code()) {
+ try {
+ errorMessage =
OBJECT_MAPPER.readTree(body).get("error").asText();
+ } catch (IOException e) {
+ errorMessage = "Failed to parse error response: " + body;
+ }
+ } else {
+ errorMessage = response.status().toString();
+ }
+ return Mono.error(new AsterixException(errorMessage));
+ });
}
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
index 998fa78..4efcca0 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
@@ -19,13 +19,12 @@
package org.apache.asterix.app.external;
import java.io.IOException;
-import java.net.URI;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.hyracks.algebricks.common.utils.Pair;
public interface IExternalUDFLibrarian {
- void install(URI path, String type, String libPath, Pair<String, String>
credentials) throws Exception;
+ void install(String path, String type, String libPath, Pair<String,
String> credentials) throws Exception;
- void uninstall(URI path, Pair<String, String> credentials) throws
IOException, AsterixException;
+ void uninstall(String path, Pair<String, String> credentials) throws
IOException, AsterixException;
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 32d6e17..e3f5dc6 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1462,7 +1462,7 @@
//TODO: this is not right. URLEncoder does not properly
encode paths.
String dataverse = URLEncoder.encode(command[1],
StandardCharsets.US_ASCII.name());
String library = URLEncoder.encode(command[2],
StandardCharsets.US_ASCII.name());
- URI path = createEndpointURI("/admin/udf/" + dataverse +
"/" + library);
+ String path = "/admin/udf/" + dataverse + "/" + library;
if (command.length < 2) {
throw new Exception("invalid library command: " +
line);
}
@@ -2721,6 +2721,11 @@
return uri;
}
+ protected URI createUDFDomainSockURI(String pathAndQuery) {
+ URI uri = URI.create("unix://target/tmp/asterix_nc1/udf.sock" +
pathAndQuery);
+ return uri;
+ }
+
public URI getEndpoint(String servlet) throws URISyntaxException {
return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)));
}
diff --git
a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
index 8745d9d..9efabaa 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
@@ -21,6 +21,7 @@
iodevices=target/tmp/asterix_nc1/iodevice1
iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
nc.api.port=19004
+udf.api.ds.path=target/tmp/asterix_nc1/udf.sock
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
[nc/asterix_nc2]
@@ -29,6 +30,7 @@
core.dump.dir=target/tmp/asterix_nc2/coredump
iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
nc.api.port=19005
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
[nc]
@@ -68,7 +70,7 @@
cloud.storage.scheme=s3
cloud.storage.bucket=cloud-storage-container
cloud.storage.region=us-west-2
-cloud.storage.endpoint=http://127.0.0.1:8001
+cloud.storage.endpoint=http://127.0.0.1:46677
cloud.storage.anonymous.auth=true
cloud.storage.cache.policy=selective
cloud.max.write.requests.per.second=2000
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
index 8271bfd..4a33d4e 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.ftl
@@ -21,6 +21,7 @@
iodevices=target/tmp/asterix_nc1/iodevice1
iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
nc.api.port=19004
+udf.api.ds.path=target/tmp/asterix_nc1/udf.sock
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
[nc/asterix_nc2]
@@ -29,6 +30,7 @@
core.dump.dir=target/tmp/asterix_nc2/coredump
iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
nc.api.port=19005
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
[nc]
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
index 74c557a..02e9356 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf.ftl
@@ -21,6 +21,7 @@
iodevices=target/tmp/asterix_nc1/iodevice1
iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
nc.api.port=19004
+udf.api.ds.path=target/tmp/asterix_nc1/udf.sock
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
[nc/asterix_nc2]
@@ -29,6 +30,7 @@
core.dump.dir=target/tmp/asterix_nc2/coredump
iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
nc.api.port=19005
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
[nc]
diff --git a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
index 71e0fa2..9768c81 100644
--- a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
+++ b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
@@ -52,7 +52,7 @@
## <a name="installingUDF">Installing a Java UDF Library</a>
To install a UDF package to the cluster, we need to send a Multipart Form-data
HTTP request to the `/admin/udf` endpoint
-of the CC at the normal API port (`19004` by default). Any suitable tool will
do, but for the example here I will use
+of the cluster through the special UDF domain socket. Any suitable tool will
do, but for the example here I will use
`curl` which is widely available.
For example, to install a library with the following criteria:
@@ -65,7 +65,7 @@
we would execute
- curl -v -u admin:admin -X POST -F 'data=@./lib.zip' -F 'type=java'
localhost:19004/admin/udf/udfs/testlib
+ curl -v -u admin:admin --unix-socket udf.sock -F 'data=@./lib.zip' -F
'type=java' localhost/admin/udf/udfs/testlib
Any response other than `200` indicates an error in deployment.
diff --git
a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
index 025f607..3c0e56c 100644
---
a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
+++
b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
@@ -19,7 +19,6 @@
package org.apache.asterix.test.podman;
import java.io.IOException;
-import java.net.URI;
import org.apache.asterix.app.external.IExternalUDFLibrarian;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -42,7 +41,7 @@
}
@Override
- public void install(URI path, String type, String libPath, Pair<String,
String> credentials) throws Exception {
+ public void install(String path, String type, String libPath, Pair<String,
String> credentials) throws Exception {
Container.ExecResult curlResult = null;
int retryCt = 0;
while (retryCt < 10) {
@@ -50,7 +49,7 @@
curlResult = asterix.execInContainer("curl",
"--no-progress-meter", "-X", "POST", "-u",
credentials.first + ":" + credentials.second, "-F",
"data=@" + "/var/tmp/asterix-app/" + libPath, "-F",
"type=" + type,
- "http://localhost:19004" + path.getRawPath());
+ "http://localhost:19004" + path);
handleResponse(curlResult);
return;
} catch (RuntimeException e) {
@@ -62,10 +61,10 @@
}
@Override
- public void uninstall(URI path, Pair<String, String> credentials) throws
IOException, AsterixException {
+ public void uninstall(String path, Pair<String, String> credentials)
throws IOException, AsterixException {
try {
Container.ExecResult curlResult = asterix.execInContainer("curl",
"-X", "DELETE", "-u",
- credentials.first + ":" + credentials.second,
"http://localhost:19004" + path.getPath());
+ credentials.first + ":" + credentials.second,
"http://localhost:19004" + path);
handleResponse(curlResult);
} catch (InterruptedException e) {
throw new IOException(e);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index f55dd59..a7b0e76 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -103,6 +103,10 @@
PYTHON_ARGS(STRING_ARRAY, (String[]) null),
PYTHON_ENV(STRING_ARRAY, (String[]) null),
PYTHON_DS_PATH(STRING, (String) null),
+ UDF_API_DS_PATH(
+ STRING,
+ appConfig ->
FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR),
"udf.sock"),
+ "<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() +
">/udf.sock"),
LIBRARY_MAX_FILE_SIZE(POSITIVE_LONG_BYTE_UNIT, 250L * 1024 * 1024),
//250MB
LIBRARY_MAX_EXTRACTED_SIZE(POSITIVE_LONG_BYTE_UNIT, 1000L * 1024 *
1024), //1GB
LIBRARY_MAX_ARCHIVE_ENTRIES(INTEGER, 4096),
@@ -244,6 +248,8 @@
return "Number of threads per partition used to write and
read from storage";
case IO_QUEUE_SIZE:
return "Length of the queue used for requests to write and
read";
+ case UDF_API_DS_PATH:
+ return "Path and name for domain socket used to deploy
UDFs to the cluster. Defaults to default.dir/udf.sock";
case PYTHON_CMD:
return "Absolute path to python interpreter";
case PYTHON_ADDITIONAL_PACKAGES:
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
index 59c0ae0..faab5d3 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
@@ -114,5 +114,9 @@
*/
InetSocketAddress getLocalAddress();
+ String getHostPort();
+
+ String getRemotePort();
+
Channel getChannel();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
index 4a72d53..0aaf8fe 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.http.server;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,6 +29,7 @@
import java.util.Set;
import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.util.NetworkUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -115,6 +117,23 @@
}
@Override
+ public String getHostPort() {
+ return getPort(channel.localAddress());
+ }
+
+ @Override
+ public String getRemotePort() {
+ return getPort(channel.remoteAddress());
+ }
+
+ // Formats the given address via NetworkUtil.toHostPort; any non-inet address (e.g. a domain socket) yields "0".
+ private String getPort(SocketAddress s) {
+ if (s instanceof InetSocketAddress) {
+ return NetworkUtil.toHostPort((InetSocketAddress) s);
+ }
+ return "0";
+ }
+
+ @Override
public Channel getChannel() {
return channel;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
index 6476ab9..16a29aa 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
@@ -30,6 +30,8 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDomainSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
@@ -71,7 +73,11 @@
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
try {
- clientIp = ((NioSocketChannel)
ctx.channel()).remoteAddress().getAddress().toString().substring(1);
+ if (ctx.channel() instanceof SocketChannel) {
+ clientIp = ((NioSocketChannel)
ctx.channel()).remoteAddress().getAddress().toString().substring(1);
+ } else if (ctx.channel() instanceof NioDomainSocketChannel) {
+ clientIp = ctx.channel().remoteAddress().toString();
+ }
} catch (Exception e) {
LOGGER.debug("ignoring {} obtaining client ip for {}", e,
ctx.channel());
clientIp = "-";
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 1198945..8ea6d84 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnixDomainSocketAddress;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -52,8 +55,10 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.DuplexChannel;
+import io.netty.channel.socket.nio.NioServerDomainSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpScheme;
@@ -85,7 +90,7 @@
private final ServletRegistry servlets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
- private final Set<InetSocketAddress> addresses;
+ private final Set<SocketAddress> addresses;
private final ThreadPoolExecutor executor;
// Mutable members
private volatile int state = STOPPED;
@@ -93,6 +98,7 @@
private final List<Channel> channels;
private Throwable cause;
private HttpServerConfig config;
+ private Class<? extends ServerChannel> channelImpl =
NioServerSocketChannel.class;
private final GenericFutureListener<Future<Void>> channelCloseListener = f
-> {
// This listener is invoked from within a netty IO thread. Hence, we
can never block it
@@ -118,8 +124,13 @@
this(bossGroup, workerGroup, Collections.singletonList(address),
config, closeHandler);
}
- public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
Collection<InetSocketAddress> addresses,
- HttpServerConfig config, IChannelClosedHandler closeHandler) {
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
Path path, HttpServerConfig config) {
+ this(bossGroup, workerGroup,
Collections.singletonList(UnixDomainSocketAddress.of(path)), config, null);
+ }
+
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+ Collection<? extends SocketAddress> addresses, HttpServerConfig
config,
+ IChannelClosedHandler closeHandler) {
if (addresses.isEmpty()) {
throw new IllegalArgumentException("no addresses specified");
}
@@ -133,13 +144,19 @@
servlets = new ServletRegistry();
workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
int numExecutorThreads = config.getThreadCount();
- int[] ports =
this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray();
+ int[] ports = addresses.stream().filter(s -> s instanceof
InetSocketAddress)
+ .mapToInt(s -> ((InetSocketAddress)
s).getPort()).distinct().toArray();
String desc;
if (ports.length > 1) {
- desc = this.addresses.stream().map(a ->
a.getAddress().getHostAddress() + ":" + a.getPort())
+ desc = addresses.stream().filter(s -> s instanceof
InetSocketAddress).map(s -> (InetSocketAddress) s)
+ .map(a -> (a).getAddress().getHostAddress() + ":" +
a.getPort())
.collect(Collectors.joining(",", "[", "]"));
- } else {
+ } else if (ports.length == 1) {
desc = "port:" + ports[0];
+ } else {
+ desc = "path: "
+ + addresses.stream().findFirst().map(s ->
((UnixDomainSocketAddress) s).getPath().toString());
+ channelImpl = NioServerDomainSocketChannel.class;
}
executor = new ThreadPoolExecutor(numExecutorThreads,
numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-"
+ threadId.getAndIncrement()));
@@ -278,18 +295,18 @@
private void bind() throws Exception {
ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ b.group(bossGroup, workerGroup).channel(channelImpl)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new
FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
.childOption(ChannelOption.AUTO_READ, Boolean.FALSE)
.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
WRITE_BUFFER_WATER_MARK)
.handler(new
LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
- List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new
ArrayList<>();
- for (InetSocketAddress address : addresses) {
+ List<Pair<SocketAddress, ChannelFuture>> channelFutures = new
ArrayList<>();
+ for (SocketAddress address : addresses) {
channelFutures.add(org.apache.commons.lang3.tuple.Pair.of(address,
b.bind(address)));
}
Exception failure = null;
- for (Pair<InetSocketAddress, ChannelFuture> addressFuture :
channelFutures) {
+ for (Pair<SocketAddress, ChannelFuture> addressFuture :
channelFutures) {
try {
Channel channel = addressFuture.getRight().sync().channel();
channel.closeFuture().addListener(channelCloseListener);
@@ -394,7 +411,7 @@
return new HttpServerHandler<>(this, chunkSize);
}
- protected ChannelInitializer<SocketChannel> getChannelInitializer() {
+ protected ChannelInitializer<DuplexChannel> getChannelInitializer() {
return new HttpServerInitializer(this);
}
@@ -429,7 +446,7 @@
}
@Deprecated // this returns an arbitrary (the first supplied if collection
is ordered) address
- public InetSocketAddress getAddress() {
+ public SocketAddress getAddress() {
return addresses.iterator().next();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index ad8a61f..32b1dbb 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -20,11 +20,11 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.DuplexChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
-public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
+public class HttpServerInitializer extends ChannelInitializer<DuplexChannel> {
private final HttpServer server;
private final int maxRequestSize;
@@ -44,7 +44,7 @@
}
@Override
- public void initChannel(SocketChannel ch) {
+ public void initChannel(DuplexChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestCapacityController(server));
p.addLast(new HttpRequestDecoder(maxRequestInitialLineLength,
maxRequestHeaderSize, maxRequestChunkSize));
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20492?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I5f8ac2170fd6b2beef14d99c38b9141af0f12ba3
Gerrit-Change-Number: 20492
Gerrit-PatchSet: 1
Gerrit-Owner: Ian Maxon <[email protected]>