This is an automated email from the ASF dual-hosted git repository.
seanfinan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ctakes.git
The following commit(s) were added to refs/heads/main by this push:
new 42eaef6 bat and sh scripts: simple reformatting. PropertyAeFactory:
Substitute parameter values for parts of values. pbj python: add exception
handling, move daemon thread to pbj_pipeline from stomp_receiver.
42eaef6 is described below
commit 42eaef60aa66445597b4f2db6d6408af02e0975a
Author: Sean Finan <[email protected]>
AuthorDate: Wed Jan 29 13:08:04 2025 -0500
bat and sh scripts: simple reformatting
PropertyAeFactory: Substitute parameter values for parts of values.
pbj python: add exception handling, move daemon thread to pbj_pipeline from
stomp_receiver
---
.../apache/ctakes/core/util/PropertyAeFactory.java | 38 +++++++++-
.../src/main/bin/getUmlsDictionary.bat | 6 +-
.../src/main/bin/getUmlsDictionary.sh | 5 +-
.../src/main/bin/runClinicalPipeline.bat | 4 --
.../src/main/bin/runClinicalPipeline.sh | 4 +-
.../src/main/bin/runDictionaryCreator.bat | 6 +-
.../src/main/bin/runDictionaryCreator.sh | 8 +--
.../src/main/bin/runPiperCreator.bat | 3 -
.../src/main/bin/runPiperCreator.sh | 6 +-
ctakes-distribution/src/main/bin/runPiperFile.bat | 8 +--
ctakes-distribution/src/main/bin/runPiperFile.sh | 9 +--
ctakes-distribution/src/main/bin/runPiperGUI.bat | 4 --
ctakes-distribution/src/main/bin/runPiperGUI.sh | 4 +-
.../src/main/bin/runPiperSubmitter.bat | 3 -
.../src/main/bin/runPiperSubmitter.sh | 3 +-
ctakes-distribution/src/main/bin/setenv.bat | 5 +-
ctakes-distribution/src/main/bin/setenv.sh | 6 +-
.../ctakes_cnlpt_py/pipeline/CnlptNegation.piper | 7 +-
.../src/ctakes_pbj/component/cas_annotator.py | 4 ++
.../src/ctakes_pbj/component/collection_reader.py | 8 +++
.../src/ctakes_pbj/component/pbj_receiver.py | 36 ++++++++--
.../src/ctakes_pbj/component/pbj_sender.py | 14 ++--
.../src/ctakes_pbj/pbj_tools/arg_parser.py | 3 +-
.../src/ctakes_pbj/pbj_tools/pbj_defaults.py | 13 ++++
.../src/ctakes_pbj/pbj_tools/stomp_receiver.py | 84 +++++++++++++++++-----
.../src/ctakes_pbj/pipeline/pbj_pipeline.py | 73 +++++++++++++++++--
26 files changed, 266 insertions(+), 98 deletions(-)
diff --git
a/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java
b/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java
index e6392a5..ce03060 100644
---
a/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java
+++
b/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java
@@ -148,7 +148,7 @@ public enum PropertyAeFactory {
return createParameters( parameterMap );
}
- static public Object subVariableParameter( final Object parameterValue,
final Map<String,Object> parameterMap ) {
+ static public Object subVariableParameterFull( final Object parameterValue,
final Map<String,Object> parameterMap ) {
if ( parameterMap == null || !(parameterValue instanceof String) ) {
return parameterValue;
}
@@ -171,6 +171,42 @@ public enum PropertyAeFactory {
return subValue;
}
+ static private String subVarParms( final String valueText, final
Map<String,Object> parameterMap ) {
+ int lastIndex = valueText.lastIndexOf( '$' );
+ if ( lastIndex < 0 ) {
+ return valueText;
+ }
+ final String preText = subVarParms( valueText.substring( 0, lastIndex
), parameterMap );
+ if ( lastIndex == valueText.length()-1 ) {
+ return preText;
+ }
+ final String text = valueText.substring( lastIndex+1 );
+ String varName = text;
+ for ( int i = text.length(); i>0; i-- ) {
+ varName = text.substring( 0, i );
+ Object subValue = parameterMap.get( varName );
+ if ( subValue == null ) {
+ // Check for an environment variable.
+ subValue = System.getenv( varName );
+ }
+ if ( subValue != null ) {
+ return preText + subValue + text.substring( i );
+ }
+ }
+// LOGGER.warn( "No value for unknown substitution variable ${}",
varName );
+// return parameterValue;
+ return preText + "$" + text;
+ }
+
+ static public Object subVariableParameter( final Object parameterValue,
final Map<String,Object> parameterMap ) {
+ if ( parameterMap == null || !(parameterValue instanceof String) ) {
+ return parameterValue;
+ }
+ final String textValue = parameterValue.toString();
+ return subVarParms( textValue, parameterMap );
+ }
+
+
/**
* @param readerClass Collection Reader class
* @param parameters parameters for the main component
diff --git a/ctakes-distribution/src/main/bin/getUmlsDictionary.bat
b/ctakes-distribution/src/main/bin/getUmlsDictionary.bat
index d46e2d1..65cccc5 100644
--- a/ctakes-distribution/src/main/bin/getUmlsDictionary.bat
+++ b/ctakes-distribution/src/main/bin/getUmlsDictionary.bat
@@ -1,5 +1,4 @@
@ECHO OFF
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -17,13 +16,10 @@
:: specific language governing permissions and limitations
:: under the License.
-
:: Starts a simple GUI to fetch the cTAKES umls (snomed, rxnorm) dictionary.
-
:: Requires Java 17
-
-@REM The setenv script sets up the environment needed by cTAKES.
+:: The setenv script sets up the environment needed by cTAKES.
@call %~sdp0\setenv.bat
cd %CTAKES_HOME%
diff --git a/ctakes-distribution/src/main/bin/getUmlsDictionary.sh
b/ctakes-distribution/src/main/bin/getUmlsDictionary.sh
index ac76604..5882a39 100644
--- a/ctakes-distribution/src/main/bin/getUmlsDictionary.sh
+++ b/ctakes-distribution/src/main/bin/getUmlsDictionary.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-#
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,12 +17,9 @@
# specific language governing permissions and limitations
# under the License.
-
# Starts a simple GUI to fetch the cTAKES umls (snomed, rxnorm) dictionary.
-
# Requires Java 17
-
# Sets up environment for cTAKES
. ${HOME}/setenv.sh
diff --git a/ctakes-distribution/src/main/bin/runClinicalPipeline.bat
b/ctakes-distribution/src/main/bin/runClinicalPipeline.bat
index da2c971..9f58976 100644
--- a/ctakes-distribution/src/main/bin/runClinicalPipeline.bat
+++ b/ctakes-distribution/src/main/bin/runClinicalPipeline.bat
@@ -1,5 +1,4 @@
@ECHO OFF
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -17,7 +16,6 @@
:: specific language governing permissions and limitations
:: under the License.
-
:: Runs the default clinical pipeline with provided parameters.
:: Required parameters are:
:: -i , --inputDir {inputDirectory}
@@ -29,10 +27,8 @@
:: --xmiOut {xmiOutputDirectory} (if different from -o)
:: -l , --lookupXml {dictionaryConfigFile} (fast only)
:: -? , --help
-
:: Requires Java 17
-
:: The setenv script sets up the environment needed by cTAKES.
@call %~sdp0\setenv.bat
diff --git a/ctakes-distribution/src/main/bin/runClinicalPipeline.sh
b/ctakes-distribution/src/main/bin/runClinicalPipeline.sh
index b69732f..b35b3b9 100644
--- a/ctakes-distribution/src/main/bin/runClinicalPipeline.sh
+++ b/ctakes-distribution/src/main/bin/runClinicalPipeline.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-#
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -28,10 +28,8 @@
# --xmiOut {xmiOutputDirectory} (if different from -o)
# -l , --lookupXml {dictionaryConfigFile} (fast only)
# -? , --help
-
# Requires Java 17
-
# Sets up environment for cTAKES
. ${HOME}/setenv.sh
diff --git a/ctakes-distribution/src/main/bin/runDictionaryCreator.bat
b/ctakes-distribution/src/main/bin/runDictionaryCreator.bat
index ec73443..f8ea0ab 100644
--- a/ctakes-distribution/src/main/bin/runDictionaryCreator.bat
+++ b/ctakes-distribution/src/main/bin/runDictionaryCreator.bat
@@ -1,4 +1,3 @@
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -15,10 +14,11 @@
:: KIND, either express or implied. See the License for the
:: specific language governing permissions and limitations
:: under the License.
-::
-:: Starts a GUI that can facilitate creation of a dictionary for the
+:: Starts a GUI that can facilitate creation of a dictionary.
+:: The created dictionary contains SQL that can be used by the
:: [ctakes-dictionary-lookup-fast](ctakes-dictionary-lookup-fast) module.
+:: Requires Java 17
:: The setenv script sets up the environment needed by cTAKES.
@call %~sdp0\setenv.bat
diff --git a/ctakes-distribution/src/main/bin/runDictionaryCreator.sh
b/ctakes-distribution/src/main/bin/runDictionaryCreator.sh
index ae36710..b3bf1c2 100644
--- a/ctakes-distribution/src/main/bin/runDictionaryCreator.sh
+++ b/ctakes-distribution/src/main/bin/runDictionaryCreator.sh
@@ -1,4 +1,5 @@
-#
+#!/bin/sh
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,11 +17,10 @@
# specific language governing permissions and limitations
# under the License.
#
-# Starts a GUI that can facilitate creation of a dictionary for the
+# Starts a GUI that can facilitate creation of a dictionary.
+# The created dictionary contains SQL that can be used by the
# [ctakes-dictionary-lookup-fast](ctakes-dictionary-lookup-fast) module.
-#
# Requires Java 17
-#
# Sets up environment for cTAKES
. ${HOME}/setenv.sh
diff --git a/ctakes-distribution/src/main/bin/runPiperCreator.bat
b/ctakes-distribution/src/main/bin/runPiperCreator.bat
index 9bb7c8d..7a64cb3 100644
--- a/ctakes-distribution/src/main/bin/runPiperCreator.bat
+++ b/ctakes-distribution/src/main/bin/runPiperCreator.bat
@@ -1,4 +1,3 @@
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -17,10 +16,8 @@
:: under the License.
:: Starts a GUI that can facilitate creation of a pipeline.
-
:: Requires Java 17
-
:: The setenv script sets up the environment needed by cTAKES.
@call %~sdp0\setenv.bat
diff --git a/ctakes-distribution/src/main/bin/runPiperCreator.sh
b/ctakes-distribution/src/main/bin/runPiperCreator.sh
index 311a6fb..ecb47a0 100644
--- a/ctakes-distribution/src/main/bin/runPiperCreator.sh
+++ b/ctakes-distribution/src/main/bin/runPiperCreator.sh
@@ -1,4 +1,5 @@
-#
+#!/bin/sh
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,12 +17,9 @@
# specific language governing permissions and limitations
# under the License.
-
# Starts a GUI that can facilitate creation of a pipeline.
-
# Requires Java 17
-
# Sets up environment for cTAKES
. ${HOME}/setenv.sh
diff --git a/ctakes-distribution/src/main/bin/runPiperFile.bat
b/ctakes-distribution/src/main/bin/runPiperFile.bat
index 42dc2f4..ffc2768 100644
--- a/ctakes-distribution/src/main/bin/runPiperFile.bat
+++ b/ctakes-distribution/src/main/bin/runPiperFile.bat
@@ -1,5 +1,4 @@
@ECHO OFF
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -17,9 +16,8 @@
:: specific language governing permissions and limitations
:: under the License.
-
-:: Runs the pipeline in the piper file specified by -p {piperfile}
-:: with any other provided parameters. Standard parameters are:
+:: Runs the pipeline in the piper file specified by -p {piperfile} with
provided parameters.
+:: Standard parameters are:
:: -i , --inputDir {inputDirectory}
:: -o , --outputDir {outputDirectory}
:: -s , --subDir {subDirectory} (for i/o)
@@ -35,10 +33,8 @@
:: cli PARAGRAPH_TYPES_PATH=t
:: and when executing this script use:
:: runPiperFile -p path/to/my/custom.piper -t path/to/my/custom.bsv ...
-
:: Requires Java 17
-
:: The setenv script sets up the environment needed by cTAKES.
@call %~sdp0\setenv.bat
diff --git a/ctakes-distribution/src/main/bin/runPiperFile.sh
b/ctakes-distribution/src/main/bin/runPiperFile.sh
index 0bda7db..44f71ad 100644
--- a/ctakes-distribution/src/main/bin/runPiperFile.sh
+++ b/ctakes-distribution/src/main/bin/runPiperFile.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-#
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,9 +17,8 @@
# specific language governing permissions and limitations
# under the License.
-
-# Runs the pipeline in the piper file specified by -p (piperfile)
-# with any other provided parameters. Standard parameters are:
+# Runs the pipeline in the piper file specified by -p {piperfile} with
provided parameters.
+# Standard parameters are:
# -i , --inputDir {inputDirectory}
# -o , --outputDir {outputDirectory}
# -s , --subDir {subDirectory} (for i/o)
@@ -35,10 +34,8 @@
# cli PARAGRAPH_TYPES_PATH=t
# and when executing this script use:
# runPiperFile -p path/to/my/custom.piper -t path/to/my/custom.bsv ...
-
# Requires Java 17
-
# Sets up environment for cTAKES
. ${HOME}/setenv.sh
diff --git a/ctakes-distribution/src/main/bin/runPiperGUI.bat
b/ctakes-distribution/src/main/bin/runPiperGUI.bat
index 8fe6583..a1aafe3 100644
--- a/ctakes-distribution/src/main/bin/runPiperGUI.bat
+++ b/ctakes-distribution/src/main/bin/runPiperGUI.bat
@@ -1,5 +1,4 @@
@ECHO OFF
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -17,12 +16,9 @@
:: specific language governing permissions and limitations
:: under the License.
-
:: Starts a GUI that can run a pipeline.
-
:: Requires Java 17
-
:: The setenv script sets up the environment needed by cTAKES.
@call %~sdp0\setenv.bat
diff --git a/ctakes-distribution/src/main/bin/runPiperGUI.sh
b/ctakes-distribution/src/main/bin/runPiperGUI.sh
index 89bb75d..3f8ba68 100644
--- a/ctakes-distribution/src/main/bin/runPiperGUI.sh
+++ b/ctakes-distribution/src/main/bin/runPiperGUI.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-#
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -18,10 +18,8 @@
# under the License.
# Starts a GUI that can run a pipeline.
-
# Requires Java 17
-
# Sets up environment for cTAKES
. ${HOME}/setenv.sh
diff --git a/ctakes-distribution/src/main/bin/runPiperSubmitter.bat
b/ctakes-distribution/src/main/bin/runPiperSubmitter.bat
index 0dc7d35..1301a54 100644
--- a/ctakes-distribution/src/main/bin/runPiperSubmitter.bat
+++ b/ctakes-distribution/src/main/bin/runPiperSubmitter.bat
@@ -1,5 +1,4 @@
@ECHO OFF
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -19,10 +18,8 @@
:: Starts a GUI that can run a pipeline.
:: Deprecated: Identical to the runPiperGUI script.
-
:: Requires Java 17
-
:: The setenv script sets up the environment needed by cTAKES.
@call %~sdp0\setenv.bat
diff --git a/ctakes-distribution/src/main/bin/runPiperSubmitter.sh
b/ctakes-distribution/src/main/bin/runPiperSubmitter.sh
index b22a4a2..def4489 100644
--- a/ctakes-distribution/src/main/bin/runPiperSubmitter.sh
+++ b/ctakes-distribution/src/main/bin/runPiperSubmitter.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-#
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -19,7 +19,6 @@
# Starts a GUI that can run a pipeline.
# Deprecated: Identical to the runPiperGUI script.
-
# Requires Java 17
# Sets up environment for cTAKES
diff --git a/ctakes-distribution/src/main/bin/setenv.bat
b/ctakes-distribution/src/main/bin/setenv.bat
index 33c4c96..ce0356a 100644
--- a/ctakes-distribution/src/main/bin/setenv.bat
+++ b/ctakes-distribution/src/main/bin/setenv.bat
@@ -1,5 +1,4 @@
@ECHO OFF
-::
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
@@ -17,12 +16,10 @@
:: specific language governing permissions and limitations
:: under the License.
-
:: Sets up the standard environment for cTAKES.
-
+:: This script is called by cTAKES run scripts.
:: Requires Java 17
-
:: Guess CTAKES_HOME if not defined
set CURRENT_DIR=%cd%
if not "%CTAKES_HOME%" == "" goto gotHome
diff --git a/ctakes-distribution/src/main/bin/setenv.sh
b/ctakes-distribution/src/main/bin/setenv.sh
index 9db1d1e..0bf54b9 100644
--- a/ctakes-distribution/src/main/bin/setenv.sh
+++ b/ctakes-distribution/src/main/bin/setenv.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-#
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,12 +17,10 @@
# specific language governing permissions and limitations
# under the License.
-
# Sets up the standard environment for cTAKES.
-
+# This script is called by cTAKES run scripts.
# Requires Java 17
-
PRG="$0"
while [ -h "$PRG" ]; do
ls=`ls -ld "$PRG"`
diff --git
a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
index 5e298e7..7b18df5 100644
---
a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
+++
b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
@@ -8,7 +8,7 @@
# That instance of cTAKES will run the third and final bit of the entire PBJ
pipeline.
#
# This piper will then launch a python PBJ pipeline that runs the negation
model from the external project
-# Clinical NLP Transformers (cnlp_transformers), which can be found at
+# Clinical NLP Transformers (cnlp_transformers , cnlpt), which can be found at
# https://github.com/Machine-Learning-for-Medical-Language/cnlp_transformers
#
@@ -25,6 +25,11 @@
# By default ctakes-pbj will be pip ed at the beginning of a run. You can
turn this off with:
# --pipPbj no
#
+# A non-standard command-line option specifies whether or not
to pip install the cnlpt package.
+# By default cnlpt will be pip installed at the beginning of a run. You can turn
this off with:
+# ++pipCnlpt no
+# If used, that option must be added at the end of your command.
+#
// Sets up required parameters, starts your Artemis Broker, pips the PBJ
project.
load PbjStarter
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
index 505a399..5ff68e8 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
@@ -23,3 +23,7 @@ class CasAnnotator(ABC):
# Called once at the end of the pipeline.
def collection_process_complete(self):
pass
+
+ # Called when an exception is thrown.
+ def handle_exception(self, thrower, exceptable, initializing=False):
+ pass
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
index 3ae3de7..cb4e58d 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
@@ -24,3 +24,11 @@ class CollectionReader(ABC):
@abstractmethod
def start(self):
pass
+
+ # Called to stop reading.
+ def stop(self):
+ pass
+
+ # Called when an exception is thrown.
+ def handle_exception(self):
+ pass
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
index 28a8883..0b4d7b3 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
@@ -1,7 +1,11 @@
import sys
+import time
+import threading
from ctakes_pbj.component.collection_reader import CollectionReader
from ctakes_pbj.pbj_tools import pbj_defaults
-from ctakes_pbj.pbj_tools.stomp_receiver import start_receiver
+# from ctakes_pbj.pbj_tools.stomp_receiver import start_receiver
+# from ctakes_pbj.pbj_tools.stomp_receiver import stop_receiver
+from ctakes_pbj.pbj_tools.stomp_receiver import StompReceiver
class PBJReceiver(CollectionReader):
@@ -22,7 +26,7 @@ class PBJReceiver(CollectionReader):
# Called once at the build of a pipeline.
def declare_params(self, arg_parser):
- arg_parser.add_arg('-rq', '--receive_queue')
+ arg_parser.add_arg('-rq', '--receive_queue',
default=pbj_defaults.get_default_rcv_q())
arg_parser.add_arg('-rh', '--receive_host',
default=pbj_defaults.DEFAULT_HOST)
arg_parser.add_arg('-rpt', '--receive_port',
default=pbj_defaults.DEFAULT_PORT)
arg_parser.add_arg('-ru', '--receive_user',
default=pbj_defaults.DEFAULT_USER)
@@ -44,6 +48,30 @@ class PBJReceiver(CollectionReader):
# Called start reading cas objects and pass them to the pipeline.
def start(self):
if not self.receiving:
+ # start_receiver(self.pipeline, self.queue, self.host, self.port,
+ # self.password, self.username)
self.receiving = True
- start_receiver(self.pipeline, self.queue, self.host, self.port,
- self.password, self.username)
+ self.stomp_receiver = StompReceiver(self.pipeline, self.queue,
self.host, self.port,
+ self.password, self.username,
r_id='1')
+ # stomp_thread =
threading.Thread(target=start_receiver(self.stomp_receiver))
+ # stomp_thread.start()
+ # start_receiver(self.stomp_receiver)
+ self.stomp_receiver.start_receiver()
+
+ # Called to stop reading.
+ def stop(self):
+ print(time.ctime(), "PBJ Receiver: Stopping Stomp receiver ...")
+ self.stomp_receiver.stop_receiver()
+
+ # Called when an exception is thrown.
+ def handle_exception(self, thrower, exceptable, initializing=False):
+ if self.receiving:
+ # self.stop()
+ # print(time.ctime(), "PBJ Receiver: Stopping Stomp receiver ...")
+ # stop_receiver()
+ # self.stomp_receiver.set_stop(True)
+ self.receiving = False
+ self.stomp_receiver.handle_exception()
+ # Stop stomp in a background thread, just in case it is slow to
react.
+ # stop_thread =
threading.Thread(target=self.stomp_receiver.handle_exception)
+ # stop_thread.start()
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
index 87a6295..94649d0 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
@@ -19,7 +19,7 @@ class PBJSender(cas_annotator.CasAnnotator):
# Called once at the build of a pipeline.
def declare_params(self, arg_parser):
# arg_parser.add_arg('send_queue')
- arg_parser.add_arg('-sq', '--send_queue', default=DEFAULT_HOST)
+ arg_parser.add_arg('-sq', '--send_queue', default=get_default_send_q())
arg_parser.add_arg('-sh', '--send_host', default=DEFAULT_HOST)
arg_parser.add_arg('-spt', '--send_port', default=DEFAULT_PORT)
arg_parser.add_arg('-su', '--send_user', default=DEFAULT_USER)
@@ -40,7 +40,7 @@ class PBJSender(cas_annotator.CasAnnotator):
# Called once at the beginning of a pipeline.
def initialize(self):
- print(time.ctime((time.time())), "Starting PBJ Sender on", self.host,
self.queue, "...")
+ print(time.ctime(), "Starting PBJ Sender on", self.host, self.queue,
"...")
# Use a heartbeat of 10 minutes (in milliseconds)
self.conn = stomp.Connection12([(self.host, self.port)],
keepalive=True, heartbeats=(600000,
600000))
@@ -48,7 +48,7 @@ class PBJSender(cas_annotator.CasAnnotator):
# Called for every cas passed through the pipeline.
def process(self, cas):
- print(time.ctime((time.time())), "Sending processed information to",
+ print(time.ctime(), "Sending processed information to",
self.host, self.queue, "...")
xmi = cas.to_xmi()
self.conn.send(self.queue, xmi)
@@ -57,14 +57,18 @@ class PBJSender(cas_annotator.CasAnnotator):
def collection_process_complete(self):
self.send_stop()
+ # Called when an exception is thrown.
+ def handle_exception(self, thrower, exceptable, initializing=False):
+ self.send_stop()
+
def send_text(self, text):
self.conn.send(self.queue, text)
def send_stop(self):
- print(time.ctime((time.time())), "Sending Stop code to", self.host,
self.queue, "...")
+ print(time.ctime(), "Sending Stop code to", self.host, self.queue,
"...", flush=True)
self.conn.send(self.queue, STOP_MESSAGE)
self.conn.disconnect()
- print(time.ctime((time.time())), "Disconnected PBJ Sender on",
self.host, self.queue)
+ print(time.ctime(), "Disconnected PBJ Sender on", self.host,
self.queue)
def set_host(self, host_name):
self.host = host_name
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
index f10a37f..eaa0b65 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
@@ -8,7 +8,6 @@ class ArgParser:
def get_arg_parser(self):
if self.arg_parser is None:
- print('Creating arg_parser')
self.arg_parser = argparse.ArgumentParser(
prog='ctakes-pbj',
description='Does wonderful stuff...',
@@ -20,5 +19,5 @@ class ArgParser:
self.get_arg_parser().add_argument(*args, **kwargs)
def get_args(self):
- print('Parsing Arguments')
+ print('Parsing Arguments ...')
return self.get_arg_parser().parse_args()
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
index 6e8acb8..9fe4ec1 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
@@ -1,6 +1,19 @@
+import sys
+import os.path
+
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 61616
DEFAULT_USER = 'guest'
DEFAULT_PASS = 'guest'
DEFAULT_OUT_DIR = 'pbj_output/'
STOP_MESSAGE = "Apache cTAKES PBJ Stop Message."
+
+
+# The default receive queue is the name of the script with the prefix "to_"
+def get_default_rcv_q():
+ return "to_" + os.path.splitext(os.path.basename(sys.argv[0]))[0]
+
+
+# The default send queue is the name of the script with the prefix "from_"
+def get_default_send_q():
+ return "from_" + os.path.splitext(os.path.basename(sys.argv[0]))[0]
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
index afa3e9a..d5a989a 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
@@ -1,4 +1,5 @@
import time
+import threading
from threading import Event
import stomp
@@ -6,19 +7,43 @@ import stomp
from ctakes_pbj.pbj_tools.pbj_defaults import STOP_MESSAGE
from ctakes_pbj.type_system.type_system_loader import *
-exit_event = Event()
+# exit_event = Event()
-def start_receiver(pipeline, queue_name, host_name, port_name,
- password, username, r_id='1'):
- StompReceiver(pipeline, queue_name, host_name, port_name, password,
username, r_id)
- while not exit_event.is_set():
- exit_event.wait()
+
+# def start_receiver(pipeline, queue_name, host_name, port_name,
+# password, username, r_id='1'):
+# StompReceiver(pipeline, queue_name, host_name, port_name, password,
username, r_id)
+# while not exit_event.is_set():
+# exit_event.wait()
+
+# def start_receiver():
+# while not exit_event.is_set():
+# exit_event.wait()
+
+# def start_receiver(stomp_receiver):
+# stomp_receiver.start_receiver()
+# while not exit_event.is_set():
+# exit_event.wait()
+# wait_thread = threading.Thread(target=run_receiver(stomp_receiver))
+# wait_thread.start()
+
+
+# def run_receiver(stomp_receiver):
+# stomp_receiver.start_receiver()
+# while not exit_event.is_set():
+# exit_event.wait()
+
+
+# def stop_receiver():
+# exit_event.set()
class StompReceiver(stomp.ConnectionListener):
def __init__(self, pipeline, queue_name, host_name, port_name, password,
username, r_id):
+ self.stop = False
+ self.conn = None
self.source_queue = queue_name
self.source_host = host_name
self.source_port = port_name
@@ -27,7 +52,17 @@ class StompReceiver(stomp.ConnectionListener):
self.username = username
self.r_id = r_id
self.typesystem = None
- print(time.ctime((time.time())), "Starting Stomp Receiver on",
self.source_host, self.source_queue, "...")
+ self.completed = False
+ # print(time.ctime((time.time())), "Starting Stomp Receiver on",
self.source_host, self.source_queue, "...")
+ # # Use a heartbeat of 10 minutes (in milliseconds)
+ # self.conn = stomp.Connection12([(self.source_host,
self.source_port)],
+ # keepalive=True, heartbeats=(600000,
600000))
+ # self.conn.set_listener('Stomp_Receiver', self)
+ # self.stop = False
+ # self.__connect_and_subscribe()
+
+ def start_receiver(self):
+ print(time.ctime(), "Starting Stomp Receiver on", self.source_host,
self.source_queue, "...")
# Use a heartbeat of 10 minutes (in milliseconds)
self.conn = stomp.Connection12([(self.source_host, self.source_port)],
keepalive=True, heartbeats=(600000,
600000))
@@ -36,8 +71,9 @@ class StompReceiver(stomp.ConnectionListener):
self.__connect_and_subscribe()
def __connect_and_subscribe(self):
- self.conn.connect(self.username, self.password, wait=True)
- self.conn.subscribe(destination=self.source_queue, id=self.r_id,
ack='auto')
+ if not self.stop:
+ self.conn.connect(self.username, self.password, wait=True)
+ self.conn.subscribe(destination=self.source_queue, id=self.r_id,
ack='auto')
# self.conn.subscribe(destination=self.source_queue, id=self.id,
ack='client')
def set_typesystem(self, typesystem):
@@ -58,28 +94,40 @@ class StompReceiver(stomp.ConnectionListener):
self.stop = stop
def stop_receiver(self):
- self.conn.unsubscribe(destination=self.source_queue, id=self.r_id)
+ self.stop = True
+ print(time.ctime(), "Disconnecting Stomp Receiver on",
+ self.source_host, self.source_queue, "...")
self.conn.disconnect()
- print(time.ctime((time.time())), "Disconnected Stomp Receiver on",
self.source_host, self.source_queue)
- self.pipeline.collection_process_complete()
- exit_event.set()
+ print(time.ctime(), "Stomp Receiver disconnected.")
+ self.conn.unsubscribe(destination=self.source_queue, id=self.r_id)
+ print(time.ctime(), "Stomp Receiver unsubscribed.")
+ # self.conn.disconnect()
+ # print(time.ctime((time.time())), "Stomp Receiver disconnected.")
+ if self.completed:
+ self.pipeline.collection_process_complete()
def on_message(self, frame):
if frame.body == STOP_MESSAGE:
- print(time.ctime((time.time())), "Received Stop code.")
- self.stop = True
+ print(time.ctime(), "Received Stop code.")
+ # self.stop = True
# time.sleep(3)
+ self.completed = True
self.stop_receiver()
else:
if XMI_INDICATOR in frame.body:
cas = cassis.load_cas_from_xmi(frame.body,
self.get_typesystem())
self.pipeline.process(cas)
else:
- print(time.ctime((time.time())), "Malformed Message:\n",
frame.body)
+ print(time.ctime(), "Malformed Message:\n", frame.body)
def on_disconnected(self):
- if self.stop is False:
+ if not self.stop:
self.__connect_and_subscribe()
def on_error(self, frame):
- print(time.ctime((time.time())), "Receiver Error:", frame.body)
+ print(time.ctime(), "Receiver Error:", frame.body)
+
+ # Called when an exception is thrown.
+ def handle_exception(self):
+ self.completed = False
+ self.stop_receiver()
diff --git
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
index 0783453..19bf1b7 100644
---
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
+++
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
@@ -1,6 +1,14 @@
+import sys
+import time
+import traceback
+from traceback import TracebackException
+from threading import Event
+
from ctakes_pbj.pbj_tools import pbj_defaults
from ctakes_pbj.pbj_tools.arg_parser import ArgParser
+exit_event = Event()
+
class PBJPipeline:
@@ -33,27 +41,82 @@ class PBJPipeline:
# If get_args has already been called then added parameters will crash
the tool.
args = self.arg_parser.get_args()
# Set the necessary parameters in the collection reader.
- self.c_reader.init_params(args)
+ try:
+ self.c_reader.init_params(args)
+ except Exception as exceptable:
+ self.handle_exception(self.c_reader, exceptable, True)
# For each annotator set the necessary parameters.
for annotator in self.annotators:
annotator.init_params(args)
# For each annotator initialize resources, etc.
for annotator in self.annotators:
- annotator.initialize()
+ if exit_event.is_set():
+ break
+ try:
+ print(time.ctime(), "Initializing", type(annotator).__name__,
"...", flush=True)
+ annotator.initialize()
+ except Exception as exceptable:
+ self.handle_exception(annotator, exceptable, True)
self.initialized = True
# Starts / Runs the pipeline. This calls start on the collection reader.
def run(self):
if not self.initialized:
self.initialize()
- self.c_reader.start()
+ try:
+ print(time.ctime(), "Starting", type(self.c_reader).__name__,
"...", flush=True)
+ self.c_reader.start()
+ except Exception as exceptable:
+ self.handle_exception(self.c_reader, exceptable)
+ # Block the calling thread until exit_event is set.
+ # It will allow the collection reader to wait for information.
+ while not exit_event.is_set():
+ exit_event.wait()
# For a new cas, call each annotator to process that cas.
def process(self, cas):
for annotator in self.annotators:
- annotator.process(cas)
+ if exit_event.is_set():
+ break
+ try:
+ print(time.ctime(), "Running", type(annotator).__name__,
"...", flush=True)
+ annotator.process(cas)
+ except Exception as exceptable:
+ self.handle_exception(annotator, exceptable)
# At the end of the corpus, call each annotator for cleanup, etc.
def collection_process_complete(self):
+ print(time.ctime(), "Collection processing complete.")
+ for annotator in self.annotators:
+ if exit_event.is_set():
+ break
+ try:
+ print(time.ctime(), "Notifying", type(annotator).__name__, "of
completion ...", flush=True)
+ annotator.collection_process_complete()
+ except Exception as exceptable:
+ self.handle_exception(annotator, exceptable)
+ print(time.ctime(), "Done.", flush=True)
+ exit_event.set()
+
+ def handle_exception(self, thrower, exceptable, initializing=False):
+ print(time.ctime(), "Exception thrown in",
+ type(thrower).__name__, ":",
+ type(exceptable).__name__, exceptable, flush=True)
+ # print(TracebackException.from_exception(exceptable, limit=-3,
capture_locals=True))
+ traceback.print_exc(limit=3, chain=False)
+ # Notify the collection reader of the exception first.
+ try:
+ print(time.ctime(), "Notifying", type(self.c_reader).__name__, "of
exception ...", flush=True)
+ self.c_reader.handle_exception(thrower, exceptable)
+ except Exception as exceptable_2:
+ traceback.print_exc(limit=3, chain=False)
+ # Distribute handling of exceptions.
for annotator in self.annotators:
- annotator.collection_process_complete()
+ try:
+ print(time.ctime(), "Notifying", type(annotator).__name__, "of
exception ...", flush=True)
+ annotator.handle_exception(thrower, exceptable, initializing)
+ except Exception as exceptable_2:
+ traceback.print_exc(limit=3, chain=False)
+ print(time.ctime(), "Done.", flush=True)
+ # sys.exit(1)
+ exit_event.set()