[
https://issues.apache.org/jira/browse/APEXCORE-304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263552#comment-15263552
]
ASF GitHub Bot commented on APEXCORE-304:
-----------------------------------------
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/311#discussion_r61534824
--- Diff:
engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java ---
@@ -274,4 +287,113 @@ public WindowGenerator setupWindowGenerator()
localCluster.shutdown();
}
+ @Test
+ public void testDynamicLoading() throws Exception
+ {
+ String generatedJar = generatejar("POJO");
+ URLClassLoader uCl = URLClassLoader.newInstance(new URL[] {new
File(generatedJar).toURI().toURL()});
+ Class<?> pojo = uCl.loadClass("POJO");
+
+ DynamicLoaderApp app = new DynamicLoaderApp();
+ app.generatedJar = generatedJar;
+ app.pojo = pojo;
+
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(app, new Configuration());
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ DynamicLoaderApp.latch.await();
+ Assert.assertTrue(DynamicLoaderApp.passed);
+ lc.shutdown();
+ }
+
+ static class DynamicLoaderApp implements StreamingApplication
+ {
+ static boolean passed = false;
+ static CountDownLatch latch = new CountDownLatch(2);
+
+ DynamicLoader test;
+ String generatedJar;
+ Class<?> pojo;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TestGeneratorInputOperator input = dag.addOperator("Input", new
TestGeneratorInputOperator());
+ test = dag.addOperator("Test", new DynamicLoader());
+
+ dag.addStream("S1", input.outport, test.input);
+ dag.setAttribute(Context.DAGContext.LIBRARY_JARS, generatedJar);
+ dag.setInputPortAttribute(test.input,
Context.PortContext.TUPLE_CLASS, pojo);
+ }
+ }
+
+ static class DynamicLoader extends BaseOperator
+ {
+ public final transient DefaultInputPort input = new DefaultInputPort()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ Class<?> value = context.getValue(Context.PortContext.TUPLE_CLASS);
+ if (value.getName().equals("POJO")) {
+ DynamicLoaderApp.passed = true;
+ } else {
+ DynamicLoaderApp.passed = false;
+ }
+ DynamicLoaderApp.latch.countDown();
+ }
+
+ @Override
+ public void process(Object tuple)
+ {
+ }
+ };
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ try {
+ cl.loadClass("POJO");
+ } catch (ClassNotFoundException e) {
+ DynamicLoaderApp.passed = false;
+ DynamicLoaderApp.latch.countDown();
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Class.forName("POJO", true,
Thread.currentThread().getContextClassLoader());
+ } catch (ClassNotFoundException e) {
+ DynamicLoaderApp.passed = false;
+ DynamicLoaderApp.latch.countDown();
+ throw new RuntimeException(e);
+ }
+
+ DynamicLoaderApp.passed = true;
+ DynamicLoaderApp.latch.countDown();
+ }
+ }
+
+ private String generatejar(String pojoClassName) throws IOException,
InterruptedException
+ {
+ String sourceDir = "src/test/resources/dynamicJar/";
+ String destDir = "target/";
--- End diff --
Done.
> Ability to add jars to classpath in populateDAG
> -----------------------------------------------
>
> Key: APEXCORE-304
> URL: https://issues.apache.org/jira/browse/APEXCORE-304
> Project: Apache Apex Core
> Issue Type: Improvement
> Reporter: Chinmay Kolhatkar
> Assignee: Chinmay Kolhatkar
>
> This will have following functionality:
> 1) In populateDAG one would be allowed to add given local jar path to
> classpath of the application.
> 2) Optionally delete the given jar file after copying to HDFS is done.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)