This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 68cbe16 [NO ISSUE] Allow UDF Requests from NC 68cbe16 is described below commit 68cbe169d4ec4ca257a0fe9ec3f25b4aaaf686d7 Author: Ian Maxon <ian.ma...@couchbase.com> AuthorDate: Tue Oct 27 16:43:55 2020 -0700 [NO ISSUE] Allow UDF Requests from NC - Route UDF requests on NCs to the CC - Enable library tests to round-robin betwen NC and CC Change-Id: I16557c2efb4622c9639c2992c8b2ef0624bd650e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8186 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Dmitry Lychagin <dmitry.lycha...@couchbase.com> --- .../asterix/api/http/server/NCUdfApiServlet.java | 168 +++++++++++++++++++++ .../asterix/api/http/server/UdfApiServlet.java | 64 ++++---- .../message/AbstractInternalRequestMessage.java | 123 +++++++++++++++ .../app/message/CreateLibraryRequestMessage.java | 55 +++++++ .../app/message/DropLibraryRequestMessage.java | 48 ++++++ .../message/ExecuteStatementRequestMessage.java | 9 +- .../app/message/InternalRequestResponse.java | 53 +++++++ .../asterix/hyracks/bootstrap/CCApplication.java | 3 +- .../asterix/hyracks/bootstrap/NCApplication.java | 36 +++++ .../asterix/app/external/ExternalUDFLibrarian.java | 28 +--- .../app/external/IExternalUDFLibrarian.java | 7 +- .../apache/asterix/test/common/TestExecutor.java | 9 +- .../asterix/test/runtime/SqlppExecutionIT.java | 23 ++- asterixdb/asterix-app/src/test/resources/cc.conf | 1 + .../asterix/common/config/ExternalProperties.java | 16 +- .../control/common/controllers/CCConfig.java | 15 +- .../common/controllers/ControllerConfig.java | 3 +- .../control/common/controllers/NCConfig.java | 14 +- 18 files changed, 598 insertions(+), 77 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java new file mode 100644 index 0000000..6d7a847 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.app.message.CreateLibraryRequestMessage; +import org.apache.asterix.app.message.DropLibraryRequestMessage; +import org.apache.asterix.app.message.InternalRequestResponse; +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.functions.ExternalFunctionLanguage; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.commons.io.IOUtils; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.control.common.context.ServerContext; +import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.utils.HttpUtil; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpScheme; + +public class NCUdfApiServlet extends UdfApiServlet { + + INcApplicationContext appCtx; + INCServiceContext srvCtx; + + public NCUdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, + ILangCompilationProvider compilationProvider, HttpScheme httpServerProtocol, int httpServerPort) { + super(ctx, paths, appCtx, compilationProvider, null, null, httpServerProtocol, httpServerPort); + } + + @Override + public void init() throws IOException { + appCtx = (INcApplicationContext) plainAppCtx; + srvCtx = this.appCtx.getServiceContext(); + workingDir = Paths.get(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath()).normalize() + .resolve(Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, + "tmp")); + initAuth(); + initStorage(); + } + + @Override + protected void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, + URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference, + IServletRequest request, IServletResponse response) throws Exception { + INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); + MessageFuture responseFuture = ncMb.registerMessageFuture(); + CreateLibraryRequestMessage req = new CreateLibraryRequestMessage(srvCtx.getNodeId(), + responseFuture.getFutureId(), dataverseName, libraryName, language, downloadURI, replaceIfExists, + sysAuthHeader, requestReference, additionalHttpHeadersFromRequest(request)); + sendMessage(req, responseFuture, requestReference, request, response); + } + + @Override + protected void doDrop(DataverseName dataverseName, String libraryName, boolean replaceIfExists, + IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception { + INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); + MessageFuture responseFuture = ncMb.registerMessageFuture(); + DropLibraryRequestMessage req = + new DropLibraryRequestMessage(srvCtx.getNodeId(), responseFuture.getFutureId(), dataverseName, + libraryName, replaceIfExists, requestReference, additionalHttpHeadersFromRequest(request)); + sendMessage(req, responseFuture, requestReference, request, response); + } + + private void sendMessage(ICcAddressedMessage requestMessage, MessageFuture responseFuture, + IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception { + // Running on NC -> send 'execute' message to CC + INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); + InternalRequestResponse responseMsg; + try { + ncMb.sendMessageToPrimaryCC(requestMessage); + responseMsg = (InternalRequestResponse) responseFuture.get(120000, TimeUnit.MILLISECONDS); + + } finally { + ncMb.deregisterMessageFuture(responseFuture.getFutureId()); + } + + Throwable err = responseMsg.getError(); + if (err != null) { + if (err instanceof Error) { + throw (Error) err; + } else if (err instanceof Exception) { + throw (Exception) err; + } else { + throw new Exception(err.toString(), err); + } + } + } + + @Override + protected void get(IServletRequest request, IServletResponse response) throws Exception { + String localPath = localPath(request); + while (localPath.startsWith("/")) { + localPath = localPath.substring(1); + } + if (localPath.isEmpty()) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + Path filePath = workingDir.resolve(localPath).normalize(); + if (!filePath.startsWith(workingDir)) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + readFromFile(filePath, response); + } + + @Override + protected void readFromFile(Path filePath, IServletResponse response) throws Exception { + class InputStreamGetter extends SynchronizableWork { + private InputStream is; + + @Override + protected void doRun() throws Exception { + is = Files.newInputStream(filePath); + } + } + + InputStreamGetter r = new InputStreamGetter(); + ((NodeControllerService) srvCtx.getControllerService()).getWorkQueue().scheduleAndSync(r); + + if (r.is == null) { + response.setStatus(HttpResponseStatus.NOT_FOUND); + return; + } + try { + response.setStatus(HttpResponseStatus.OK); + HttpUtil.setContentType(response, "application/octet-stream"); + IOUtils.copyLarge(r.is, response.outputStream()); + } finally { + r.is.close(); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java index 18139f6..360f5a0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.asterix.app.result.ResponsePrinter; import org.apache.asterix.app.translator.RequestParameters; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.IReceptionist; import org.apache.asterix.common.api.IRequestReference; @@ -91,8 +92,9 @@ public class UdfApiServlet extends AbstractServlet { private static final Logger LOGGER = LogManager.getLogger(); - protected final ICcApplicationContext appCtx; - private final ClusterControllerService ccs; + protected final IApplicationContext plainAppCtx; + private ICcApplicationContext appCtx; + private ClusterControllerService ccs; private final HttpScheme httpServerProtocol; private final int httpServerPort; @@ -100,29 +102,30 @@ public class UdfApiServlet extends AbstractServlet { protected final IStatementExecutorFactory statementExecutorFactory; protected final IStorageComponentProvider componentProvider; protected final IReceptionist receptionist; - protected final Path workingDir; + protected Path workingDir; protected String sysAuthHeader; - public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx, + public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider, HttpScheme httpServerProtocol, int httpServerPort) { super(ctx, paths); - this.appCtx = appCtx; - ICCServiceContext srvCtx = appCtx.getServiceContext(); - this.ccs = (ClusterControllerService) srvCtx.getControllerService(); + this.plainAppCtx = appCtx; this.compilationProvider = compilationProvider; this.statementExecutorFactory = statementExecutorFactory; this.componentProvider = componentProvider; this.receptionist = appCtx.getReceptionist(); this.httpServerProtocol = httpServerProtocol; this.httpServerPort = httpServerPort; - File baseDir = srvCtx.getServerCtx().getBaseDir(); - this.workingDir = baseDir.getAbsoluteFile().toPath().normalize().resolve( - Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, "tmp")); } @Override public void init() throws IOException { + appCtx = (ICcApplicationContext) plainAppCtx; + ICCServiceContext srvCtx = this.appCtx.getServiceContext(); + this.ccs = (ClusterControllerService) srvCtx.getControllerService(); + File baseDir = srvCtx.getServerCtx().getBaseDir(); + this.workingDir = baseDir.getAbsoluteFile().toPath().normalize().resolve( + Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, "tmp")); initAuth(); initStorage(); } @@ -151,11 +154,6 @@ public class UdfApiServlet extends AbstractServlet { @Override protected void post(IServletRequest request, IServletResponse response) { - IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState(); - if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { - response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); - return; - } HttpRequest httpRequest = request.getHttpRequest(); Pair<DataverseName, String> libraryName = parseLibraryName(request); if (libraryName == null) { @@ -190,9 +188,8 @@ public class UdfApiServlet extends AbstractServlet { } fileUpload.renameTo(libraryTempFile.toFile()); URI downloadURI = createDownloadURI(libraryTempFile); - CreateLibraryStatement stmt = new CreateLibraryStatement(libraryName.first, libraryName.second, - language, downloadURI, true, sysAuthHeader); - executeStatement(stmt, requestReference, request); + doCreate(libraryName.first, libraryName.second, language, downloadURI, true, sysAuthHeader, + requestReference, request, response); response.setStatus(HttpResponseStatus.OK); } catch (Exception e) { response.setStatus(toHttpErrorStatus(e)); @@ -213,6 +210,14 @@ public class UdfApiServlet extends AbstractServlet { } } + protected void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, + URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference, + IServletRequest request, IServletResponse response) throws Exception { + CreateLibraryStatement stmt = new CreateLibraryStatement(dataverseName, libraryName, language, downloadURI, + replaceIfExists, sysAuthHeader); + executeStatement(stmt, requestReference, request, response); + } + protected URI createDownloadURI(Path file) throws Exception { String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName(); String host = getHyracksClientConnection().getHost(); @@ -220,11 +225,6 @@ public class UdfApiServlet extends AbstractServlet { } protected void delete(IServletRequest request, IServletResponse response) { - IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState(); - if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { - response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); - return; - } Pair<DataverseName, String> libraryName = parseLibraryName(request); if (libraryName == null) { response.setStatus(HttpResponseStatus.BAD_REQUEST); @@ -232,8 +232,7 @@ public class UdfApiServlet extends AbstractServlet { } try { IRequestReference requestReference = receptionist.welcome(request); - LibraryDropStatement stmt = new LibraryDropStatement(libraryName.first, libraryName.second, false); - executeStatement(stmt, requestReference, request); + doDrop(libraryName.first, libraryName.second, false, requestReference, request, response); response.setStatus(HttpResponseStatus.OK); } catch (Exception e) { response.setStatus(toHttpErrorStatus(e)); @@ -244,8 +243,19 @@ public class UdfApiServlet extends AbstractServlet { } } - protected void executeStatement(Statement statement, IRequestReference requestReference, IServletRequest request) - throws Exception { + protected void doDrop(DataverseName dataverseName, String libraryName, boolean replaceIfExists, + IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception { + LibraryDropStatement stmt = new LibraryDropStatement(dataverseName, libraryName, replaceIfExists); + executeStatement(stmt, requestReference, request, response); + } + + protected void executeStatement(Statement statement, IRequestReference requestReference, IServletRequest request, + IServletResponse response) throws Exception { + IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState(); + if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { + response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); + return; + } SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM), new PrintWriter(NullWriter.NULL_WRITER)); ResponsePrinter printer = new ResponsePrinter(sessionOutput); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java new file mode 100644 index 0000000..26aac63 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.app.message; + +import java.io.PrintWriter; +import java.util.Collections; +import java.util.Map; + +import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.app.cc.CCExtensionManager; +import org.apache.asterix.app.result.ResponsePrinter; +import org.apache.asterix.app.translator.RequestParameters; +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.hyracks.bootstrap.CCApplication; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.translator.IRequestParameters; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.asterix.translator.ResultProperties; +import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; +import org.apache.commons.io.output.NullWriter; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class AbstractInternalRequestMessage implements ICcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + final String nodeRequestId; + final long requestMessageId; + final IRequestReference requestReference; + final Map<String, String> additionalParams; + + public AbstractInternalRequestMessage(String nodeRequestId, long requestMessageId, + IRequestReference requestReference, Map<String, String> additionalParams) { + this.nodeRequestId = nodeRequestId; + this.requestMessageId = requestMessageId; + this.requestReference = requestReference; + this.additionalParams = additionalParams; + } + + @Override + public void handle(ICcApplicationContext ccAppCtx) throws HyracksDataException { + ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext(); + ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService(); + CCApplication ccApp = (CCApplication) ccSrv.getApplication(); + CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker(); + final RuntimeDataException rejectionReason = + ExecuteStatementRequestMessage.getRejectionReason(ccSrv, nodeRequestId); + if (rejectionReason != null) { + ExecuteStatementRequestMessage.sendRejection(rejectionReason, messageBroker, requestMessageId, + nodeRequestId); + return; + } + CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager(); + ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(ILangExtension.Language.SQLPP); + IStorageComponentProvider componentProvider = ccAppCtx.getStorageComponentProvider(); + IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory(); + InternalRequestResponse responseMsg = new InternalRequestResponse(requestMessageId); + SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM), + new PrintWriter(NullWriter.NULL_WRITER)); + ResponsePrinter printer = new ResponsePrinter(sessionOutput); + ResultProperties resultProperties = new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE, 1); + IRequestParameters requestParams = new RequestParameters(requestReference, "", null, resultProperties, + new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null, + additionalParams, Collections.emptyMap(), false); + MetadataManager.INSTANCE.init(); + IStatementExecutor translator = + statementExecutorFactory.create(ccAppCtx, Collections.singletonList(produceStatement()), sessionOutput, + compilationProvider, componentProvider, printer); + try { + translator.compileAndExecute(ccAppCtx.getHcc(), requestParams); + } catch (AlgebricksException | HyracksException | org.apache.asterix.lang.sqlpp.parser.TokenMgrError pe) { + // we trust that "our" exceptions are serializable and have a comprehensible error message + GlobalConfig.ASTERIX_LOGGER.log(Level.WARN, pe.getMessage(), pe); + responseMsg.setError(pe); + } catch (Exception e) { + GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, "Unexpected exception", e); + responseMsg.setError(e); + } + try { + messageBroker.sendApplicationMessageToNC(responseMsg, nodeRequestId); + } catch (Exception e) { + LOGGER.log(Level.WARN, e.toString(), e); + } + + } + + protected abstract Statement produceStatement(); + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java new file mode 100644 index 0000000..818a098 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import java.net.URI; +import java.util.Map; + +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.functions.ExternalFunctionLanguage; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.statement.CreateLibraryStatement; + +public final class CreateLibraryRequestMessage extends AbstractInternalRequestMessage { + + final DataverseName dataverseName; + final String libraryName; + final ExternalFunctionLanguage lang; + final URI location; + final boolean replaceIfExists; + final String authToken; + private static final long serialVersionUID = 1L; + + public CreateLibraryRequestMessage(String nodeRequestId, long requestMessageId, DataverseName dataverseName, + String libraryName, ExternalFunctionLanguage lang, URI location, boolean replaceIfExists, String authToken, + IRequestReference requestReference, Map<String, String> additionalParams) { + super(nodeRequestId, requestMessageId, requestReference, additionalParams); + this.dataverseName = dataverseName; + this.libraryName = libraryName; + this.lang = lang; + this.location = location; + this.replaceIfExists = replaceIfExists; + this.authToken = authToken; + } + + protected Statement produceStatement() { + return new CreateLibraryStatement(dataverseName, libraryName, lang, location, replaceIfExists, authToken); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DropLibraryRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DropLibraryRequestMessage.java new file mode 100644 index 0000000..e7e8931 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DropLibraryRequestMessage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import java.util.Map; + +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.statement.LibraryDropStatement; + +public final class DropLibraryRequestMessage extends AbstractInternalRequestMessage { + + final DataverseName dataverseName; + final String libraryName; + final boolean ifExists; + private static final long serialVersionUID = 1L; + + public DropLibraryRequestMessage(String nodeRequestId, long requestMessageId, DataverseName dataverseName, + String libraryName, boolean ifExists, IRequestReference requestReference, + Map<String, String> additionalParams) { + super(nodeRequestId, requestMessageId, requestReference, additionalParams); + this.dataverseName = dataverseName; + this.libraryName = libraryName; + this.ifExists = ifExists; + } + + @Override + protected Statement produceStatement() { + return new LibraryDropStatement(dataverseName, libraryName, ifExists); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index a6ecc33..2552040 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -114,9 +114,9 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService(); CCApplication ccApp = (CCApplication) ccSrv.getApplication(); CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker(); - final RuntimeDataException rejectionReason = getRejectionReason(ccSrv); + final RuntimeDataException rejectionReason = getRejectionReason(ccSrv, requestNodeId); if (rejectionReason != null) { - sendRejection(rejectionReason, messageBroker); + sendRejection(rejectionReason, messageBroker, requestMessageId, requestNodeId); return; } CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager(); @@ -176,7 +176,7 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage } } - private RuntimeDataException getRejectionReason(ClusterControllerService ccSrv) { + static RuntimeDataException getRejectionReason(ClusterControllerService ccSrv, String requestNodeId) { if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) { return new RuntimeDataException(ErrorCode.REJECT_NODE_UNREGISTERED); } @@ -189,7 +189,8 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage return null; } - private void sendRejection(RuntimeDataException reason, CCMessageBroker messageBroker) { + static void sendRejection(RuntimeDataException reason, CCMessageBroker messageBroker, long requestMessageId, + String requestNodeId) { ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); responseMsg.setError(reason); try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/InternalRequestResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/InternalRequestResponse.java new file mode 100644 index 0000000..3083efc --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/InternalRequestResponse.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public final class InternalRequestResponse implements INcAddressedMessage { + + private final long requestMessageId; + private Throwable error; + private static final long serialVersionUID = 1L; + + public InternalRequestResponse(long requestMessageId) { + this.requestMessageId = requestMessageId; + } + + public void setError(Throwable error) { + this.error = error; + } + + public Throwable getError() { + return error; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(requestMessageId); + if (future != null) { + future.complete(this); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 3d5eb47..771c007 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -277,7 +277,8 @@ public class CCApplication extends BaseCCApplication { ccServiceCtx.getControllerService().getExecutor()); jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx); jsonAPIServer.setAttribute(ServletConstants.CREDENTIAL_MAP, - parseCredentialMap(externalProperties.getCredentialFilePath())); + parseCredentialMap(((ClusterControllerService) (appCtx.getServiceContext().getControllerService())) + .getCCConfig().getCredentialFilePath())); // Other APIs. addServlet(jsonAPIServer, Servlets.QUERY_STATUS); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index e90976e..fd5ecb5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -22,15 +22,22 @@ import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNEC import static org.apache.asterix.common.utils.Servlets.QUERY_RESULT; import static org.apache.asterix.common.utils.Servlets.QUERY_SERVICE; import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS; +import static org.apache.asterix.common.utils.Servlets.UDF; +import java.io.File; import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.api.http.server.BasicAuthServlet; import org.apache.asterix.api.http.server.NCQueryServiceServlet; +import org.apache.asterix.api.http.server.NCUdfApiServlet; import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet; import org.apache.asterix.api.http.server.QueryResultApiServlet; import org.apache.asterix.api.http.server.QueryStatusApiServlet; @@ -73,6 +80,9 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc import org.apache.asterix.translator.Receptionist; import org.apache.asterix.util.MetadataBuiltinFunctions; import org.apache.asterix.utils.RedactionUtil; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.config.IConfigManager; @@ -204,6 +214,12 @@ public class NCApplication extends BaseNCApplication { ncExtensionManager.getCompilationProvider(ILangExtension.Language.SQLPP); apiServer.addServlet(new NCQueryServiceServlet(apiServer.ctx(), new String[] { QUERY_SERVICE }, getApplicationContext(), sqlppCompilationProvider.getLanguage(), sqlppCompilationProvider, null)); + apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP, + parseCredentialMap(((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration() + .getCredentialFilePath())); + apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(), + new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, getApplicationContext(), + sqlppCompilationProvider, apiServer.getScheme(), apiServer.getAddress().getPort()))); apiServer.addServlet(new QueryStatusApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_STATUS)); apiServer.addServlet(new QueryResultApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_RESULT)); webManager.add(apiServer); @@ -340,4 +356,24 @@ public class NCApplication extends BaseNCApplication { protected void configurePersistedResourceRegistry() { ncServiceCtx.setPersistedResourceRegistry(new PersistedResourceRegistry()); } + + private Map<String, String> parseCredentialMap(String credPath) { + File credentialFile = new File(credPath); + Map<String, String> storedCredentials = new HashMap<>(); + if (credentialFile.exists()) { + try (CSVParser p = + CSVParser.parse(credentialFile, Charset.defaultCharset(), CSVFormat.DEFAULT.withDelimiter(':'))) { + List<CSVRecord> recs = p.getRecords(); + for (CSVRecord r : recs) { + if (r.size() != 2) { + throw new IOException("Passwd file must have exactly two fields."); + } + storedCredentials.put(r.get(0), r.get(1)); + } + } catch (IOException e) { + LOGGER.error("Malformed credential file", e); + } + } + return storedCredentials; + } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java index 2dc7326..c2b576a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java @@ -20,7 +20,7 @@ package org.apache.asterix.app.external; import java.io.File; import java.io.IOException; -import java.net.URL; +import java.net.URI; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.commons.io.IOUtils; @@ -48,25 +48,15 @@ import org.apache.hyracks.algebricks.common.utils.Pair; public class ExternalUDFLibrarian implements IExternalUDFLibrarian { private HttpClient hc; - private String host; - private int port; - - private ExternalUDFLibrarian(String host, int port) { - hc = new DefaultHttpClient(); - this.host = host; - this.port = port; - } public ExternalUDFLibrarian() { - this("localhost", 19002); + hc = new DefaultHttpClient(); } @Override - public void install(String dataverse, String libName, String libPath, Pair<String, String> credentials) - throws Exception { - URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName); - HttpHost h = new HttpHost(host, port, "http"); - HttpPost post = new HttpPost(url.toString()); + public void install(URI path, String libPath, Pair<String, String> credentials) throws Exception { + HttpHost h = new HttpHost(path.getHost(), path.getPort(), "http"); + HttpPost post = new HttpPost(path); CredentialsProvider cp = new BasicCredentialsProvider(); cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(credentials.first, credentials.second)); HttpClientContext hcCtx = HttpClientContext.create(); @@ -86,18 +76,16 @@ public class ExternalUDFLibrarian implements IExternalUDFLibrarian { } @Override - public void uninstall(String dataverse, String libName, Pair<String, String> credentials) - throws IOException, AsterixException { - URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName); + public void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException { CredentialsProvider cp = new BasicCredentialsProvider(); cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(credentials.first, credentials.second)); HttpClientContext hcCtx = HttpClientContext.create(); hcCtx.setCredentialsProvider(cp); - HttpHost h = new HttpHost(host, port, "http"); + HttpHost h = new HttpHost(path.getHost(), path.getPort(), "http"); AuthCache ac = new BasicAuthCache(); ac.put(h, new BasicScheme()); hcCtx.setAuthCache(ac); - HttpDelete del = new HttpDelete(url.toString()); + HttpDelete del = new HttpDelete(path); HttpResponse response = hc.execute(del, hcCtx); String resp = null; int respCode = response.getStatusLine().getStatusCode(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java index 1933f24..2315bec 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java @@ -19,13 +19,14 @@ package org.apache.asterix.app.external; import java.io.IOException; +import java.net.URI; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.hyracks.algebricks.common.utils.Pair; public interface IExternalUDFLibrarian { - void install(String dataverse, String libName, String libPath, Pair<String, String> credentials) throws Exception; - void uninstall(String dataverse, String libName, Pair<String, String> credentials) - throws IOException, AsterixException; + void install(URI path, String libPath, Pair<String, String> credentials) throws Exception; + + void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException; } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index f535f2c..e67b900 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -1209,13 +1209,15 @@ public class TestExecutor { throw new Exception("invalid library format"); } String libPath = command[5]; - librarian.install(dataverse, library, libPath, new Pair<>(username, pw)); + URI create = createEndpointURI("/admin/udf/" + dataverse + "/" + library); + librarian.install(create, libPath, new Pair<>(username, pw)); break; case "uninstall": if (command.length != 5) { throw new Exception("invalid library format"); } - librarian.uninstall(dataverse, library, new Pair<>(username, pw)); + URI delete = createEndpointURI("/admin/udf/" + dataverse + "/" + library); + librarian.uninstall(delete, new Pair<>(username, pw)); break; default: throw new Exception("invalid library format"); @@ -2229,7 +2231,8 @@ public class TestExecutor { protected URI createEndpointURI(String pathAndQuery) throws URISyntaxException { InetSocketAddress endpoint; - if (!ncEndPointsList.isEmpty() && pathAndQuery.equals(Servlets.QUERY_SERVICE)) { + if (!ncEndPointsList.isEmpty() && (pathAndQuery.equals(Servlets.QUERY_SERVICE) + || pathAndQuery.startsWith(Servlets.getAbsolutePath(Servlets.UDF)))) { int endpointIdx = Math.abs(endpointSelector++ % ncEndPointsList.size()); endpoint = ncEndPointsList.get(endpointIdx); } else if (isCcEndPointPath(pathAndQuery)) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java index 39f0948..91f7b05 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java @@ -19,10 +19,16 @@ package org.apache.asterix.test.runtime; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.control.nc.NodeControllerService; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -39,7 +45,9 @@ public class SqlppExecutionIT { @BeforeClass public static void setUp() throws Exception { - LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor()); + final TestExecutor testExecutor = new TestExecutor(); + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor); + setNcEndpoints(testExecutor); } @AfterClass @@ -62,4 +70,17 @@ public class SqlppExecutionIT { public void test() throws Exception { LangExecutionUtil.test(tcCtx); } + + private static void setNcEndpoints(TestExecutor testExecutor) { + final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs; + final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>(); + final String ip = InetAddress.getLoopbackAddress().getHostAddress(); + for (NodeControllerService nc : ncs) { + final String nodeId = nc.getId(); + final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext(); + int apiPort = appCtx.getExternalProperties().getNcApiPort(); + ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort)); + } + testExecutor.setNcEndPoints(ncEndPoints); + } } diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf b/asterixdb/asterix-app/src/test/resources/cc.conf index 119b53a..51ee756 100644 --- a/asterixdb/asterix-app/src/test/resources/cc.conf +++ b/asterixdb/asterix-app/src/test/resources/cc.conf @@ -32,6 +32,7 @@ nc.api.port=19005 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007 [nc] +credential.file=src/test/resources/security/passwd address=127.0.0.1 command=asterixnc app.class=org.apache.asterix.hyracks.bootstrap.NCApplication diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java index 033b751..8f83b35 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java @@ -23,15 +23,10 @@ import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTE import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_INTEGER; -import java.util.function.Function; - -import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.api.config.Section; -import org.apache.hyracks.control.common.controllers.ControllerConfig; import org.apache.hyracks.util.StorageUtil; -import org.apache.hyracks.util.file.FileUtil; import org.apache.logging.log4j.Level; public class ExternalProperties extends AbstractProperties { @@ -54,12 +49,7 @@ public class ExternalProperties extends AbstractProperties { UNSIGNED_INTEGER, StorageUtil.getIntSizeInBytes(200, StorageUtil.StorageUnit.MEGABYTE), "The maximum accepted web request size in bytes"), - REQUESTS_ARCHIVE_SIZE(UNSIGNED_INTEGER, 50, "The maximum number of archived requests to maintain"), - CREDENTIAL_FILE( - STRING, - (Function<IApplicationConfig, String>) appConfig -> FileUtil - .joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "passwd"), - ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/passwd"); + REQUESTS_ARCHIVE_SIZE(UNSIGNED_INTEGER, 50, "The maximum number of archived requests to maintain"); private final IOptionType type; private final Object defaultValue; @@ -79,7 +69,6 @@ public class ExternalProperties extends AbstractProperties { case API_PORT: case ACTIVE_PORT: case REQUESTS_ARCHIVE_SIZE: - case CREDENTIAL_FILE: return Section.CC; case NC_API_PORT: return Section.NC; @@ -159,7 +148,4 @@ public class ExternalProperties extends AbstractProperties { return accessor.getInt(Option.REQUESTS_ARCHIVE_SIZE); } - public String getCredentialFilePath() { - return accessor.getString(Option.CREDENTIAL_FILE); - } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java index bd84f11..b0ee497 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.api.config.Section; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.control.common.config.ConfigManager; +import org.apache.hyracks.control.common.config.OptionTypes; import org.apache.hyracks.util.file.FileUtil; import org.ini4j.Ini; @@ -78,7 +79,12 @@ public class CCConfig extends ControllerConfig { CONTROLLER_ID(SHORT, (short) 0x0000), KEY_STORE_PATH(STRING), TRUST_STORE_PATH(STRING), - KEY_STORE_PASSWORD(STRING); + KEY_STORE_PASSWORD(STRING), + CREDENTIAL_FILE( + OptionTypes.STRING, + (Function<IApplicationConfig, String>) appConfig -> FileUtil + .joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "passwd"), + ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/passwd"); private final IOptionType parser; private Object defaultValue; @@ -198,6 +204,8 @@ public class CCConfig extends ControllerConfig { return "A fully-qualified path to a trust store file that will be used for secured connections"; case KEY_STORE_PASSWORD: return "The password to the provided key store"; + case CREDENTIAL_FILE: + return "Path to HTTP basic credentials"; default: throw new IllegalStateException("NYI: " + this); } @@ -465,7 +473,12 @@ public class CCConfig extends ControllerConfig { } public void setTrustStorePath(String trustStorePath) { + configManager.set(Option.TRUST_STORE_PATH, trustStorePath); } + public String getCredentialFilePath() { + return getAppConfig().getString(Option.CREDENTIAL_FILE); + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java index f7291cb..9a505d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java @@ -132,4 +132,5 @@ public class ControllerConfig implements Serializable { public boolean isSslEnabled() { return getAppConfig().getBoolean(Option.SSL_ENABLED); } -} + +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index eaf0418..71c33d3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.api.config.Section; import org.apache.hyracks.control.common.config.ConfigManager; +import org.apache.hyracks.control.common.config.OptionTypes; import org.apache.hyracks.util.file.FileUtil; public class NCConfig extends ControllerConfig { @@ -95,7 +96,12 @@ public class NCConfig extends ControllerConfig { PYTHON_CMD(STRING, (String) null), PYTHON_ADDITIONAL_PACKAGES(STRING_ARRAY, (String[]) null), PYTHON_USE_BUNDLED_MSGPACK(BOOLEAN, true), - PYTHON_ARGS(STRING_ARRAY, (String[]) null); + PYTHON_ARGS(STRING_ARRAY, (String[]) null), + CREDENTIAL_FILE( + OptionTypes.STRING, + (Function<IApplicationConfig, String>) appConfig -> FileUtil + .joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "passwd"), + ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/passwd"); private final IOptionType parser; private final String defaultValueDescription; @@ -236,6 +242,8 @@ public class NCConfig extends ControllerConfig { return "True to include bundled msgpack on Python sys.path, false to use system-provided msgpack"; case PYTHON_ARGS: return "Python args to pass to Python interpreter"; + case CREDENTIAL_FILE: + return "Path to HTTP basic credentials"; default: throw new IllegalStateException("Not yet implemented: " + this); } @@ -606,4 +614,8 @@ public class NCConfig extends ControllerConfig { public int getIOQueueSize() { return appConfig.getInt(Option.IO_QUEUE_SIZE); } + + public String getCredentialFilePath() { + return getAppConfig().getString(Option.CREDENTIAL_FILE); + } }