Github user tellison commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/93#discussion_r80030927 --- Diff: src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java --- @@ -50,103 +40,31 @@ { private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class); - private enum Platform - { - MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE - } - - public static void main(String[] args) throws Exception + public static void main(String[] args) { ResponderCLI responderCLI = new ResponderCLI(args); - // For handling System.exit calls from Spark Streaming - System.setSecurityManager(new SystemExitManager()); - - Platform platform = Platform.NONE; - String platformString = SystemConfiguration.getProperty(ResponderProps.PLATFORM); + String platformName = SystemConfiguration.getProperty(ResponderProps.PLATFORM, "None"); + logger.info("Attempting to use platform {} ...", platformName); try { - platform = Platform.valueOf(platformString.toUpperCase()); - } catch (IllegalArgumentException e) - { - logger.error("platform " + platformString + " not found."); - } - - logger.info("platform = " + platform); - switch (platform) - { - case MAPREDUCE: - logger.info("Launching MapReduce ResponderTool:"); - - ComputeResponseTool pirWLTool = new ComputeResponseTool(); - ToolRunner.run(pirWLTool, new String[] {}); - break; - - case SPARK: - logger.info("Launching Spark ComputeResponse:"); - - ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration())); - computeResponse.performQuery(); - break; - - case SPARKSTREAMING: - logger.info("Launching Spark ComputeStreamingResponse:"); - - ComputeStreamingResponse computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration())); - try - { - computeSR.performQuery(); - } catch (SystemExitException e) - { - // If System.exit(0) is not caught from Spark Streaming, - // the application will complete with a 'failed' status - 
logger.info("Exited with System.exit(0) from Spark Streaming"); - } - - // Teardown the context - computeSR.teardown(); - break; - - case STORM: - logger.info("Launching Storm PirkTopology:"); - PirkTopology.runPirkTopology(); - break; - - case STANDALONE: - logger.info("Launching Standalone Responder:"); - - String queryInput = SystemConfiguration.getProperty("pir.queryInput"); - Query query = new LocalFileSystemStore().recall(queryInput, Query.class); - - Responder pirResponder = new Responder(query); - pirResponder.computeStandaloneResponse(); - break; - } - } - - // Exception and Security Manager classes used to catch System.exit from Spark Streaming - private static class SystemExitException extends SecurityException - {} - - private static class SystemExitManager extends SecurityManager - { - @Override - public void checkPermission(Permission perm) - {} - - @Override - public void checkExit(int status) - { - super.checkExit(status); - if (status == 0) // If we exited cleanly, throw SystemExitException + ResponderPlugin responder = ResponderService.getInstance().getResponder(platformName); + if (responder == null) { - throw new SystemExitException(); + logger.error("No such platform plugin found: {}!", platformName); } else { - throw new SecurityException(); + responder.run(); } - + } + catch (PIRException pirEx) --- End diff -- IIRC the PIRException was being thrown with the original as a cause, so you won't lose the stacktrace. I agree that tidying up exceptions is beyond this PR, and will require some 'waves' of wrapping Exception until they are banished.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---