Github user DarinJ commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/93#discussion_r79377189
  
    --- Diff: src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java ---
    @@ -49,83 +41,111 @@
     public class ResponderDriver
     {
       private static final Logger logger = 
LoggerFactory.getLogger(ResponderDriver.class);
    +  // ClassNames to instantiate Platforms using the platform CLI
    +  private final static String MAPREDUCE_LAUNCHER = 
"org.apache.pirk.responder.wideskies.mapreduce.MapReduceResponderLauncher";
    +  private final static String SPARK_LAUNCHER = 
"org.apache.pirk.responder.wideskies.spark.SparkResponderLauncher";
    +  private final static String SPARKSTREAMING_LAUNCHER = 
"org.apache.pirk.responder.wideskies.spark.streaming.SparkStreamingResponderLauncher";
    +  private final static String STANDALONE_LAUNCHER = 
"org.apache.pirk.responder.wideskies.standalone.StandaloneResponderLauncher";
    +  private final static String STORM_LAUNCHER = 
"org.apache.pirk.responder.wideskies.storm.StormResponderLauncher";
     
       private enum Platform
       {
         MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE
       }
     
    -  public static void main(String[] args) throws Exception
    +  private static void launch(String launcherClassName)
    +  {
    +    logger.info("Launching Responder with {}", launcherClassName);
    +    try
    +    {
    +      Class clazz = Class.forName(launcherClassName);
    +      if (ResponderLauncher.class.isAssignableFrom(clazz))
    +      {
    +        Object launcherInstance = clazz.newInstance();
    +        Method m = launcherInstance.getClass().getDeclaredMethod("run");
    +        m.invoke(launcherInstance);
    +      }
    +      else
    +      {
    +        logger.error("Class {} does not implement ResponderLauncher", 
launcherClassName);
    +      }
    +    }
    +    catch (ClassNotFoundException e)
    +    {
    +      logger.error("Class {} not found, check launcher property: {}", 
launcherClassName);
    +    }
    +    catch (NoSuchMethodException e)
    +    {
    +      logger.error("In {} run method not found: {}", launcherClassName);
    +    }
    +    catch (InvocationTargetException e)
    +    {
    +      logger.error("In {} run method could not be invoked: {}: {}", 
launcherClassName, e);
    +    }
    +    catch (InstantiationException e)
    +    {
    +      logger.error("Instantiation exception within {}: {}", 
launcherClassName, e);
    +    }
    +    catch (IllegalAccessException e)
    +    {
    +      logger.error("IllegalAccess Exception {}", e);
    +    }
    +  }
    +
    +  public static void main(String[] args)
       {
         ResponderCLI responderCLI = new ResponderCLI(args);
     
         // For handling System.exit calls from Spark Streaming
         System.setSecurityManager(new SystemExitManager());
     
    -    Platform platform = Platform.NONE;
    -    String platformString = 
SystemConfiguration.getProperty(ResponderProps.PLATFORM);
    -    try
    -    {
    -      platform = Platform.valueOf(platformString.toUpperCase());
    -    } catch (IllegalArgumentException e)
    +    String launcherClassName = 
SystemConfiguration.getProperty(ResponderProps.LAUNCHER);
    +    if (launcherClassName != null)
         {
    -      logger.error("platform " + platformString + " not found.");
    +      launch(launcherClassName);
         }
    -
    -    logger.info("platform = " + platform);
    -    switch (platform)
    +    else
         {
    -      case MAPREDUCE:
    -        logger.info("Launching MapReduce ResponderTool:");
    -
    -        ComputeResponseTool pirWLTool = new ComputeResponseTool();
    -        ToolRunner.run(pirWLTool, new String[] {});
    -        break;
    -
    -      case SPARK:
    -        logger.info("Launching Spark ComputeResponse:");
    -
    -        ComputeResponse computeResponse = new 
ComputeResponse(FileSystem.get(new Configuration()));
    -        computeResponse.performQuery();
    -        break;
    -
    -      case SPARKSTREAMING:
    -        logger.info("Launching Spark ComputeStreamingResponse:");
    -
    -        ComputeStreamingResponse computeSR = new 
ComputeStreamingResponse(FileSystem.get(new Configuration()));
    -        try
    -        {
    -          computeSR.performQuery();
    -        } catch (SystemExitException e)
    -        {
    -          // If System.exit(0) is not caught from Spark Streaming,
    -          // the application will complete with a 'failed' status
    -          logger.info("Exited with System.exit(0) from Spark Streaming");
    -        }
    -
    -        // Teardown the context
    -        computeSR.teardown();
    -        break;
    -
    -      case STORM:
    -        logger.info("Launching Storm PirkTopology:");
    -        PirkTopology.runPirkTopology();
    -        break;
    -
    -      case STANDALONE:
    -        logger.info("Launching Standalone Responder:");
    -
    -        String queryInput = 
SystemConfiguration.getProperty("pir.queryInput");
    -        Query query = new LocalFileSystemStore().recall(queryInput, 
Query.class);
    -
    -        Responder pirResponder = new Responder(query);
    -        pirResponder.computeStandaloneResponse();
    -        break;
    +      logger.warn("platform is being deprecaited in flavor of launcher");
    --- End diff --
    
    You figured it out :).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to