This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 517a4bc [NO ISSUE] Remove UDF API servlet from CC 517a4bc is described below commit 517a4bcee3e5ec90da146c71d6c5371ab6d47df3 Author: Ian Maxon <ima...@apache.org> AuthorDate: Mon Oct 19 21:37:40 2020 -0700 [NO ISSUE] Remove UDF API servlet from CC Also refactor the woringDir to be given from the library manager Change-Id: I5f8bc2f0bc4b68f9d4b3ebafc3266e9584d8535b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8483 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ian Maxon <ima...@uci.edu> --- .../asterix/api/http/server/NCUdfApiServlet.java | 212 +++++++++++- .../asterix/api/http/server/UdfApiServlet.java | 363 --------------------- .../asterix/hyracks/bootstrap/CCApplication.java | 8 - .../asterix/common/library/ILibraryManager.java | 2 + .../external/library/ExternalLibraryManager.java | 9 + 5 files changed, 211 insertions(+), 383 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java index 6d7a847..7af7fd4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java @@ -18,12 +18,21 @@ */ package org.apache.asterix.api.http.server; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; +import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA; +import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON; + import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -32,49 +41,102 @@ import org.apache.asterix.app.message.DropLibraryRequestMessage; import org.apache.asterix.app.message.InternalRequestResponse; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.api.IReceptionist; import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.functions.ExternalFunctionLanguage; +import org.apache.asterix.common.library.LibraryDescriptor; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.application.INCServiceContext; -import org.apache.hyracks.control.common.context.ServerContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.IFormattedException; import org.apache.hyracks.control.common.work.SynchronizableWork; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; import org.apache.hyracks.http.server.utils.HttpUtil; +import org.apache.hyracks.util.file.FileUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpScheme; +import io.netty.handler.codec.http.multipart.FileUpload; +import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; +import io.netty.handler.codec.http.multipart.InterfaceHttpData; -public class NCUdfApiServlet extends UdfApiServlet { +public class NCUdfApiServlet extends AbstractServlet { INcApplicationContext appCtx; INCServiceContext srvCtx; + protected final IApplicationContext plainAppCtx; + private final HttpScheme httpServerProtocol; + private final int httpServerPort; + + protected final ILangCompilationProvider compilationProvider; + protected final IReceptionist receptionist; + + protected Path workingDir; + protected String sysAuthHeader; + + private static final Logger LOGGER = LogManager.getLogger(); + public NCUdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, ILangCompilationProvider compilationProvider, HttpScheme httpServerProtocol, int httpServerPort) { - super(ctx, paths, appCtx, compilationProvider, null, null, httpServerProtocol, httpServerPort); + + super(ctx, paths); + this.plainAppCtx = appCtx; + this.compilationProvider = compilationProvider; + this.receptionist = appCtx.getReceptionist(); + this.httpServerProtocol = httpServerProtocol; + this.httpServerPort = httpServerPort; } @Override public void init() throws IOException { appCtx = (INcApplicationContext) plainAppCtx; srvCtx = this.appCtx.getServiceContext(); - workingDir = Paths.get(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath()).normalize() - .resolve(Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, - "tmp")); + workingDir = Paths.get(appCtx.getLibraryManager().getDistributionDir().getAbsolutePath()).normalize(); initAuth(); initStorage(); } - @Override - protected void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, + protected void initAuth() { + sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER); + } + + protected void initStorage() throws IOException { + // prepare working directory + if (Files.isDirectory(workingDir)) { + try { + FileUtils.cleanDirectory(workingDir.toFile()); + } catch (IOException e) { + LOGGER.warn("Could not clean directory: " + workingDir, e); + } + } else { + Files.deleteIfExists(workingDir); + FileUtil.forceMkdirs(workingDir.toFile()); + } + } + + protected Map<String, String> additionalHttpHeadersFromRequest(IServletRequest request) { + return Collections.emptyMap(); + } + + private void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception { INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); @@ -85,8 +147,7 @@ public class NCUdfApiServlet extends UdfApiServlet { sendMessage(req, responseFuture, requestReference, request, response); } - @Override - protected void doDrop(DataverseName dataverseName, String libraryName, boolean replaceIfExists, + private void doDrop(DataverseName dataverseName, String libraryName, boolean replaceIfExists, IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception { INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); MessageFuture responseFuture = ncMb.registerMessageFuture(); @@ -140,7 +201,98 @@ public class NCUdfApiServlet extends UdfApiServlet { } @Override - protected void readFromFile(Path filePath, IServletResponse response) throws Exception { + protected void post(IServletRequest request, IServletResponse response) { + HttpRequest httpRequest = request.getHttpRequest(); + Pair<DataverseName, String> libraryName = parseLibraryName(request); + if (libraryName == null) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + Path libraryTempFile = null; + HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(httpRequest); + try { + if (!requestDecoder.hasNext() || requestDecoder.getBodyHttpDatas().size() != 1) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + InterfaceHttpData httpData = requestDecoder.getBodyHttpDatas().get(0); + if (!httpData.getHttpDataType().equals(InterfaceHttpData.HttpDataType.FileUpload)) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + FileUpload fileUpload = (FileUpload) httpData; + String fileExt = FilenameUtils.getExtension(fileUpload.getFilename()); + ExternalFunctionLanguage language = getLanguageByFileExtension(fileExt); + if (language == null) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + try { + IRequestReference requestReference = receptionist.welcome(request); + libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libraryName.first + "." + + libraryName.second); + } + fileUpload.renameTo(libraryTempFile.toFile()); + URI downloadURI = createDownloadURI(libraryTempFile); + doCreate(libraryName.first, libraryName.second, language, downloadURI, true, sysAuthHeader, + requestReference, request, response); + response.setStatus(HttpResponseStatus.OK); + } catch (Exception e) { + response.setStatus(toHttpErrorStatus(e)); + PrintWriter responseWriter = response.writer(); + responseWriter.write(e.getMessage()); + responseWriter.flush(); + LOGGER.error("Error creating/updating library " + libraryName.first + "." + libraryName.second, e); + } + } finally { + requestDecoder.destroy(); + if (libraryTempFile != null) { + try { + Files.deleteIfExists(libraryTempFile); + } catch (IOException e) { + LOGGER.warn("Could not delete temporary file " + libraryTempFile, e); + } + } + } + } + + private URI createDownloadURI(Path file) throws Exception { + String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName(); + String host = getHyracksClientConnection().getHost(); + return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null); + } + + private IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR + IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); + if (hcc == null) { + throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR); + } + return hcc; + } + + @Override + protected void delete(IServletRequest request, IServletResponse response) { + Pair<DataverseName, String> libraryName = parseLibraryName(request); + if (libraryName == null) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + try { + IRequestReference requestReference = receptionist.welcome(request); + doDrop(libraryName.first, libraryName.second, false, requestReference, request, response); + response.setStatus(HttpResponseStatus.OK); + } catch (Exception e) { + response.setStatus(toHttpErrorStatus(e)); + PrintWriter responseWriter = response.writer(); + responseWriter.write(e.getMessage()); + responseWriter.flush(); + LOGGER.error("Error deleting library " + libraryName.first + "." + libraryName.second, e); + } + } + + private void readFromFile(Path filePath, IServletResponse response) throws Exception { class InputStreamGetter extends SynchronizableWork { private InputStream is; @@ -165,4 +317,40 @@ public class NCUdfApiServlet extends UdfApiServlet { r.is.close(); } } + + private Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException { + String[] path = StringUtils.split(localPath(request), '/'); + int ln = path.length; + if (ln < 2) { + return null; + } + String libraryName = path[ln - 1]; + DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1); + return new Pair<>(dataverseName, libraryName); + } + + private static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) { + switch (fileExtension) { + case LibraryDescriptor.FILE_EXT_ZIP: + return JAVA; + case LibraryDescriptor.FILE_EXT_PYZ: + return PYTHON; + default: + return null; + } + } + + private HttpResponseStatus toHttpErrorStatus(Exception e) { + if (e instanceof IFormattedException) { + IFormattedException fe = (IFormattedException) e; + if (ErrorCode.ASTERIX.equals(fe.getComponent())) { + switch (fe.getErrorCode()) { + case ErrorCode.UNKNOWN_DATAVERSE: + case ErrorCode.UNKNOWN_LIBRARY: + return HttpResponseStatus.NOT_FOUND; + } + } + } + return HttpResponseStatus.INTERNAL_SERVER_ERROR; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java deleted file mode 100644 index 360f5a0..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.api.http.server; - -import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; -import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA; -import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; -import java.net.URI; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -import org.apache.asterix.app.result.ResponsePrinter; -import org.apache.asterix.app.translator.RequestParameters; -import org.apache.asterix.common.api.IApplicationContext; -import org.apache.asterix.common.api.IClusterManagementWork; -import org.apache.asterix.common.api.IReceptionist; -import org.apache.asterix.common.api.IRequestReference; -import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.functions.ExternalFunctionLanguage; -import org.apache.asterix.common.library.LibraryDescriptor; -import org.apache.asterix.common.metadata.DataverseName; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.library.ExternalLibraryManager; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.statement.CreateLibraryStatement; -import org.apache.asterix.lang.common.statement.LibraryDropStatement; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.translator.IRequestParameters; -import org.apache.asterix.translator.IStatementExecutor; -import org.apache.asterix.translator.IStatementExecutorFactory; -import org.apache.asterix.translator.ResultProperties; -import org.apache.asterix.translator.SessionConfig; -import org.apache.asterix.translator.SessionOutput; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.output.NullWriter; -import org.apache.commons.lang3.StringUtils; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.api.application.ICCServiceContext; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.exceptions.IFormattedException; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.common.context.ServerContext; -import org.apache.hyracks.control.common.work.SynchronizableWork; -import org.apache.hyracks.http.api.IServletRequest; -import org.apache.hyracks.http.api.IServletResponse; -import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.utils.HttpUtil; -import org.apache.hyracks.util.file.FileUtil; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpScheme; -import io.netty.handler.codec.http.multipart.FileUpload; -import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; -import io.netty.handler.codec.http.multipart.InterfaceHttpData; - -public class UdfApiServlet extends AbstractServlet { - - private static final Logger LOGGER = LogManager.getLogger(); - - protected final IApplicationContext plainAppCtx; - private ICcApplicationContext appCtx; - private ClusterControllerService ccs; - private final HttpScheme httpServerProtocol; - private final int httpServerPort; - - protected final ILangCompilationProvider compilationProvider; - protected final IStatementExecutorFactory statementExecutorFactory; - protected final IStorageComponentProvider componentProvider; - protected final IReceptionist receptionist; - protected Path workingDir; - protected String sysAuthHeader; - - public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, - ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, - IStorageComponentProvider componentProvider, HttpScheme httpServerProtocol, int httpServerPort) { - super(ctx, paths); - this.plainAppCtx = appCtx; - this.compilationProvider = compilationProvider; - this.statementExecutorFactory = statementExecutorFactory; - this.componentProvider = componentProvider; - this.receptionist = appCtx.getReceptionist(); - this.httpServerProtocol = httpServerProtocol; - this.httpServerPort = httpServerPort; - } - - @Override - public void init() throws IOException { - appCtx = (ICcApplicationContext) plainAppCtx; - ICCServiceContext srvCtx = this.appCtx.getServiceContext(); - this.ccs = (ClusterControllerService) srvCtx.getControllerService(); - File baseDir = srvCtx.getServerCtx().getBaseDir(); - this.workingDir = baseDir.getAbsoluteFile().toPath().normalize().resolve( - Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, "tmp")); - initAuth(); - initStorage(); - } - - protected void initAuth() { - sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER); - } - - protected void initStorage() throws IOException { - // prepare working directory - if (Files.isDirectory(workingDir)) { - try { - FileUtils.cleanDirectory(workingDir.toFile()); - } catch (IOException e) { - LOGGER.warn("Could not clean directory: " + workingDir, e); - } - } else { - Files.deleteIfExists(workingDir); - FileUtil.forceMkdirs(workingDir.toFile()); - } - } - - protected Map<String, String> additionalHttpHeadersFromRequest(IServletRequest request) { - return Collections.emptyMap(); - } - - @Override - protected void post(IServletRequest request, IServletResponse response) { - HttpRequest httpRequest = request.getHttpRequest(); - Pair<DataverseName, String> libraryName = parseLibraryName(request); - if (libraryName == null) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - Path libraryTempFile = null; - HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(httpRequest); - try { - if (!requestDecoder.hasNext() || requestDecoder.getBodyHttpDatas().size() != 1) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - InterfaceHttpData httpData = requestDecoder.getBodyHttpDatas().get(0); - if (!httpData.getHttpDataType().equals(InterfaceHttpData.HttpDataType.FileUpload)) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - FileUpload fileUpload = (FileUpload) httpData; - String fileExt = FilenameUtils.getExtension(fileUpload.getFilename()); - ExternalFunctionLanguage language = getLanguageByFileExtension(fileExt); - if (language == null) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - try { - IRequestReference requestReference = receptionist.welcome(request); - libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libraryName.first + "." - + libraryName.second); - } - fileUpload.renameTo(libraryTempFile.toFile()); - URI downloadURI = createDownloadURI(libraryTempFile); - doCreate(libraryName.first, libraryName.second, language, downloadURI, true, sysAuthHeader, - requestReference, request, response); - response.setStatus(HttpResponseStatus.OK); - } catch (Exception e) { - response.setStatus(toHttpErrorStatus(e)); - PrintWriter responseWriter = response.writer(); - responseWriter.write(e.getMessage()); - responseWriter.flush(); - LOGGER.error("Error creating/updating library " + libraryName.first + "." + libraryName.second, e); - } - } finally { - requestDecoder.destroy(); - if (libraryTempFile != null) { - try { - Files.deleteIfExists(libraryTempFile); - } catch (IOException e) { - LOGGER.warn("Could not delete temporary file " + libraryTempFile, e); - } - } - } - } - - protected void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, - URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference, - IServletRequest request, IServletResponse response) throws Exception { - CreateLibraryStatement stmt = new CreateLibraryStatement(dataverseName, libraryName, language, downloadURI, - replaceIfExists, sysAuthHeader); - executeStatement(stmt, requestReference, request, response); - } - - protected URI createDownloadURI(Path file) throws Exception { - String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName(); - String host = getHyracksClientConnection().getHost(); - return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null); - } - - protected void delete(IServletRequest request, IServletResponse response) { - Pair<DataverseName, String> libraryName = parseLibraryName(request); - if (libraryName == null) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - try { - IRequestReference requestReference = receptionist.welcome(request); - doDrop(libraryName.first, libraryName.second, false, requestReference, request, response); - response.setStatus(HttpResponseStatus.OK); - } catch (Exception e) { - response.setStatus(toHttpErrorStatus(e)); - PrintWriter responseWriter = response.writer(); - responseWriter.write(e.getMessage()); - responseWriter.flush(); - LOGGER.error("Error deleting library " + libraryName.first + "." + libraryName.second, e); - } - } - - protected void doDrop(DataverseName dataverseName, String libraryName, boolean replaceIfExists, - IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception { - LibraryDropStatement stmt = new LibraryDropStatement(dataverseName, libraryName, replaceIfExists); - executeStatement(stmt, requestReference, request, response); - } - - protected void executeStatement(Statement statement, IRequestReference requestReference, IServletRequest request, - IServletResponse response) throws Exception { - IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState(); - if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { - response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); - return; - } - SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM), - new PrintWriter(NullWriter.NULL_WRITER)); - ResponsePrinter printer = new ResponsePrinter(sessionOutput); - ResultProperties resultProperties = new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE, 1); - IRequestParameters requestParams = new RequestParameters(requestReference, "", null, resultProperties, - new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null, - additionalHttpHeadersFromRequest(request), Collections.emptyMap(), false); - MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(appCtx, Collections.singletonList(statement), - sessionOutput, compilationProvider, componentProvider, printer); - translator.compileAndExecute(getHyracksClientConnection(), requestParams); - } - - protected void get(IServletRequest request, IServletResponse response) throws Exception { - IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState(); - if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { - response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); - return; - } - String localPath = localPath(request); - while (localPath.startsWith("/")) { - localPath = localPath.substring(1); - } - if (localPath.isEmpty()) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - Path filePath = workingDir.resolve(localPath).normalize(); - if (!filePath.startsWith(workingDir)) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - readFromFile(filePath, response); - } - - protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR - IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); - if (hcc == null) { - throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR); - } - return hcc; - } - - protected Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException { - String[] path = StringUtils.split(localPath(request), '/'); - int ln = path.length; - if (ln < 2) { - return null; - } - String libraryName = path[ln - 1]; - DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1); - return new Pair<>(dataverseName, libraryName); - } - - protected static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) { - switch (fileExtension) { - case LibraryDescriptor.FILE_EXT_ZIP: - return JAVA; - case LibraryDescriptor.FILE_EXT_PYZ: - return PYTHON; - default: - return null; - } - } - - protected HttpResponseStatus toHttpErrorStatus(Exception e) { - if (e instanceof IFormattedException) { - IFormattedException fe = (IFormattedException) e; - if (ErrorCode.ASTERIX.equals(fe.getComponent())) { - switch (fe.getErrorCode()) { - case ErrorCode.UNKNOWN_DATAVERSE: - case ErrorCode.UNKNOWN_LIBRARY: - return HttpResponseStatus.NOT_FOUND; - } - } - } - return HttpResponseStatus.INTERNAL_SERVER_ERROR; - } - - protected void readFromFile(Path filePath, IServletResponse response) throws Exception { - class InputStreamGetter extends SynchronizableWork { - private InputStream is; - - @Override - protected void doRun() throws Exception { - is = Files.newInputStream(filePath); - } - } - - InputStreamGetter r = new InputStreamGetter(); - ccs.getWorkQueue().scheduleAndSync(r); - - if (r.is == null) { - response.setStatus(HttpResponseStatus.NOT_FOUND); - return; - } - try { - response.setStatus(HttpResponseStatus.OK); - HttpUtil.setContentType(response, "application/octet-stream"); - IOUtils.copyLarge(r.is, response.outputStream()); - } finally { - r.is.close(); - } - } -} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 771c007..8f3deb1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -38,7 +38,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.asterix.api.http.IQueryWebServerRegistrant; import org.apache.asterix.api.http.server.ActiveStatsApiServlet; import org.apache.asterix.api.http.server.ApiServlet; -import org.apache.asterix.api.http.server.BasicAuthServlet; import org.apache.asterix.api.http.server.CcQueryCancellationServlet; import org.apache.asterix.api.http.server.ClusterApiServlet; import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; @@ -51,7 +50,6 @@ import org.apache.asterix.api.http.server.QueryStatusApiServlet; import org.apache.asterix.api.http.server.RebalanceApiServlet; import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.api.http.server.ShutdownApiServlet; -import org.apache.asterix.api.http.server.UdfApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; @@ -294,7 +292,6 @@ public class CCApplication extends BaseCCApplication { addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE addServlet(jsonAPIServer, Servlets.DIAGNOSTICS); addServlet(jsonAPIServer, Servlets.ACTIVE_STATS); - addServlet(jsonAPIServer, Servlets.UDF); return jsonAPIServer; } @@ -345,11 +342,6 @@ public class CCApplication extends BaseCCApplication { return new DiagnosticsApiServlet(appCtx, ctx, paths); case Servlets.ACTIVE_STATS: return new ActiveStatsApiServlet(appCtx, ctx, paths); - case Servlets.UDF: - return new BasicAuthServlet(ctx, - new UdfApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP), - getStatementExecutorFactory(), componentProvider, server.getScheme(), - server.getAddress().getPort())); default: throw new IllegalStateException(key); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java index 047018b..6d1c059 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java @@ -35,6 +35,8 @@ public interface ILibraryManager { FileReference getLibraryDir(DataverseName dataverseName, String libraryName) throws HyracksDataException; + FileReference getDistributionDir(); + void dropLibraryPath(FileReference fileRef) throws HyracksDataException; byte[] serializeLibraryDescriptor(LibraryDescriptor libraryDescriptor) throws HyracksDataException; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java index d7a446b..2ae8612 100755 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java @@ -75,6 +75,8 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle public static final String DESCRIPTOR_FILE_NAME = "lib.json"; + public static final String DISTRIBUTION_DIR = "dist"; + private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class); private final NodeControllerService ncs; @@ -84,6 +86,7 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle private final FileReference storageDir; private final Path storageDirPath; private final FileReference trashDir; + private final FileReference distDir; private final Path trashDirPath; private final Map<Pair<DataverseName, String>, ILibrary> libraries = new HashMap<>(); private IPCSystem pythonIPC; @@ -96,6 +99,7 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle storageDir = baseDir.getChild(STORAGE_DIR_NAME); storageDirPath = storageDir.getFile().toPath(); trashDir = baseDir.getChild(TRASH_DIR_NAME); + distDir = baseDir.getChild(DISTRIBUTION_DIR); trashDirPath = trashDir.getFile().toPath().normalize(); objectMapper = createObjectMapper(); router = new ExternalFunctionResultRouter(); @@ -177,6 +181,11 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle } @Override + public FileReference getDistributionDir() { + return distDir; + } + + @Override public ILibrary getLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException { Pair<DataverseName, String> key = getKey(dataverseName, libraryName); synchronized (this) {