This is an automated email from the ASF dual-hosted git repository. cwiklik pushed a commit to branch uima-as-3 in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git
commit fdfa8ecf0138cd20a61fb29db5c57bfec9938bbc Author: cwiklik <cwiklik> AuthorDate: Thu Oct 18 14:17:11 2018 +0000 UIMA-5501 --- uimaj-as-connectors/.classpath | 36 ++ uimaj-as-connectors/.project | 23 + .../.settings/org.eclipse.core.resources.prefs | 6 + .../.settings/org.eclipse.jdt.core.prefs | 5 + .../.settings/org.eclipse.m2e.core.prefs | 4 + uimaj-as-connectors/pom.xml | 25 + .../connectors/direct/DirectUimaAsConnector.java | 26 + .../as/connectors/direct/DirectUimaAsConsumer.java | 205 +++++++ .../as/connectors/direct/DirectUimaAsEndpoint.java | 251 ++++++++ .../as/connectors/direct/DirectUimaAsProducer.java | 48 ++ .../mockup/MockUpAnalysisEngineController.java | 659 +++++++++++++++++++++ .../as/connectors/mockup/TestMessageProcessor.java | 28 + .../connectors/direct/DirectUimaAsConnector.class | Bin 0 -> 1276 bytes .../direct/DirectUimaAsConsumer$1$1.class | Bin 0 -> 2023 bytes .../connectors/direct/DirectUimaAsConsumer$1.class | Bin 0 -> 3352 bytes ...rectUimaAsConsumer$DirectListenerCallback.class | Bin 0 -> 1358 bytes .../connectors/direct/DirectUimaAsConsumer.class | Bin 0 -> 8575 bytes ...int$MockupService$ServiceMessageProcessor.class | Bin 0 -> 2619 bytes .../DirectUimaAsEndpoint$MockupService.class | Bin 0 -> 3518 bytes .../connectors/direct/DirectUimaAsEndpoint.class | Bin 0 -> 8402 bytes .../connectors/direct/DirectUimaAsProducer.class | Bin 0 -> 1534 bytes .../mockup/MockUpAnalysisEngineController.class | Bin 0 -> 16652 bytes .../connectors/mockup/TestMessageProcessor.class | Bin 0 -> 1378 bytes 23 files changed, 1316 insertions(+) diff --git a/uimaj-as-connectors/.classpath b/uimaj-as-connectors/.classpath new file mode 100644 index 0000000..e43402f --- /dev/null +++ b/uimaj-as-connectors/.classpath @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="src" output="target/classes" path="src/main/java"> + <attributes> + <attribute name="optional" value="true"/> + <attribute name="maven.pomderived" 
value="true"/> + </attributes> + </classpathentry> + <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="src" output="target/test-classes" path="src/test/java"> + <attributes> + <attribute name="optional" value="true"/> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="output" path="target/classes"/> +</classpath> diff --git a/uimaj-as-connectors/.project b/uimaj-as-connectors/.project new file mode 100644 index 0000000..ff69063 --- /dev/null +++ b/uimaj-as-connectors/.project @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>uimaj-as-connectors</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.m2e.core.maven2Builder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + <nature>org.eclipse.m2e.core.maven2Nature</nature> + </natures> +</projectDescription> diff --git a/uimaj-as-connectors/.settings/org.eclipse.core.resources.prefs 
b/uimaj-as-connectors/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..29abf99 --- /dev/null +++ b/uimaj-as-connectors/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,6 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 +encoding//src/test/java=UTF-8 +encoding//src/test/resources=UTF-8 +encoding/<project>=UTF-8 diff --git a/uimaj-as-connectors/.settings/org.eclipse.jdt.core.prefs b/uimaj-as-connectors/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..ec4300d --- /dev/null +++ b/uimaj-as-connectors/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7 +org.eclipse.jdt.core.compiler.compliance=1.7 +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.source=1.7 diff --git a/uimaj-as-connectors/.settings/org.eclipse.m2e.core.prefs b/uimaj-as-connectors/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 0000000..f897a7f --- /dev/null +++ b/uimaj-as-connectors/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 +resolveWorkspaceProjects=true +version=1 diff --git a/uimaj-as-connectors/pom.xml b/uimaj-as-connectors/pom.xml new file mode 100644 index 0000000..da30e8c --- /dev/null +++ b/uimaj-as-connectors/pom.xml @@ -0,0 +1,25 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.uima</groupId> + <artifactId>uima-as-parent</artifactId> + <version>2.10.3-SNAPSHOT</version> + <relativePath>../uima-as-parent/pom.xml</relativePath> + </parent> + + <artifactId>uimaj-as-connectors</artifactId> + + + <dependencies> + <dependency> + 
<groupId>org.apache.uima</groupId> + <artifactId>uimaj-as-core</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + + + </dependencies> + + +</project> \ No newline at end of file diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.java new file mode 100644 index 0000000..6955a68 --- /dev/null +++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.java @@ -0,0 +1,26 @@ +package org.apache.uima.as.connectors.direct; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.uima.aae.definition.connectors.UimaAsConnector; +import org.apache.uima.aae.definition.connectors.UimaAsEndpoint; + +public class DirectUimaAsConnector implements UimaAsConnector { + + private final Map<String, UimaAsEndpoint> endpoints = + new HashMap<>(); + + + @Override + public UimaAsEndpoint createEndpoint(String uri, Map<String, Object> params) throws Exception { + UimaAsEndpoint endpoint = new DirectUimaAsEndpoint(); + endpoints.putIfAbsent(uri, endpoint); + return endpoint; + } + + public static void main(String[] args) { + + } + +} diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java new file mode 100644 index 0000000..a52b1b1 --- /dev/null +++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java @@ -0,0 +1,205 @@ +package org.apache.uima.as.connectors.direct; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import 
java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.uima.aae.UimaAsThreadFactory; +import org.apache.uima.aae.controller.AnalysisEngineController; +import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController; +import org.apache.uima.aae.definition.connectors.AbstractUimaAsConsumer; +import org.apache.uima.aae.definition.connectors.ListenerCallback; +import org.apache.uima.aae.definition.connectors.UimaAsConsumer; +import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.MessageProcessor; +import org.apache.uima.aae.spi.transport.vm.UimaVmQueue; +import org.apache.uima.as.client.DirectMessage; +import org.apache.uima.as.client.DirectMessageContext; + +public class DirectUimaAsConsumer extends AbstractUimaAsConsumer { + + private BlockingQueue<DirectMessage> inQueue= new LinkedBlockingQueue<>();; + private MessageProcessor processor; + private boolean started = false; + private ExecutorService executor; + private final ConsumerType consumerType; + private int consumerThreadCount = 1; + private boolean doStop = false; + private final CountDownLatch latchToCountNumberOfInitedThreads; + private final CountDownLatch latchToCountNumberOfTerminatedThreads; + private AnalysisEngineController controller; + private final String name; + public DirectUimaAsConsumer(String name, BlockingQueue<DirectMessage> inQueue, ConsumerType type, int consumerThreadCount) { + this(name, type,consumerThreadCount); + this.inQueue = inQueue; + } + + public DirectUimaAsConsumer(String name,String targetUri, ConsumerType type, int consumerThreadCount) { + this(name, type,consumerThreadCount); + + } + + private DirectUimaAsConsumer( String name, ConsumerType type, int consumerThreadCount) { + this.name = name; + this.consumerType = type; + this.consumerThreadCount = consumerThreadCount; + latchToCountNumberOfInitedThreads = new 
CountDownLatch(consumerThreadCount); + latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerThreadCount); + } + + public ConsumerType getType() { + return consumerType; + } + + protected void setMessageProcessor(MessageProcessor processor) { + this.processor = processor; + } + + public void initialize() throws Exception { + + } + + /** + * This method is called on a producer thread + * + */ + @Override + public void consume(DirectMessage message) throws Exception { + inQueue.add(message); + } + + private void initializeUimaPipeline() throws Exception { +// workQueue = new UimaVmQueue(); + if ( controller.isPrimitive() ) { + ThreadGroup threadGroup = new ThreadGroup("VmThreadGroup" + 1 + "_" + controller.getComponentName()); + executor = new ThreadPoolExecutor(consumerThreadCount, consumerThreadCount, Long.MAX_VALUE, TimeUnit.DAYS, new UimaVmQueue()); + UimaAsThreadFactory tf = null; + + DirectListenerCallback callback = new DirectListenerCallback(this); + + tf = new UimaAsThreadFactory(). + withCallback(callback). + withThreadGroup(threadGroup). + withPrimitiveController((PrimitiveAnalysisEngineController)processor.getController()). + withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads). + withInitedThreadsLatch(latchToCountNumberOfInitedThreads); + tf.setDaemon(true); + ((ThreadPoolExecutor)executor).setThreadFactory(tf); + ((ThreadPoolExecutor)executor).prestartAllCoreThreads(); + latchToCountNumberOfInitedThreads.await(); + if ( callback.failedInitialization() ) { + throw callback.getException(); + } + System.out.println("Executor Started - All Process Threads Initialized"); + } else { + executor = Executors.newFixedThreadPool(consumerThreadCount); + } + } + public void initialize(AnalysisEngineController controller) throws Exception { + this.controller = controller; + // Consumer handling ProcessCAS must first initialize each + // UIMA pipeline. 
+ if (ConsumerType.ProcessCAS.equals(consumerType)) { + if ( Objects.isNull(controller)) { + executor = Executors.newFixedThreadPool(consumerThreadCount); + } else { + initializeUimaPipeline(); + } + + } else { + executor = Executors.newFixedThreadPool(consumerThreadCount); + } + } + + private boolean stopConsumingMessages(DirectMessage message ) throws Exception{ + return (message.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.PoisonPill); + + } + public void stop() { + doStop = true; + } + public synchronized void start() { + if ( started ) { + return; + } + System.out.println(">>> "+name+" DirectConsumer.start() - Consumer Type:"+getType()); + new Thread() { + @Override + public void run() { + started = true; + + + while( !doStop ) { + try { + + final DirectMessage message = inQueue.take(); //blocks if empty + System.out.println(">>> "+name+" DirectConsumer.run() - Consumer Type:"+getType()+" Got new message"); + + if ( stopConsumingMessages(message)) { // special type of msg indicating end of processing + System.out.println(">>> "+name+" Got END message - Stopping Queue Consumer"); + doStop = true; + } else { + executor.submit(new Runnable() { + public void run() { + + try { + //System.out.println(">>> "+controller.getComponentName()+" Got new message - processing on thread "+Thread.currentThread().getName()+" channel:"+getType()); + //ic.onMessage(message); + + // every message is wrapped in the MessageContext + MessageContext messageContext = + new DirectMessageContext(message, "", controller.getComponentName()); + + processor.process(messageContext); + } catch( Exception e) { + e.printStackTrace(); + } + } + }); + } + + } catch( InterruptedException e) { + Thread.currentThread().interrupt(); + + //System.out.println(getType()+ " Listener Thread Interrupted - Stop Listening"); + doStop = true; + } catch (Exception e) { + e.printStackTrace(); + doStop = true; + } + } + } + }.start(); + } + public static void main(String[] args) { + // TODO 
Auto-generated method stub + + } + public class DirectListenerCallback implements ListenerCallback { + private UimaAsConsumer dl; + private boolean initializationFailed = false; + private Exception exception; + + public DirectListenerCallback(UimaAsConsumer l) { + this.dl = l; + } + + public void onInitializationError(Exception e) { + initializationFailed = true; + exception = e; + } + public boolean failedInitialization() { + return initializationFailed; + } + public Exception getException() { + return exception; + } + } + +} diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java new file mode 100644 index 0000000..012c89a --- /dev/null +++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java @@ -0,0 +1,251 @@ +package org.apache.uima.as.connectors.direct; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import org.apache.uima.UimaContext; +import org.apache.uima.aae.AsynchAECasManager; +import org.apache.uima.aae.InProcessCache; +import org.apache.uima.aae.InputChannel; +import org.apache.uima.aae.OutputChannel; +import org.apache.uima.aae.UimaEEAdminContext; +import org.apache.uima.aae.controller.AnalysisEngineController; +import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE; +import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState; +import org.apache.uima.aae.controller.ControllerCallbackListener; +import org.apache.uima.aae.controller.ControllerLatch; +import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.controller.EventSubscriber; +import org.apache.uima.aae.controller.LocalCache; +import org.apache.uima.aae.definition.connectors.UimaAsConsumer; +import 
org.apache.uima.aae.definition.connectors.UimaAsEndpoint; +import org.apache.uima.aae.definition.connectors.UimaAsProducer; +import org.apache.uima.aae.error.AsynchAEException; +import org.apache.uima.aae.error.ErrorContext; +import org.apache.uima.aae.error.ErrorHandlerChain; +import org.apache.uima.aae.jmx.JmxManagement; +import org.apache.uima.aae.jmx.ServiceErrors; +import org.apache.uima.aae.jmx.ServiceInfo; +import org.apache.uima.aae.jmx.ServicePerformance; +import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.MessageProcessor; +import org.apache.uima.aae.message.UimaAsOrigin; +import org.apache.uima.aae.monitor.Monitor; +import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType; +import org.apache.uima.aae.service.command.UimaAsMessageProcessor; +import org.apache.uima.aae.spi.transport.UimaMessageListener; +import org.apache.uima.as.client.DirectInputChannel; +import org.apache.uima.as.client.DirectMessage; +import org.apache.uima.as.client.DirectMessageContext; +import org.apache.uima.as.client.Listener; +import org.apache.uima.as.connectors.mockup.MockUpAnalysisEngineController; +import org.apache.uima.as.connectors.mockup.TestMessageProcessor; +import org.apache.uima.cas.CAS; +import org.apache.uima.resource.ResourceSpecifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +public class DirectUimaAsEndpoint implements UimaAsEndpoint { + + private Map<String,UimaAsConsumer> consumers = new ConcurrentHashMap<>(); + private Map<String,UimaAsProducer> producers = new ConcurrentHashMap<>(); + private final MessageProcessor processor; + private final String name; + + public DirectUimaAsEndpoint(MessageProcessor processor, String name) { + this.processor = processor; + this.name = name; + } + public MessageContext createMessage(int command, int messageType, Endpoint endpoint) { + DirectMessage message = + new DirectMessage(). 
+ withCommand(command). + withMessageType(messageType). + withOrigin(processor.getController().getOrigin()); + + MessageContext messageContext = + new DirectMessageContext(message,name,name); + + message.withCommand(command). + withMessageType(messageType); + + if ( command == AsynchAEMessage.GetMeta && messageType == AsynchAEMessage.Response ) { + message.withMetadata(null); +// message.withMetadata(processor.getController().getResourceSpecifier().); + } + + + return messageContext; + } + public void dispatch(MessageContext messageContext) throws Exception { + UimaAsProducer producer; + if ( !producers.containsKey(messageContext.getEndpoint().getDelegateKey())) { + producer = + createProducer((UimaAsConsumer)messageContext.getEndpoint().getDestination(), messageContext.getEndpoint().getDelegateKey()); + } else { + producer = producers.get(messageContext.getEndpoint().getDelegateKey()); + } + producer.dispatch((DirectMessage)messageContext.getRawMessage()); + } + + public UimaAsProducer createProducer(UimaAsConsumer consumer, String delegateKey) throws Exception { + + UimaAsProducer producer = new DirectUimaAsProducer(consumer); + producers.put(delegateKey,producer); + return producer; + } + + public UimaAsProducer createProducer(String targetUri) throws Exception { + + UimaAsProducer producer = new DirectUimaAsProducer(targetUri); + producers.put(targetUri, producer); + return producer; + } + + public UimaAsConsumer createConsumer(String targetUri, ConsumerType type, int consumerThreadCount) throws Exception { + + DirectUimaAsConsumer consumer = new DirectUimaAsConsumer(name, targetUri, type, consumerThreadCount); + consumer.setMessageProcessor(processor); + consumers.put(targetUri+type.name(), consumer); + return consumer; + } + + public UimaAsConsumer getConsumer(String targetUri, ConsumerType type) { + return consumers.get(targetUri+type.name()); + } + + @Override + public void start() throws Exception { + for(Entry<String, UimaAsConsumer> entry : 
consumers.entrySet()) { + entry.getValue().initialize(processor.getController()); + entry.getValue().start(); + } + for(Entry<String, UimaAsProducer> entry : producers.entrySet()) { + entry.getValue().start(); + } + + } + @Override + public void stop() throws Exception { + for(Entry<String, UimaAsConsumer> entry : consumers.entrySet()) { + entry.getValue().stop(); + } + for(Entry<String, UimaAsProducer> entry : producers.entrySet()) { + entry.getValue().stop(); + } + + } + + public static void main(String[] args) { + try { + MessageProcessor dummyProcessor = + new TestMessageProcessor(null); + //new TestMessageProcessor(new MockUpAnalysisEngineController("MockupClient", 4)); + +// MessageProcessor processor = + // new UimaAsMessageProcessor(null); + //TestMessageProcessor processor = new TestMessageProcessor(null); + + DirectUimaAsEndpoint endpoint = + new DirectUimaAsEndpoint(dummyProcessor, "Client"); + + MockupService service = + endpoint.new MockupService(); + + service.initialize(); + service.start(); + + endpoint.createConsumer("direct:", ConsumerType.GetMeta, 1); + endpoint.createConsumer("direct:", ConsumerType.ProcessCAS, 4); + endpoint.createConsumer("direct:", ConsumerType.Cpc, 1); + + UimaAsProducer producer = + endpoint.createProducer( "direct:serviceA"); + + endpoint.start(); + + DirectMessage getMetaRequestMessage = + new DirectMessage(). + withCommand(AsynchAEMessage.GetMeta). + withMessageType(AsynchAEMessage.Request). + withOrigin(new UimaAsOrigin("Client")). + withReplyDestination(endpoint.getConsumer("direct:", ConsumerType.GetMeta)). 
+ withPayload(AsynchAEMessage.None); + UimaAsConsumer target = + service.getEndpoint().getConsumer("direct:", ConsumerType.GetMeta); + + producer.dispatch(getMetaRequestMessage, target); + + } catch( Exception e) { + e.printStackTrace(); + } + + } + + + private class MockupService { + UimaAsEndpoint endpoint; + UimaAsProducer producer; + + public void initialize() throws Exception { + + MockUpAnalysisEngineController controller = + new MockUpAnalysisEngineController("MockupService",4); + MessageProcessor dummyProcessor = + new TestMessageProcessor(controller); + + endpoint = new DirectUimaAsEndpoint(dummyProcessor, "Service"); + UimaAsConsumer getMetaReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.GetMeta, 1); + UimaAsConsumer processCasReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.ProcessCAS, 4); + UimaAsConsumer cpcReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.Cpc, 1); + + producer = endpoint.createProducer( "direct:serviceA"); + controller.addEndpoint(new UimaAsOrigin("Service"), endpoint); + } + public void process(MessageContext mc) throws Exception { + + } + public void start() throws Exception { + endpoint.start(); + } + public void stop() throws Exception { + endpoint.stop(); + } + public UimaAsEndpoint getEndpoint() { + return endpoint; + } + + + private class ServiceMessageProcessor implements MessageProcessor { + MockupService service; + public ServiceMessageProcessor(MockupService service) { + this.service = service; + } + @Override + public void process(MessageContext message) throws Exception { + DirectMessage request = + (DirectMessage)message.getRawMessage(); + DirectMessage getMetaReply = + new DirectMessage(). + withCommand(AsynchAEMessage.GetMeta). + withMessageType(AsynchAEMessage.Response). + withOrigin(message.getEndpoint().getMessageOrigin()). + withReplyDestination(request.getReplyDestination()). 
+ withPayload(AsynchAEMessage.Metadata); + MessageContext reply = new DirectMessageContext(getMetaReply, "", ""); + service.getEndpoint().dispatch(reply); + + } + @Override + public AnalysisEngineController getController() { + // TODO Auto-generated method stub + return null; + } + + } + } + +} diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java new file mode 100644 index 0000000..be7c9e3 --- /dev/null +++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java @@ -0,0 +1,48 @@ +package org.apache.uima.as.connectors.direct; + + +import org.apache.uima.aae.definition.connectors.UimaAsConsumer; +import org.apache.uima.aae.definition.connectors.UimaAsProducer; +import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.UimaAsMessage; +import org.apache.uima.as.client.DirectMessage; + +public class DirectUimaAsProducer implements UimaAsProducer{ + + private UimaAsConsumer consumer; + + public DirectUimaAsProducer(String targetUri) { + + } + public DirectUimaAsProducer(UimaAsConsumer consumer) { + this.consumer = consumer; + } + + @Override + public void start() throws Exception { + // TODO Auto-generated method stub + + } + @Override + public void stop() throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public void dispatch(DirectMessage message) throws Exception { + + consumer.consume(message); + + } + @Override + public void dispatch(DirectMessage message, UimaAsConsumer target) throws Exception { + // hand over message to the target consumer + target.consume(message); + + } + public static void main(String[] args) { + // TODO Auto-generated method stub + + } +} diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java 
b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java new file mode 100644 index 0000000..c8fa5e5 --- /dev/null +++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java @@ -0,0 +1,659 @@ +package org.apache.uima.as.connectors.mockup; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; + +import org.apache.uima.UimaContext; +import org.apache.uima.aae.AsynchAECasManager; +import org.apache.uima.aae.InProcessCache; +import org.apache.uima.aae.InputChannel; +import org.apache.uima.aae.OutputChannel; +import org.apache.uima.aae.UimaEEAdminContext; +import org.apache.uima.aae.controller.AnalysisEngineController; +import org.apache.uima.aae.controller.AnalysisEngineInstancePool; +import org.apache.uima.aae.controller.ControllerCallbackListener; +import org.apache.uima.aae.controller.ControllerLatch; +import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.controller.EventSubscriber; +import org.apache.uima.aae.controller.LocalCache; +import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController; +import org.apache.uima.aae.definition.connectors.UimaAsEndpoint; +import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE; +import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState; +import org.apache.uima.aae.error.AsynchAEException; +import org.apache.uima.aae.error.ErrorContext; +import org.apache.uima.aae.error.ErrorHandlerChain; +import org.apache.uima.aae.jmx.JmxManagement; +import org.apache.uima.aae.jmx.PrimitiveServiceInfo; +import org.apache.uima.aae.jmx.ServiceErrors; +import org.apache.uima.aae.jmx.ServiceInfo; +import org.apache.uima.aae.jmx.ServicePerformance; +import 
org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.Origin; +import org.apache.uima.aae.message.UimaAsOrigin; +import org.apache.uima.aae.monitor.Monitor; +import org.apache.uima.aae.spi.transport.UimaMessageListener; +import org.apache.uima.as.client.DirectInputChannel; +import org.apache.uima.as.client.Listener; +import org.apache.uima.cas.CAS; +import org.apache.uima.resource.ResourceInitializationException; +import org.apache.uima.resource.ResourceSpecifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineController { + + private String name="N/A"; + private ThreadLocal<Long> threadLocalValue = new ThreadLocal<>(); + private volatile ControllerLatch latch = new ControllerLatch(this); + private CyclicBarrier barrier; + private Map<Origin, UimaAsEndpoint> endpoints = + new HashMap<>(); + private final Origin serviceOrigin; + + public MockUpAnalysisEngineController(String name, int scaleout) { + this.name = name; + serviceOrigin = new UimaAsOrigin(name); + barrier = new CyclicBarrier(scaleout); + } + public Origin getOrigin() { + return serviceOrigin; + } + public void addEndpoint(Origin origin, UimaAsEndpoint endpoint) { + endpoints.put(origin, endpoint); + } + @Override + public void terminate() { + // TODO Auto-generated method stub + + } + + @Override + public void addControllerCallbackListener(ControllerCallbackListener aListener) { + // TODO Auto-generated method stub + + } + + @Override + public void removeControllerCallbackListener(ControllerCallbackListener aListener) { + // TODO Auto-generated method stub + + } + + @Override + public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException { + UimaAsEndpoint endpoint = + endpoints.get(anEndpoint.getMessageOrigin());//anEndpoint.getDelegateKey()); + MessageContext message = 
endpoint.createMessage(AsynchAEMessage.GetMeta,AsynchAEMessage.Response,anEndpoint); + try { + endpoint.dispatch(message); + } catch( Exception e) { + throw new AsynchAEException(e); + } + + } + + @Override + public ControllerLatch getControllerLatch() { + return latch; + } + + @Override + public void setInputChannel(InputChannel anInputChannel) throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public void setDirectInputChannel(DirectInputChannel anInputChannel) throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public void setJmsInputChannel(InputChannel anInputChannel) throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public InputChannel getInputChannel(ENDPOINT_TYPE et) { + // TODO Auto-generated method stub + return null; + } + + @Override + public InputChannel getInputChannel() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void addInputChannel(InputChannel anInputChannel) throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public String getServiceEndpointName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setServiceId(String name) { + // TODO Auto-generated method stub + + } + + @Override + public String getServiceId() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void handleDelegateLifeCycleEvent(String anEndpoint, int aDelegateCount) { + // TODO Auto-generated method stub + + } + + @Override + public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext) { + // TODO Auto-generated method stub + + } + + @Override + public void setThreadFactory(ThreadPoolTaskExecutor factory) { + // TODO Auto-generated method stub + + } + + @Override + public List<Listener> getAllListeners() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void saveReplyTime(long snapshot, String aKey) { + // TODO 
Auto-generated method stub + + } + + @Override + public long getReplyTime() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Map getStats() { + // TODO Auto-generated method stub + return null; + } + + @Override + public UimaContext getChildUimaContext(String aDelegateEndpointName) throws Exception { + // TODO Auto-generated method stub + return null; + } + + @Override + public void dropCAS(String aCasReferenceId, boolean dropCacheEntry) { + // TODO Auto-generated method stub + + } + + @Override + public void dropCAS(CAS aCAS) { + // TODO Auto-generated method stub + + } + + @Override + public InProcessCache getInProcessCache() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isPrimitive() { + // TODO Auto-generated method stub + return true; + } + + @Override + public Monitor getMonitor() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getComponentName() { + return name; + } + + @Override + public void collectionProcessComplete(Endpoint anEndpoint) throws AsynchAEException { + // TODO Auto-generated method stub + + } + + @Override + public boolean isTopLevelComponent() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void initialize() throws AsynchAEException { + System.out.println(".....Thread["+Thread.currentThread().getId()+"] "+ getComponentName()+" - Initializing AE"); + } + + @Override + public void process(CAS aCas, String aCasId) { + // TODO Auto-generated method stub + + } + + @Override + public void process(CAS aCAS, String anInputCasReferenceId, String aNewCasReferenceId, + String newCASProducedBy) { + // TODO Auto-generated method stub + + } + + @Override + public void process(CAS aCAS, String aCasReferenceId, Endpoint anEndpoint) { + // TODO Auto-generated method stub + + } + + @Override + public void saveTime(long 
anArrivalTime, String aCasReferenceId, String anEndpointName) { + // TODO Auto-generated method stub + + } + + @Override + public long getTime(String aCasReferenceId, String anEndpointName) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public ErrorHandlerChain getErrorHandlerChain() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void addOutputChannel(OutputChannel anOutputChannel) throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public OutputChannel getOutputChannel(Endpoint anEndpoint) { + // TODO Auto-generated method stub + return null; + } + + @Override + public OutputChannel getOutputChannel(ENDPOINT_TYPE et) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setCasManager(AsynchAECasManager aCasManager) { + // TODO Auto-generated method stub + + } + + @Override + public AsynchAECasManager getCasManagerWrapper() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void stop() { + // TODO Auto-generated method stub + + } + + @Override + public boolean isStopped() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void setStopped() { + // TODO Auto-generated method stub + + } + + @Override + public void dropStats(String aCasReferenceId, String anEndpointName) { + // TODO Auto-generated method stub + + } + + @Override + public void setUimaEEAdminContext(UimaEEAdminContext anAdminContext) { + // TODO Auto-generated method stub + + } + + @Override + public UimaEEAdminContext getUimaEEAdminContext() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getJMXDomain() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getIndex() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public String getJmxContext() { + // TODO Auto-generated method stub + return null; + } + + @Override + public 
ServicePerformance getServicePerformance() { + // TODO Auto-generated method stub + return null; + } + + @Override + public PrimitiveServiceInfo getServiceInfo() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void addServiceInfo(ServiceInfo aServiceInfo) { + // TODO Auto-generated method stub + + } + + @Override + public ServiceErrors getServiceErrors() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setDeployDescriptor(String aDeployDescriptor) { + // TODO Auto-generated method stub + + } + + @Override + public void cacheClientEndpoint(Endpoint anEndpoint) { + // TODO Auto-generated method stub + + } + + @Override + public Endpoint getClientEndpoint() { + // TODO Auto-generated method stub + return null; + } + + @Override + public EventSubscriber getEventListener() { + // TODO Auto-generated method stub + return null; + } + + @Override + public JmxManagement getManagementInterface() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void notifyListenersWithInitializationStatus(Exception e) { + // TODO Auto-generated method stub + + } + + @Override + public ServicePerformance getCasStatistics(String aCasReferenceId) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isCasMultiplier() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void releaseNextCas(String aCasReferenceId) { + // TODO Auto-generated method stub + + } + + @Override + public long getIdleTime() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void beginProcess(int msgType) { + // TODO Auto-generated method stub + + } + + @Override + public void endProcess(int msgType) { + // TODO Auto-generated method stub + + } + + @Override + public long getIdleTimeBetweenProcessCalls(int msgType) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getCpuTime() { + // TODO 
Auto-generated method stub + return 0; + } + + @Override + public long getAnalysisTime() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void incrementSerializationTime(long cpuTime) { + // TODO Auto-generated method stub + + } + + @Override + public void incrementDeserializationTime(long cpuTime) { + // TODO Auto-generated method stub + + } + + @Override + public void onInitialize() { + // TODO Auto-generated method stub + + } + + @Override + public UimaMessageListener getUimaMessageListener(String aDelegateKey) { + // TODO Auto-generated method stub + return null; + } + + @Override + public LocalCache getLocalCache() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void registerVmQueueWithJMX(Object o, String aName) throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public AnalysisEngineController getParentController() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void addAbortedCasReferenceId(String aCasReferenceId) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isAwaitingCacheCallbackNotification() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void quiesceAndStop() { + // TODO Auto-generated method stub + + } + + @Override + public void forceTimeoutOnPendingCases(String key) { + // TODO Auto-generated method stub + + } + + @Override + public void changeState(ServiceState state) { + // TODO Auto-generated method stub + + } + + @Override + public ServiceState getState() { + // TODO Auto-generated method stub + return ServiceState.INITIALIZING; + } + + @Override + public Map<String, String> getDeadClientMap() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getKey() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void dumpState(StringBuffer buffer, String lbl1) { + // TODO Auto-generated method stub + + } + 
+  /** Mock-up stub: always returns {@code null}. */
+  @Override
+  public String getPID() {
+    return null;
+  }
+
+  /** Mock-up stub: no-op; neither argument is used. */
+  @Override
+  public void warmUp(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception {
+  }
+
+  /** Mock-up stub: always returns {@code null}. */
+  @Override
+  public UimaContext getUimaContext() {
+    return null;
+  }
+
+  /** Mock-up stub: no-op. */
+  @Override
+  public void addUimaObject(String objectName) throws Exception {
+  }
+
+  /** Mock-up stub: no-op; the supplied chain is not stored. */
+  @Override
+  public void setErrorHandlerChain(ErrorHandlerChain ehc) {
+  }
+
+  /** Mock-up stub: always returns {@code null}. */
+  @Override
+  public ResourceSpecifier getResourceSpecifier() {
+    return null;
+  }
+
+  /** Mock-up stub: no-op; the supplied pool is not stored. */
+  @Override
+  public void setAnalysisEngineInstancePool(AnalysisEngineInstancePool aPool) {
+  }
+
+  /** Mock-up stub: always returns {@code 0}. */
+  @Override
+  public int getAEInstanceCount() {
+    return 0;
+  }
+
+  /**
+   * Records the calling thread's id in {@code threadLocalValue}, waits on {@code barrier},
+   * and releases the controller latch.
+   *
+   * @throws ResourceInitializationException
+   *           if the wait on the barrier is interrupted or the barrier is broken; the
+   *           original exception is attached as the cause
+   */
+  @Override
+  public void initializeAnalysisEngine() throws ResourceInitializationException {
+    threadLocalValue.set(Thread.currentThread().getId());
+    try {
+      barrier.await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new ResourceInitializationException(e);
+    } catch (BrokenBarrierException e) {
+      throw new ResourceInitializationException(e);
+    } finally {
+      // The latch was released on every path before this change; keep it that way.
+      getControllerLatch().release();
+    }
+  }
+
+  /**
+   * Reports whether {@code threadLocalValue} holds a value for the caller.
+   * NOTE(review): presumably a {@code ThreadLocal<Long>} populated by
+   * {@link #initializeAnalysisEngine()} - confirm against the field declaration.
+   *
+   * @return {@code true} when {@code threadLocalValue.get()} is non-null
+   */
+  @Override
+  public boolean threadAssignedToAE() {
+    return Objects.nonNull(threadLocalValue.get());
+  }
+
+}
\ No newline at end of file
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
new file mode 100644
index 0000000..e700b00
--- /dev/null
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
@@ -0,0 +1,28 @@
+package org.apache.uima.as.connectors.mockup;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.MessageProcessor;
+import 
org.apache.uima.aae.service.command.CommandFactory;
+import org.apache.uima.aae.service.command.UimaAsCommand;
+
+/**
+ * {@link MessageProcessor} that turns each incoming message into a
+ * {@link UimaAsCommand} and runs it against a fixed controller.
+ */
+public class TestMessageProcessor implements MessageProcessor {
+  /** Controller handed to every command created by {@link #process(MessageContext)}. */
+  AnalysisEngineController controller;
+
+  /**
+   * @param ctlr
+   *          controller to associate with this processor
+   */
+  public TestMessageProcessor(AnalysisEngineController ctlr) {
+    controller = ctlr;
+  }
+
+  /**
+   * Asks {@link CommandFactory} for the command matching the message and executes it.
+   *
+   * @param message
+   *          message to process
+   * @throws Exception
+   *           whatever command creation or execution throws
+   */
+  @Override
+  public void process(MessageContext message) throws Exception {
+    final UimaAsCommand cmd = CommandFactory.newCommand(message, controller);
+    cmd.execute();
+  }
+
+  /**
+   * @return the controller supplied at construction time
+   */
+  @Override
+  public AnalysisEngineController getController() {
+    return controller;
+  }
+
+}
\ No newline at end of file
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class
new file mode 100644
index 0000000..e0b179d
Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class
new file mode 100644
index 0000000..cad42cc
Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class
new file mode 100644
index 0000000..61330b1
Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class 
b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class new file mode 100644 index 0000000..77468d9 Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class differ diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class new file mode 100644 index 0000000..1c96817 Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class differ diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class new file mode 100644 index 0000000..3c89bb3 Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class differ diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class new file mode 100644 index 0000000..60cb1e6 Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class differ diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class new file mode 100644 index 0000000..3754d56 Binary files /dev/null and 
b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class differ diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class new file mode 100644 index 0000000..158bd7c Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class differ diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class new file mode 100644 index 0000000..4ca6681 Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class differ diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class new file mode 100644 index 0000000..93bd910 Binary files /dev/null and b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class differ