This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 580b81a [ASTERIXDB-2597] Load UDFs via HTTP 580b81a is described below commit 580b81aa5e8888b8e1b0620521a1c9680e54df73 Author: Ian Maxon <ima...@apache.org> AuthorDate: Mon Jul 1 13:16:43 2019 -0700 [ASTERIXDB-2597] Load UDFs via HTTP - POST existing UDF format to /admin/udf/$DATAVERSE/$LIBNAME - DELETE against that URL to remove UDFs Change-Id: I6be9fef54c010bdb32f5c78af9b973f9843f442f Reviewed-on: https://asterix-gerrit.ics.uci.edu/3386 Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ian Maxon <ima...@uci.edu> --- .../asterix/api/http/server/UdfApiServlet.java | 161 +++++++++++++++++++++ .../asterix/app/external/ExternalLibraryUtils.java | 81 ++++++----- .../asterix/app/external/ExternalUDFLibrarian.java | 122 +++++----------- .../app/external/IExternalUDFLibrarian.java | 7 +- .../asterix/app/message/DeleteUdfMessage.java | 55 +++++++ .../apache/asterix/app/message/LoadUdfMessage.java | 67 +++++++++ .../asterix/app/message/UdfResponseMessage.java | 70 +++++++++ .../app/nc/task/ExternalLibrarySetupTask.java | 3 +- .../asterix/hyracks/bootstrap/CCApplication.java | 6 +- .../api/common/AsterixHyracksIntegrationUtil.java | 3 +- .../asterix/app/bootstrap/TestNodeController.java | 4 - .../apache/asterix/test/common/TestExecutor.java | 2 +- .../asterix/test/runtime/LangExecutionUtil.java | 11 +- .../type_validation/type_validation.0.ddl.sqlpp} | 15 +- .../asterix/common/library/ILibraryManager.java | 5 +- .../org/apache/asterix/common/utils/Servlets.java | 1 + .../external/library/ExternalLibraryManager.java | 17 ++- .../hyracks/api/application/IServerContext.java} | 22 +-- .../hyracks/api/application/IServiceContext.java | 2 + .../client/HyracksClientInterfaceFunctions.java | 8 +- .../api/client/IHyracksClientConnection.java | 18 ++- .../api/client/IHyracksClientInterface.java | 3 +- .../hyracks/control/cc/ClientInterfaceIPCI.java | 2 +- .../control/cc/work/CliDeployBinaryWork.java | 8 +- .../control/common/application/ServiceContext.java | 11 +- .../control/common/base/INodeController.java | 2 +- .../control/common/context/ServerContext.java | 14 +- .../ClassLoaderJobSerializerDeserializer.java | 2 +- .../control/common/deployment/DeploymentUtils.java | 52 ++++++- .../hyracks/control/common/ipc/CCNCFunctions.java | 9 +- .../common/ipc/NodeControllerRemoteProxy.java | 5 +- .../hyracks/control/nc/NodeControllerIPCI.java | 4 +- .../hyracks/control/nc/NodeControllerService.java | 2 +- .../apache/hyracks/control/nc/io/IOManager.java | 4 + .../hyracks/control/nc/work/DeployBinaryWork.java | 7 +- .../impl/HyracksClientInterfaceRemoteProxy.java | 6 +- .../apache/hyracks/ipc/impl/HyracksConnection.java | 24 +-- .../hyracks/test/support/TestNCServiceContext.java | 6 + 38 files changed, 613 insertions(+), 228 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java new file mode 100644 index 0000000..d293ee8 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.app.message.DeleteUdfMessage; +import org.apache.asterix.app.message.LoadUdfMessage; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.util.file.FileUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.QueryStringDecoder; + +public class UdfApiServlet extends AbstractServlet { + + private static final Logger LOGGER = LogManager.getLogger(); + private final ICcApplicationContext appCtx; + private final ICCMessageBroker broker; + public static final String UDF_TMP_DIR_PREFIX = "udf_temp"; + public static final int UDF_RESPONSE_TIMEOUT = 5000; + + public UdfApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) { + super(ctx, paths); + this.appCtx = appCtx; + this.broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + } + + private String[] getResource(FullHttpRequest req) throws IllegalArgumentException { + String[] path = new QueryStringDecoder(req.uri()).path().split("/"); + if (path.length != 5) { + throw new IllegalArgumentException("Invalid resource."); + } + String resourceName = path[path.length - 1]; + String dataverseName = path[path.length - 2]; + return new String[] { resourceName, dataverseName }; + } + + @Override + protected void post(IServletRequest request, IServletResponse response) { + FullHttpRequest req = request.getHttpRequest(); + String[] resourceNames; + try { + resourceNames = getResource(req); + } catch (IllegalArgumentException e) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + String resourceName = resourceNames[0]; + String dataverse = resourceNames[1]; + File udf = null; + try { + File workingDir = new File(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(), + UDF_TMP_DIR_PREFIX); + if (!workingDir.exists()) { + FileUtil.forceMkdirs(workingDir); + } + udf = File.createTempFile(resourceName, ".zip", workingDir); + try (RandomAccessFile raf = new RandomAccessFile(udf, "rw")) { + ByteBuf reqContent = req.content(); + raf.setLength(reqContent.readableBytes()); + FileChannel fc = raf.getChannel(); + ByteBuffer content = reqContent.nioBuffer(); + while (content.hasRemaining()) { + fc.write(content); + } + } + IHyracksClientConnection hcc = appCtx.getHcc(); + DeploymentId udfName = new DeploymentId(dataverse + "." + resourceName); + ClassLoader cl = appCtx.getLibraryManager().getLibraryClassLoader(dataverse, resourceName); + if (cl != null) { + deleteUdf(dataverse, resourceName); + } + hcc.deployBinary(udfName, Arrays.asList(udf.toString()), true); + ExternalLibraryUtils.setUpExternaLibrary(appCtx.getLibraryManager(), false, + FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(), + "applications", udfName.toString())); + + long reqId = broker.newRequestId(); + List<INcAddressedMessage> requests = new ArrayList<>(); + List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes()); + ncs.forEach(s -> requests.add(new LoadUdfMessage(dataverse, resourceName, reqId))); + broker.sendSyncRequestToNCs(reqId, ncs, requests, UDF_RESPONSE_TIMEOUT); + } catch (Exception e) { + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + LOGGER.error(e); + return; + } finally { + if (udf != null) { + udf.delete(); + } + } + response.setStatus(HttpResponseStatus.OK); + + } + + private void deleteUdf(String dataverse, String resourceName) throws Exception { + DeleteUdfMessage msg = new DeleteUdfMessage(dataverse, resourceName); + for (String nc : appCtx.getClusterStateManager().getParticipantNodes()) { + broker.sendApplicationMessageToNC(msg, nc); + } + appCtx.getLibraryManager().deregisterLibraryClassLoader(dataverse, resourceName); + appCtx.getHcc().unDeployBinary(new DeploymentId(resourceName)); + } + + @Override + protected void delete(IServletRequest request, IServletResponse response) { + String[] resourceNames; + try { + resourceNames = getResource(request.getHttpRequest()); + } catch (IllegalArgumentException e) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + String resourceName = resourceNames[0]; + String dataverse = resourceNames[1]; + try { + deleteUdf(dataverse, resourceName); + } catch (Exception e) { + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + LOGGER.error(e); + return; + } + response.setStatus(HttpResponseStatus.OK); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java index 0eac212..a989941 100755 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.net.URL; +import java.net.URLClassLoader; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.HashMap; @@ -60,8 +61,8 @@ public class ExternalLibraryUtils { private ExternalLibraryUtils() { } - public static void setUpExternaLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode) - throws Exception { + public static void setUpExternaLibrary(ILibraryManager externalLibraryManager, boolean isMetadataNode, + String libraryPath) throws Exception { // start by un-installing removed libraries (Metadata Node only) Map<String, List<String>> uninstalledLibs = null; if (isMetadataNode) { @@ -69,18 +70,30 @@ public class ExternalLibraryUtils { } // get the directory of the to be installed libraries - File installLibDir = getLibraryInstallDir(); + String[] pathSplit = libraryPath.split("\\."); + String[] dvSplit = pathSplit[pathSplit.length - 2].split("/"); + String dataverse = dvSplit[dvSplit.length - 1]; + String name = pathSplit[pathSplit.length - 1].trim(); + File installLibDir = new File(libraryPath); + // directory exists? if (installLibDir.exists()) { - // get the list of files in the directory - for (File dataverseDir : installLibDir.listFiles(File::isDirectory)) { - for (File libraryDir : dataverseDir.listFiles(File::isDirectory)) { - // For each file (library), register classloader and configure its parameter. - // If current node is Metadata Node, add the library to metadata. - registerClassLoader(externalLibraryManager, dataverseDir.getName(), libraryDir.getName()); - configureLibrary(externalLibraryManager, dataverseDir.getName(), libraryDir, uninstalledLibs, - isMetadataNode); - } + registerClassLoader(externalLibraryManager, dataverse, name, libraryPath); + configureLibrary(externalLibraryManager, dataverse, name, installLibDir, uninstalledLibs, isMetadataNode); + } + } + + public static void setUpInstalledLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode, + File appDir) throws Exception { + File[] libs = appDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return dir.isDirectory(); + } + }); + if (libs != null) { + for (File lib : libs) { + setUpExternaLibrary(externalLibraryManager, isMetadataNode, lib.getAbsolutePath()); } } } @@ -134,7 +147,7 @@ public class ExternalLibraryUtils { * @throws RemoteException * @throws ACIDException */ - protected static boolean uninstallLibrary(String dataverse, String libraryName) + public static boolean uninstallLibrary(String dataverse, String libraryName) throws AsterixException, RemoteException, ACIDException { MetadataTransactionContext mdTxnCtx = null; try { @@ -270,10 +283,9 @@ public class ExternalLibraryUtils { * failure in installing an element does not effect installation of other * libraries. */ - protected static void configureLibrary(ILibraryManager libraryManager, String dataverse, final File libraryDir, - Map<String, List<String>> uninstalledLibs, boolean isMetadataNode) throws Exception { + protected static void configureLibrary(ILibraryManager libraryManager, String dataverse, String libraryName, + final File libraryDir, Map<String, List<String>> uninstalledLibs, boolean isMetadataNode) throws Exception { - String libraryName = libraryDir.getName().trim(); String[] libraryDescriptors = libraryDir.list((dir, name) -> name.endsWith(".xml")); if (libraryDescriptors == null) { @@ -303,15 +315,15 @@ public class ExternalLibraryUtils { * register the library class loader with the external library manager * * @param dataverse - * @param libraryName + * @param libraryPath * @throws Exception */ - protected static void registerClassLoader(ILibraryManager externalLibraryManager, String dataverse, - String libraryName) throws Exception { + protected static void registerClassLoader(ILibraryManager externalLibraryManager, String dataverse, String name, + String libraryPath) throws Exception { // get the class loader - ClassLoader classLoader = getLibraryClassLoader(dataverse, libraryName); + URLClassLoader classLoader = getLibraryClassLoader(dataverse, name, libraryPath); // register it with the external library manager - externalLibraryManager.registerLibraryClassLoader(dataverse, libraryName, classLoader); + externalLibraryManager.registerLibraryClassLoader(dataverse, name, classLoader); } /** @@ -331,22 +343,23 @@ public class ExternalLibraryUtils { /** * Get the class loader for the library * + * @param libraryPath * @param dataverse - * @param libraryName * @return * @throws Exception */ - private static ClassLoader getLibraryClassLoader(String dataverse, String libraryName) throws Exception { + private static URLClassLoader getLibraryClassLoader(String dataverse, String name, String libraryPath) + throws Exception { // Get a reference to the library directory - File installDir = getLibraryInstallDir(); + File installDir = new File(libraryPath); if (LOGGER.isInfoEnabled()) { - LOGGER.info("Installing lirbary " + libraryName + " in dataverse " + dataverse + "." - + " Install Directory: " + installDir.getAbsolutePath()); + LOGGER.info("Installing lirbary " + name + " in dataverse " + dataverse + "." + " Install Directory: " + + installDir.getAbsolutePath()); } // get a reference to the specific library dir - File libDir = - new File(installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName); + File libDir = installDir; + FilenameFilter jarFileFilter = new FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -388,9 +401,9 @@ public class ExternalLibraryUtils { } if (LOGGER.isInfoEnabled()) { - StringBuilder logMesg = new StringBuilder("Classpath for library " + libraryName + "\n"); + StringBuilder logMesg = new StringBuilder("Classpath for library " + dataverse + ": "); for (URL url : urls) { - logMesg.append(url.getFile() + "\n"); + logMesg.append(url.getFile() + File.pathSeparatorChar); } LOGGER.info(logMesg.toString()); } @@ -400,14 +413,6 @@ public class ExternalLibraryUtils { } /** - * @return the directory "System.getProperty("app.home", System.getProperty("user.home")/lib/udfs" - */ - protected static File getLibraryInstallDir() { - return new File(System.getProperty("app.home", System.getProperty("user.home")) + File.separator + "lib" - + File.separator + "udfs"); - } - - /** * @return the directory "System.getProperty("app.home", System.getProperty("user.home")/lib/udfs/uninstall" */ protected static File getLibraryUninstallDir() { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java index 7246925..86588ea 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java @@ -19,112 +19,58 @@ package org.apache.asterix.app.external; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.rmi.RemoteException; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; +import java.net.URL; -import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.library.ILibraryManager; -import org.apache.commons.compress.utils.IOUtils; -import org.apache.commons.io.FileUtils; -import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.http.HttpResponse; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.FileEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.hyracks.api.exceptions.HyracksException; @SuppressWarnings("squid:S134") public class ExternalUDFLibrarian implements IExternalUDFLibrarian { - // The following list includes a library manager for the CC - // and library managers for NCs (one-per-NC). - private final List<ILibraryManager> libraryManagers; + private HttpClient hc; + private String host; + private int port; - public ExternalUDFLibrarian(List<ILibraryManager> libraryManagers) { - this.libraryManagers = libraryManagers; + public ExternalUDFLibrarian(String host, int port) { + hc = new DefaultHttpClient(); + this.host = host; + this.port = port; } - public static void removeLibraryDir() throws IOException { - File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); - FileUtils.deleteQuietly(installLibDir); - } - - public static void unzip(String sourceFile, String outputDir) throws IOException { - if (System.getProperty("os.name").toLowerCase().startsWith("win")) { - try (ZipFile zipFile = new ZipFile(sourceFile)) { - Enumeration<? extends ZipEntry> entries = zipFile.entries(); - while (entries.hasMoreElements()) { - ZipEntry entry = entries.nextElement(); - File entryDestination = new File(outputDir, entry.getName()); - if (!entry.isDirectory()) { - entryDestination.getParentFile().mkdirs(); - try (InputStream in = zipFile.getInputStream(entry); - OutputStream out = new FileOutputStream(entryDestination)) { - IOUtils.copy(in, out); - } - } - } - } - } else { - Process process = new ProcessBuilder("unzip", "-d", outputDir, sourceFile).start(); - try { - process.waitFor(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } + public ExternalUDFLibrarian() { + this("localhost", 19002); } @Override - public void install(String dvName, String libName, String libPath) throws Exception { - // get the directory of the to be installed libraries - File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); - // directory exists? - if (!installLibDir.exists()) { - installLibDir.mkdir(); - } - // copy the library file into the directory - File destinationDir = - new File(installLibDir.getAbsolutePath() + File.separator + dvName + File.separator + libName); - FileUtils.deleteQuietly(destinationDir); - destinationDir.mkdirs(); - try { - unzip(libPath, destinationDir.getAbsolutePath()); - } catch (Exception e) { - - throw new Exception("Couldn't unzip the file: " + libPath, e); - } - - for (ILibraryManager libraryManager : libraryManagers) { - ExternalLibraryUtils.registerClassLoader(libraryManager, dvName, libName); - ExternalLibraryUtils.configureLibrary(libraryManager, dvName, destinationDir, new HashMap<>(), - libraryManagers.indexOf(libraryManager) != 0); + public void install(String dataverse, String libName, String libPath) throws Exception { + URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName); + HttpPost post = new HttpPost(url.toString()); + post.setEntity(new FileEntity(new File(libPath), "application/octet-stream")); + HttpResponse response = hc.execute(post); + response.getEntity().consumeContent(); + if (response.getStatusLine().getStatusCode() != 200) { + throw new HyracksException(response.getStatusLine().toString()); } } @Override - public void uninstall(String dvName, String libName) throws RemoteException, AsterixException, ACIDException { - ExternalLibraryUtils.uninstallLibrary(dvName, libName); - for (ILibraryManager libraryManager : libraryManagers) { - libraryManager.deregisterLibraryClassLoader(dvName, libName); + public void uninstall(String dataverse, String libName) + throws IOException, ClientProtocolException, AsterixException { + URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName); + HttpDelete del = new HttpDelete(url.toString()); + HttpResponse response = hc.execute(del); + response.getEntity().consumeContent(); + if (response.getStatusLine().getStatusCode() != 200) { + throw new AsterixException(response.getStatusLine().toString()); } } - public void cleanup() throws AsterixException, RemoteException, ACIDException { - for (ILibraryManager libraryManager : libraryManagers) { - List<Pair<String, String>> libs = libraryManager.getAllLibraries(); - for (Pair<String, String> dvAndLib : libs) { - ExternalLibraryUtils.uninstallLibrary(dvAndLib.first, dvAndLib.second); - libraryManager.deregisterLibraryClassLoader(dvAndLib.first, dvAndLib.second); - } - } - // get the directory of the to be installed libraries - File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); - FileUtils.deleteQuietly(installLibDir); - } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java index 9a17444..a7a668a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java @@ -19,13 +19,12 @@ package org.apache.asterix.app.external; import java.io.IOException; -import java.rmi.RemoteException; -import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.http.client.ClientProtocolException; public interface IExternalUDFLibrarian { - public void install(String dvName, String libName, String libPath) throws IOException, Exception; + void install(String dataverse, String libName, String libPath) throws Exception; - public void uninstall(String dvName, String libName) throws RemoteException, AsterixException, ACIDException; + void uninstall(String dataverse, String libName) throws IOException, ClientProtocolException, AsterixException; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java new file mode 100644 index 0000000..ca7b9db --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class DeleteUdfMessage implements INcAddressedMessage { + + private static final long serialVersionUID = -3129473321451281271L; + private final String dataverseName; + private final String libraryName; + private static final Logger LOGGER = LogManager.getLogger(); + + public DeleteUdfMessage(String dataverseName, String libraryName) { + this.dataverseName = dataverseName; + this.libraryName = libraryName; + } + + @Override + public void handle(INcApplicationContext appCtx) { + ILibraryManager mgr = appCtx.getLibraryManager(); + String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName(); + String nodeName = appCtx.getServiceContext().getNodeId(); + boolean isMdNode = mdNodeName.equals(nodeName); + try { + if (isMdNode) { + ExternalLibraryUtils.uninstallLibrary(dataverseName, libraryName); + } + mgr.deregisterLibraryClassLoader(dataverseName, libraryName); + } catch (Exception e) { + LOGGER.error("Unable to un-deploy UDF", e); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java new file mode 100644 index 0000000..66714bf --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.hyracks.util.file.FileUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class LoadUdfMessage extends CcIdentifiedMessage implements INcAddressedMessage { + + private final String dataverseName; + private final String libraryName; + private static final Logger LOGGER = LogManager.getLogger(); + + private static final long serialVersionUID = -4529473341458281271L; + private final long reqId; + + public LoadUdfMessage(String dataverseName, String libraryName, long reqId) { + this.dataverseName = dataverseName; + this.libraryName = libraryName; + this.reqId = reqId; + } + + @Override + public void handle(INcApplicationContext appCtx) { + ILibraryManager mgr = appCtx.getLibraryManager(); + String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName(); + String nodeName = appCtx.getServiceContext().getNodeId(); + INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + boolean isMdNode = mdNodeName.equals(nodeName); + try { + ExternalLibraryUtils.setUpExternaLibrary(mgr, isMdNode, + FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(), + "applications", dataverseName + "." + libraryName)); + broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, null)); + } catch (Exception e) { + try { + broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, e)); + } catch (Exception f) { + LOGGER.error("Unable to send failure response to CC", f); + } + } + + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java new file mode 100644 index 0000000..29a91c5 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import java.util.ArrayList; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INcResponse; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class UdfResponseMessage implements ICcAddressedMessage, INcResponse { + + private static final long serialVersionUID = -4520773141458281271L; + private final long reqId; + private final Exception failure; + + public UdfResponseMessage(long reqId, Exception failure) { + this.reqId = reqId; + this.failure = failure; + } + + @SuppressWarnings("unchecked") + @Override + public void setResult(MutablePair<ICCMessageBroker.ResponseState, Object> result) { + ICCMessageBroker.ResponseState responseState = result.getLeft(); + if (failure != null) { + result.setLeft(ICCMessageBroker.ResponseState.FAILURE); + result.setRight(failure); + return; + } + switch (responseState) { + case UNINITIALIZED: + // First to arrive + result.setRight(new ArrayList<String>()); + // No failure, change state to success + result.setLeft(ICCMessageBroker.ResponseState.SUCCESS); + // Fallthrough + case SUCCESS: + break; + default: + break; + + } + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + broker.respond(reqId, this); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java index 8cfeb12..1ca2b78 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java @@ -38,7 +38,8 @@ public class ExternalLibrarySetupTask implements INCLifecycleTask { public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { - ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode); + ExternalLibraryUtils.setUpInstalledLibraries(appContext.getLibraryManager(), metadataNode, + cs.getContext().getServerCtx().getAppDir()); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 5998599..e2fbe35 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -47,6 +47,7 @@ import org.apache.asterix.api.http.server.QueryStatusApiServlet; import org.apache.asterix.api.http.server.RebalanceApiServlet; import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.api.http.server.ShutdownApiServlet; +import org.apache.asterix.api.http.server.UdfApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; @@ -150,7 +151,7 @@ public class CCApplication extends BaseCCApplication { ReplicationProperties repProp = new ReplicationProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())); INcLifecycleCoordinator lifecycleCoordinator = createNcLifeCycleCoordinator(repProp.isReplicationEnabled()); - ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); + ExternalLibraryUtils.setUpInstalledLibraries(libraryManager, false, ccServiceCtx.getServerCtx().getAppDir()); componentProvider = new StorageComponentProvider(); ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions())); @@ -261,6 +262,7 @@ public class CCApplication extends BaseCCApplication { addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE addServlet(jsonAPIServer, Servlets.DIAGNOSTICS); addServlet(jsonAPIServer, Servlets.ACTIVE_STATS); + addServlet(jsonAPIServer, Servlets.UDF); return jsonAPIServer; } @@ -313,6 +315,8 @@ public class CCApplication extends BaseCCApplication { return new DiagnosticsApiServlet(appCtx, ctx, paths); case Servlets.ACTIVE_STATS: return new ActiveStatsApiServlet(appCtx, ctx, paths); + case Servlets.UDF: + return new UdfApiServlet(appCtx, ctx, paths); default: throw new IllegalStateException(String.valueOf(key)); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 2c25b36..ccbab3c 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -198,8 +198,7 @@ public class AsterixHyracksIntegrationUtil { public void init(boolean deleteOldInstanceData, String externalLibPath, String confDir) throws Exception { List<ILibraryManager> libraryManagers = new ArrayList<>(); - ExternalUDFLibrarian librarian = new ExternalUDFLibrarian(libraryManagers); - librarian.cleanup(); + ExternalUDFLibrarian librarian = new ExternalUDFLibrarian(); init(deleteOldInstanceData, confDir); if (externalLibPath != null && externalLibPath.length() != 0) { libraryManagers.add(((ICcApplicationContext) cc.getApplicationContext()).getLibraryManager()); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index ffbddb6..ccbf5ec 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -26,7 +26,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.asterix.app.external.ExternalUDFLibrarian; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.app.nc.TransactionSubsystem; import org.apache.asterix.common.config.DatasetConfig.IndexType; @@ -151,8 +150,6 @@ public class TestNodeController { try { File outdir = new File(PATH_ACTUAL); outdir.mkdirs(); - // remove library directory - ExternalUDFLibrarian.removeLibraryDir(); ExecutionTestUtil.setUp(cleanupOnStart, testConfigFileName == null ? TEST_CONFIG_FILE_NAME : testConfigFileName, ExecutionTestUtil.integrationUtil, runHDFS, options); @@ -167,7 +164,6 @@ public class TestNodeController { } public void deInit(boolean cleanupOnStop) throws Exception { - ExternalUDFLibrarian.removeLibraryDir(); ExecutionTestUtil.tearDown(cleanupOnStop, runHDFS); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 5da5199..d05ae3e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -1105,7 +1105,7 @@ public class TestExecutor { lines = statement.split("\n"); String lastLine = lines[lines.length - 1]; String[] command = lastLine.trim().split(" "); - if (command.length < 3) { + if (command.length < 2) { throw new Exception("invalid library format"); } String dataverse = command[1]; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java index 75eccfd..20420ab 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java @@ -33,7 +33,6 @@ import java.util.Collection; import java.util.List; import org.apache.asterix.app.external.ExternalUDFLibrarian; -import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; @@ -69,10 +68,8 @@ public class LangExecutionUtil { testExecutor = executor; File outdir = new File(PATH_ACTUAL); outdir.mkdirs(); - List<ILibraryManager> libraryManagers = - ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null); - ExternalUDFLibrarian.removeLibraryDir(); - librarian = new ExternalUDFLibrarian(libraryManagers); + ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null); + librarian = new ExternalUDFLibrarian(); testExecutor.setLibrarian(librarian); if (repeat != 1) { System.out.println("FYI: each test will be run " + repeat + " times."); @@ -86,7 +83,6 @@ public class LangExecutionUtil { // Check whether there are leaked threads. checkThreadLeaks(); } finally { - ExternalUDFLibrarian.removeLibraryDir(); ExecutionTestUtil.tearDown(cleanupOnStop); integrationUtil.removeTestStorageFiles(); if (!badTestCases.isEmpty()) { @@ -126,9 +122,6 @@ public class LangExecutionUtil { if (repeat > 1) { System.err.print("[" + i + "/" + repeat + "] "); } - if (librarian != null) { - librarian.cleanup(); - } testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, ExecutionTestUtil.FailedGroup); try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/type_validation/type_validation.0.ddl.sqlpp similarity index 62% copy from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/type_validation/type_validation.0.ddl.sqlpp index 9a17444..76cc70d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/type_validation/type_validation.0.ddl.sqlpp @@ -16,16 +16,5 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.app.external; - -import java.io.IOException; -import java.rmi.RemoteException; - -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.exceptions.AsterixException; - -public interface IExternalUDFLibrarian { - public void install(String dvName, String libName, String libPath) throws IOException, Exception; - - public void uninstall(String dvName, String libName) throws RemoteException, AsterixException, ACIDException; -} +DROP DATAVERSE externallibtest if exists; +CREATE DATAVERSE externallibtest; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java index ec02692..c1598d9 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.library; +import java.net.URLClassLoader; import java.util.List; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -29,12 +30,10 @@ public interface ILibraryManager { /** * Registers the library class loader with the external library manager. * <code>dataverseName</code> and <code>libraryName</code> uniquely identifies a class loader. - * - * @param dataverseName * @param libraryName * @param classLoader */ - void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader) + void registerLibraryClassLoader(String dataverseName, String libraryName, URLClassLoader classLoader) throws HyracksDataException; /** diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java index d5aa5d1..93a959f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java @@ -36,6 +36,7 @@ public class Servlets { public static final String ACTIVE_STATS = "/admin/active/*"; public static final String STORAGE = "/admin/storage/*"; public static final String NET_DIAGNOSTICS = "/admin/net/*"; + public static final String UDF = "/admin/udf/*"; private Servlets() { } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java index b9ce2bd..60c8bfd 100755 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.external.library; +import java.io.IOException; +import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -28,14 +30,17 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class ExternalLibraryManager implements ILibraryManager { - private final Map<String, ClassLoader> libraryClassLoaders = new HashMap<>(); + private final Map<String, URLClassLoader> libraryClassLoaders = new HashMap<>(); private final Map<String, List<String>> externalFunctionParameters = new HashMap<>(); + private static final Logger LOGGER = LogManager.getLogger(); @Override - public void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader) + public void registerLibraryClassLoader(String dataverseName, String libraryName, URLClassLoader classLoader) throws RuntimeDataException { String key = getKey(dataverseName, libraryName); synchronized (libraryClassLoaders) { @@ -59,7 +64,13 @@ public class ExternalLibraryManager implements ILibraryManager { public void deregisterLibraryClassLoader(String dataverseName, String libraryName) { String key = getKey(dataverseName, libraryName); synchronized (libraryClassLoaders) { - if (libraryClassLoaders.get(key) != null) { + URLClassLoader cl = libraryClassLoaders.get(key); + if (cl != null) { + try { + cl.close(); + } catch (IOException e) { + LOGGER.error("Unable to close UDF classloader!", e); + } libraryClassLoaders.remove(key); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java similarity index 67% copy from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java index ff037f0..b997a6e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java @@ -16,29 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.control.common.context; +package org.apache.hyracks.api.application; import java.io.File; -public class ServerContext { - public enum ServerType { +public interface IServerContext { + enum ServerType { CLUSTER_CONTROLLER, NODE_CONTROLLER, } - private final ServerType type; - private final File baseDir; + ServerType getServerType(); - public ServerContext(ServerType type, File baseDir) { - this.type = type; - this.baseDir = baseDir; - } - - public ServerType getServerType() { - return type; - } + File getBaseDir(); - public File getBaseDir() { - return baseDir; - } + File getAppDir(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java index 6effee3..0a99c45 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java @@ -69,4 +69,6 @@ public interface IServiceContext { default IPersistedResourceRegistry getPersistedResourceRegistry() { throw new UnsupportedOperationException(); } + + IServerContext getServerCtx(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index 0ee3658..e813141 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -386,10 +386,12 @@ public class HyracksClientInterfaceFunctions { private static final long serialVersionUID = 1L; private final List<URL> binaryURLs; private final DeploymentId deploymentId; + private final boolean extractFromArchive; - public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId) { + public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId, boolean isExtractFromArchive) { this.binaryURLs = binaryURLs; this.deploymentId = deploymentId; + this.extractFromArchive = isExtractFromArchive; } @Override @@ -404,6 +406,10 @@ public class HyracksClientInterfaceFunctions { public DeploymentId getDeploymentId() { return deploymentId; } + + public boolean isExtractFromArchive() { + return extractFromArchive; + } } public static class CliUnDeployBinaryFunction extends Function { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index 89f2ad4..f295119 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -159,12 +159,22 @@ public interface IHyracksClientConnection extends IClusterInfoCollector { void waitForCompletion(JobId jobId) throws Exception; /** - * Deploy the user-defined jars to the cluster + * Deploy files to the cluster * - * @param jars - * a list of user-defined jars + * @param files + * a list of file paths */ - DeploymentId deployBinary(List<String> jars) throws Exception; + DeploymentId deployBinary(List<String> files) throws Exception; + + /** + * Deploy files to the cluster + * + * @param files + * a list of file paths + * @param deploymentId + * the id used to uniquely identify this set of files for management + */ + void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive) throws Exception; /** * undeploy a certain deployment diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 4cc47d2..053276f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -57,7 +57,8 @@ public interface IHyracksClientInterface { public ClusterTopology getClusterTopology() throws Exception; - public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception; + public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive) + throws Exception; public void unDeployBinary(DeploymentId deploymentId) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index 67e9599..a78c269 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -165,7 +165,7 @@ class ClientInterfaceIPCI implements IIPCI { HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn; ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), dbf.getDeploymentId(), - new IPCResponder<>(handle, mid))); + dbf.isExtractFromArchive(), new IPCResponder<>(handle, mid))); break; case CLI_UNDEPLOY_BINARY: HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java index 4962607..53f9360 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java @@ -47,13 +47,15 @@ public class CliDeployBinaryWork extends SynchronizableWork { private List<URL> binaryURLs; private DeploymentId deploymentId; private IPCResponder<DeploymentId> callback; + private boolean extractFromArchive; public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs, DeploymentId deploymentId, - IPCResponder<DeploymentId> callback) { + boolean extractFromArchive, IPCResponder<DeploymentId> callback) { this.ccs = ncs; this.binaryURLs = binaryURLs; this.deploymentId = deploymentId; this.callback = callback; + this.extractFromArchive = extractFromArchive; } @Override @@ -66,7 +68,7 @@ public class CliDeployBinaryWork extends SynchronizableWork { * Deploy for the cluster controller */ DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getContext().getJobSerializerDeserializerContainer(), - ccs.getServerContext(), false); + ccs.getServerContext(), false, extractFromArchive); /** * Deploy for the node controllers @@ -82,7 +84,7 @@ public class CliDeployBinaryWork extends SynchronizableWork { * deploy binaries to each node controller */ for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) { - ncs.getNodeController().deployBinary(deploymentId, binaryURLs); + ncs.getNodeController().deployBinary(deploymentId, binaryURLs, extractFromArchive); } ccs.getExecutor().execute(new Runnable() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java index 99fc76b..b6d32a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java @@ -21,16 +21,16 @@ package org.apache.hyracks.control.common.application; import java.io.Serializable; import java.util.concurrent.ThreadFactory; +import org.apache.hyracks.api.application.IServerContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.io.IPersistedResourceRegistry; import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer; import org.apache.hyracks.api.job.JobSerializerDeserializerContainer; import org.apache.hyracks.api.messages.IMessageBroker; -import org.apache.hyracks.control.common.context.ServerContext; public abstract class ServiceContext implements IServiceContext { - protected final ServerContext serverCtx; + protected final IServerContext serverCtx; protected final IApplicationConfig appConfig; protected ThreadFactory threadFactory; protected Serializable distributedState; @@ -38,7 +38,7 @@ public abstract class ServiceContext implements IServiceContext { protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer(); protected IPersistedResourceRegistry persistedResourceRegistry; - public ServiceContext(ServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) { + public ServiceContext(IServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) { this.serverCtx = serverCtx; this.appConfig = appConfig; this.threadFactory = threadFactory; @@ -88,4 +88,9 @@ public abstract class ServiceContext implements IServiceContext { public IPersistedResourceRegistry getPersistedResourceRegistry() { return persistedResourceRegistry; } + + @Override + public IServerContext getServerCtx() { + return serverCtx; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index 42a0d66..7f172b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -51,7 +51,7 @@ public interface INodeController { void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception; - void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception; + void deployBinary(DeploymentId deploymentId, List<URL> url, boolean extractFromArchive) throws Exception; void undeployBinary(DeploymentId deploymentId) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java index ff037f0..ef2777d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java @@ -20,18 +20,18 @@ package org.apache.hyracks.control.common.context; import java.io.File; -public class ServerContext { - public enum ServerType { - CLUSTER_CONTROLLER, - NODE_CONTROLLER, - } +import org.apache.hyracks.api.application.IServerContext; + +public class ServerContext implements IServerContext { private final ServerType type; private final File baseDir; + private final File appDir; public ServerContext(ServerType type, File baseDir) { this.type = type; this.baseDir = baseDir; + this.appDir = new File(baseDir, "applications"); } public ServerType getServerType() { @@ -41,4 +41,8 @@ public class ServerContext { public File getBaseDir() { return baseDir; } + + public File getAppDir() { + return appDir; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java index 31e6e39..f0d7828 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java @@ -73,7 +73,7 @@ public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeser }); try { if (classLoader == null) { - /** crate a new classloader */ + /** create a new classloader */ URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]); classLoader = new MutableURLClassLoader(urls, this.getClass().getClassLoader()); } else { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java index a079d97..b900591 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java @@ -26,7 +26,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.URL; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -41,6 +44,8 @@ import org.apache.hyracks.api.job.IJobSerializerDeserializer; import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.common.context.ServerContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * A utility class which is in charge of the actual work of deployments. @@ -50,6 +55,7 @@ import org.apache.hyracks.control.common.context.ServerContext; public class DeploymentUtils { private static final String DEPLOYMENT = "applications"; + private static final Logger LOGGER = LogManager.getLogger(); /** * undeploy an existing deployment @@ -86,13 +92,12 @@ public class DeploymentUtils { * @param container * the container of serailizer/deserializer * @param ctx - * the ServerContext - * @param isNC + * the ServerContext * @param isNC * true is NC/false is CC * @throws HyracksException */ public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container, - ServerContext ctx, boolean isNC) throws HyracksException { + ServerContext ctx, boolean isNC, boolean extractFromArchive) throws HyracksException { IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId); if (jobSerDe == null) { jobSerDe = new ClassLoaderJobSerializerDeserializer(); @@ -101,7 +106,11 @@ public class DeploymentUtils { String rootDir = ctx.getBaseDir().toString(); String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId : rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId; - jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC)); + if (extractFromArchive) { + downloadURLs(urls, deploymentDir, isNC, true); + } else { + jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC, false)); + } } /** @@ -176,7 +185,8 @@ public class DeploymentUtils { * @return a list of local file URLs * @throws HyracksException */ - private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException { + private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC, + boolean extractFromArchive) throws HyracksException { //retry 10 times at maximum for downloading binaries int retryCount = 10; int tried = 0; @@ -208,14 +218,44 @@ public class DeploymentUtils { is.close(); } } + if (extractFromArchive) { + unzip(targetFile.getAbsolutePath(), deploymentDir); + } downloadedFileURLs.add(targetFile.toURI().toURL()); } return downloadedFileURLs; } catch (Exception e) { - e.printStackTrace(); + LOGGER.error("Unable to fetch binaries from URL", e); trace = e; } } throw HyracksException.create(trace); } + + public static void unzip(String sourceFile, String outputDir) throws IOException { + try (ZipFile zipFile = new ZipFile(sourceFile)) { + Enumeration<? extends ZipEntry> entries = zipFile.entries(); + List<File> createdFiles = new ArrayList<>(); + while (entries.hasMoreElements()) { + ZipEntry entry = entries.nextElement(); + File entryDestination = new File(outputDir, entry.getName()); + if (!entry.isDirectory()) { + entryDestination.getParentFile().mkdirs(); + try (InputStream in = zipFile.getInputStream(entry); + OutputStream out = new FileOutputStream(entryDestination)) { + createdFiles.add(entryDestination); + IOUtils.copy(in, out); + } catch (IOException e) { + for (File f : createdFiles) { + if (!f.delete()) { + LOGGER.error("Couldn't clean up file after failed archive extraction: " + + f.getAbsolutePath()); + } + } + throw e; + } + } + } + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 2ba4768..43b9003 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -1176,11 +1176,14 @@ public class CCNCFunctions { private final List<URL> binaryURLs; private final DeploymentId deploymentId; + private final boolean extractFromArchive; - public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) { + public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId, + boolean isExtractFromArchive) { super(ccId); this.binaryURLs = binaryURLs; this.deploymentId = deploymentId; + this.extractFromArchive = isExtractFromArchive; } @Override @@ -1195,6 +1198,10 @@ public class CCNCFunctions { public DeploymentId getDeploymentId() { return deploymentId; } + + public boolean isExtractFromArchive() { + return extractFromArchive; + } } public static class UnDeployBinaryFunction extends CCIdentifiedFunction { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index d32ee32..ea71bc9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -92,8 +92,9 @@ public class NodeControllerRemoteProxy implements INodeController { } @Override - public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception { - DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId); + public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs, boolean extractFromArchive) + throws Exception { + DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId, extractFromArchive); ipcHandle.send(-1, rpaf, null); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index 3bc9710..836c624 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -100,8 +100,8 @@ final class NodeControllerIPCI implements IIPCI { case DEPLOY_BINARY: CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn; - ncs.getWorkQueue() - .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId())); + ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), + dbf.getCcId(), dbf.isExtractFromArchive())); return; case UNDEPLOY_BINARY: diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 517169b..bf07c20 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -222,7 +222,7 @@ public class NodeControllerService implements IControllerService { deployedJobSpecActivityClusterGraphMap = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, - new File(new File(NodeControllerService.class.getName()), id)); + new File(ioManager.getWorkspacePath(0), id)); getNodeControllerInfosAcceptor = new MutableObject<>(); memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR)); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index 14404d2..c335312 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -346,6 +346,10 @@ public class IOManager implements IIOManager { return dev.createFileRef(waPath + File.separator + waf.getName()); } + public String getWorkspacePath(int index) { + return workspaces.get(index) != null ? workspaces.get(index).getWorkspace() : null; + } + @Override public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException { try { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java index dfda463..586b539 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java @@ -42,12 +42,15 @@ public class DeployBinaryWork extends AbstractWork { private NodeControllerService ncs; private List<URL> binaryURLs; private final CcId ccId; + private final boolean extractFromArchive; - public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) { + public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId, + boolean extractFromArchive) { this.deploymentId = deploymentId; this.ncs = ncs; this.binaryURLs = binaryURLs; this.ccId = ccId; + this.extractFromArchive = extractFromArchive; } @Override @@ -55,7 +58,7 @@ public class DeployBinaryWork extends AbstractWork { DeploymentStatus status; try { DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getContext().getJobSerializerDeserializerContainer(), - ncs.getServerContext(), true); + ncs.getServerContext(), true, extractFromArchive); status = DeploymentStatus.SUCCEED; } catch (Exception e) { status = DeploymentStatus.FAIL; diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java index 82d7a3b..840fa24 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java @@ -144,9 +144,11 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac } @Override - public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception { + public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive) + throws Exception { HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = - new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId); + new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId, + extractFromArchive); rpci.call(ipcHandle, dbf); } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java index 29d12ce..fa6f2f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java @@ -214,21 +214,28 @@ public final class HyracksConnection implements IHyracksClientConnection { } @Override - public DeploymentId deployBinary(List<String> jars) throws Exception { + public DeploymentId deployBinary(List<String> files) throws Exception { /** generate a deployment id */ DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString()); + deployBinary(deploymentId, files, false); + return deploymentId; + } + + @Override + public void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive) + throws Exception { List<URL> binaryURLs = new ArrayList<>(); - if (jars != null && !jars.isEmpty()) { + if (files != null && !files.isEmpty()) { CloseableHttpClient hc = new DefaultHttpClient(); try { - /** upload jars through a http client one-by-one to the CC server */ - for (String jar : jars) { - int slashIndex = jar.lastIndexOf('/'); - String fileName = jar.substring(slashIndex + 1); + /** upload files through a http client one-by-one to the CC server */ + for (String file : files) { + int slashIndex = file.lastIndexOf('/'); + String fileName = file.substring(slashIndex + 1); String url = "http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + deploymentId.toString() + "&" + fileName; HttpPut put = new HttpPut(url); - put.setEntity(new FileEntity(new File(jar), "application/octet-stream")); + put.setEntity(new FileEntity(new File(file), "application/octet-stream")); HttpResponse response = hc.execute(put); response.getEntity().consumeContent(); if (response.getStatusLine().getStatusCode() != 200) { @@ -243,8 +250,7 @@ public final class HyracksConnection implements IHyracksClientConnection { } } /** deploy the URLs to the CC and NCs */ - hci.deployBinary(binaryURLs, deploymentId); - return deploymentId; + hci.deployBinary(binaryURLs, deploymentId, extractFromArchive); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java index 4417795..63126ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.concurrent.ThreadFactory; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.application.IServerContext; import org.apache.hyracks.api.application.IStateDumpHandler; import org.apache.hyracks.api.comm.IChannelInterfaceFactory; import org.apache.hyracks.api.config.IApplicationConfig; @@ -152,4 +153,9 @@ public class TestNCServiceContext implements INCServiceContext { public Object getApplicationContext() { return appCtx; } + + @Override + public IServerContext getServerCtx() { + return null; + } }