This is an automated email from the ASF dual-hosted git repository.

seanfinan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ctakes.git


The following commit(s) were added to refs/heads/main by this push:
     new 42eaef6  bat and sh scripts: simple reformatting. PropertyAeFactory: 
Substitute parameter values for parts of values. pbj python: add exception 
handling, move daemon thread to pbj_pipeline from stomp_receiver
42eaef6 is described below

commit 42eaef60aa66445597b4f2db6d6408af02e0975a
Author: Sean Finan <[email protected]>
AuthorDate: Wed Jan 29 13:08:04 2025 -0500

    bat and sh scripts: simple reformatting
    PropertyAeFactory: Substitute parameter values for parts of values.
    pbj python: add exception handling, move daemon thread to pbj_pipeline from
    stomp_receiver
---
 .../apache/ctakes/core/util/PropertyAeFactory.java | 38 +++++++++-
 .../src/main/bin/getUmlsDictionary.bat             |  6 +-
 .../src/main/bin/getUmlsDictionary.sh              |  5 +-
 .../src/main/bin/runClinicalPipeline.bat           |  4 --
 .../src/main/bin/runClinicalPipeline.sh            |  4 +-
 .../src/main/bin/runDictionaryCreator.bat          |  6 +-
 .../src/main/bin/runDictionaryCreator.sh           |  8 +--
 .../src/main/bin/runPiperCreator.bat               |  3 -
 .../src/main/bin/runPiperCreator.sh                |  6 +-
 ctakes-distribution/src/main/bin/runPiperFile.bat  |  8 +--
 ctakes-distribution/src/main/bin/runPiperFile.sh   |  9 +--
 ctakes-distribution/src/main/bin/runPiperGUI.bat   |  4 --
 ctakes-distribution/src/main/bin/runPiperGUI.sh    |  4 +-
 .../src/main/bin/runPiperSubmitter.bat             |  3 -
 .../src/main/bin/runPiperSubmitter.sh              |  3 +-
 ctakes-distribution/src/main/bin/setenv.bat        |  5 +-
 ctakes-distribution/src/main/bin/setenv.sh         |  6 +-
 .../ctakes_cnlpt_py/pipeline/CnlptNegation.piper   |  7 +-
 .../src/ctakes_pbj/component/cas_annotator.py      |  4 ++
 .../src/ctakes_pbj/component/collection_reader.py  |  8 +++
 .../src/ctakes_pbj/component/pbj_receiver.py       | 36 ++++++++--
 .../src/ctakes_pbj/component/pbj_sender.py         | 14 ++--
 .../src/ctakes_pbj/pbj_tools/arg_parser.py         |  3 +-
 .../src/ctakes_pbj/pbj_tools/pbj_defaults.py       | 13 ++++
 .../src/ctakes_pbj/pbj_tools/stomp_receiver.py     | 84 +++++++++++++++++-----
 .../src/ctakes_pbj/pipeline/pbj_pipeline.py        | 73 +++++++++++++++++--
 26 files changed, 266 insertions(+), 98 deletions(-)

diff --git 
a/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java 
b/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java
index e6392a5..ce03060 100644
--- 
a/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java
+++ 
b/ctakes-core/src/main/java/org/apache/ctakes/core/util/PropertyAeFactory.java
@@ -148,7 +148,7 @@ public enum PropertyAeFactory {
       return createParameters( parameterMap );
    }
 
-   static public Object subVariableParameter( final Object parameterValue, 
final Map<String,Object> parameterMap ) {
+   static public Object subVariableParameterFull( final Object parameterValue, 
final Map<String,Object> parameterMap ) {
       if ( parameterMap == null || !(parameterValue instanceof String) ) {
          return parameterValue;
       }
@@ -171,6 +171,42 @@ public enum PropertyAeFactory {
       return subValue;
     }
 
+    static private String subVarParms( final String valueText, final 
Map<String,Object> parameterMap ) {
+       int lastIndex = valueText.lastIndexOf( '$' );
+       if ( lastIndex < 0 ) {
+          return valueText;
+       }
+       final String preText = subVarParms( valueText.substring( 0, lastIndex 
), parameterMap );
+       if ( lastIndex == valueText.length()-1 ) {
+          return preText;
+       }
+       final String text = valueText.substring( lastIndex+1 );
+       String varName = text;
+       for ( int i = text.length(); i>0; i-- ) {
+          varName = text.substring( 0, i );
+          Object subValue = parameterMap.get( varName );
+          if ( subValue == null ) {
+             // Check for an environment variable.
+             subValue = System.getenv( varName );
+          }
+          if ( subValue != null ) {
+             return preText + subValue + text.substring( i );
+          }
+       }
+//         LOGGER.warn( "No value for unknown substitution variable ${}", 
varName );
+//         return parameterValue;
+       return preText + "$" + text;
+    }
+
+   static public Object subVariableParameter( final Object parameterValue, 
final Map<String,Object> parameterMap ) {
+      if ( parameterMap == null || !(parameterValue instanceof String) ) {
+         return parameterValue;
+      }
+      final String textValue = parameterValue.toString();
+      return subVarParms( textValue, parameterMap );
+   }
+
+
    /**
     * @param readerClass Collection Reader class
     * @param parameters  parameters for the main component
diff --git a/ctakes-distribution/src/main/bin/getUmlsDictionary.bat 
b/ctakes-distribution/src/main/bin/getUmlsDictionary.bat
index d46e2d1..65cccc5 100644
--- a/ctakes-distribution/src/main/bin/getUmlsDictionary.bat
+++ b/ctakes-distribution/src/main/bin/getUmlsDictionary.bat
@@ -1,5 +1,4 @@
 @ECHO OFF
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -17,13 +16,10 @@
 :: specific language governing permissions and limitations
 :: under the License.
 
-
 ::   Starts a simple GUI to fetch the cTAKES umls (snomed, rxnorm) dictionary.
-
 :: Requires Java 17
 
-
-@REM The setenv script sets up the environment needed by cTAKES.
+:: The setenv script sets up the environment needed by cTAKES.
 @call %~sdp0\setenv.bat
 
 cd %CTAKES_HOME%
diff --git a/ctakes-distribution/src/main/bin/getUmlsDictionary.sh 
b/ctakes-distribution/src/main/bin/getUmlsDictionary.sh
index ac76604..5882a39 100644
--- a/ctakes-distribution/src/main/bin/getUmlsDictionary.sh
+++ b/ctakes-distribution/src/main/bin/getUmlsDictionary.sh
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -17,12 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 #   Starts a simple GUI to fetch the cTAKES umls (snomed, rxnorm) dictionary.
-
 # Requires Java 17
 
-
 # Sets up environment for cTAKES
 . ${HOME}/setenv.sh
 
diff --git a/ctakes-distribution/src/main/bin/runClinicalPipeline.bat 
b/ctakes-distribution/src/main/bin/runClinicalPipeline.bat
index da2c971..9f58976 100644
--- a/ctakes-distribution/src/main/bin/runClinicalPipeline.bat
+++ b/ctakes-distribution/src/main/bin/runClinicalPipeline.bat
@@ -1,5 +1,4 @@
 @ECHO OFF
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -17,7 +16,6 @@
 :: specific language governing permissions and limitations
 :: under the License.
 
-
 ::   Runs the default clinical pipeline with provided parameters.
 ::   Required parameters are:
 ::   -i , --inputDir {inputDirectory}
@@ -29,10 +27,8 @@
 ::   --xmiOut {xmiOutputDirectory} (if different from -o)
 ::   -l , --lookupXml {dictionaryConfigFile} (fast only)
 ::   -? , --help
-
 :: Requires Java 17
 
-
 :: The setenv script sets up the environment needed by cTAKES.
 @call %~sdp0\setenv.bat
 
diff --git a/ctakes-distribution/src/main/bin/runClinicalPipeline.sh 
b/ctakes-distribution/src/main/bin/runClinicalPipeline.sh
index b69732f..b35b3b9 100644
--- a/ctakes-distribution/src/main/bin/runClinicalPipeline.sh
+++ b/ctakes-distribution/src/main/bin/runClinicalPipeline.sh
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -28,10 +28,8 @@
 #   --xmiOut {xmiOutputDirectory} (if different from -o)
 #   -l , --lookupXml {dictionaryConfigFile} (fast only)
 #   -? , --help
-
 # Requires Java 17
 
-
 # Sets up environment for cTAKES
 . ${HOME}/setenv.sh
 
diff --git a/ctakes-distribution/src/main/bin/runDictionaryCreator.bat 
b/ctakes-distribution/src/main/bin/runDictionaryCreator.bat
index ec73443..f8ea0ab 100644
--- a/ctakes-distribution/src/main/bin/runDictionaryCreator.bat
+++ b/ctakes-distribution/src/main/bin/runDictionaryCreator.bat
@@ -1,4 +1,3 @@
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -15,10 +14,11 @@
 :: KIND, either express or implied.  See the License for the
 :: specific language governing permissions and limitations
 :: under the License.
-::
 
-::   Starts a GUI that can facilitate creation of a dictionary for the
+::   Starts a GUI that can facilitate creation of a dictionary.
+::   The created dictionary contains SQL that can be used by the
 ::   [ctakes-dictionary-lookup-fast](ctakes-dictionary-lookup-fast) module.
+:: Requires Java 17
 
 :: The setenv script sets up the environment needed by cTAKES.
 @call %~sdp0\setenv.bat
diff --git a/ctakes-distribution/src/main/bin/runDictionaryCreator.sh 
b/ctakes-distribution/src/main/bin/runDictionaryCreator.sh
index ae36710..b3bf1c2 100644
--- a/ctakes-distribution/src/main/bin/runDictionaryCreator.sh
+++ b/ctakes-distribution/src/main/bin/runDictionaryCreator.sh
@@ -1,4 +1,5 @@
-#
+#!/bin/sh
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,11 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-#   Starts a GUI that can facilitate creation of a dictionary for the
+#   Starts a GUI that can facilitate creation of a dictionary.
+#   The created dictionary contains SQL that can be used by the
 #   [ctakes-dictionary-lookup-fast](ctakes-dictionary-lookup-fast) module.
-#
 # Requires Java 17
-#
 
 # Sets up environment for cTAKES
 . ${HOME}/setenv.sh
diff --git a/ctakes-distribution/src/main/bin/runPiperCreator.bat 
b/ctakes-distribution/src/main/bin/runPiperCreator.bat
index 9bb7c8d..7a64cb3 100644
--- a/ctakes-distribution/src/main/bin/runPiperCreator.bat
+++ b/ctakes-distribution/src/main/bin/runPiperCreator.bat
@@ -1,4 +1,3 @@
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -17,10 +16,8 @@
 :: under the License.
 
 ::    Starts a GUI that can facilitate creation of a pipeline.
-
 :: Requires Java 17
 
-
 :: The setenv script sets up the environment needed by cTAKES.
 @call %~sdp0\setenv.bat
 
diff --git a/ctakes-distribution/src/main/bin/runPiperCreator.sh 
b/ctakes-distribution/src/main/bin/runPiperCreator.sh
index 311a6fb..ecb47a0 100644
--- a/ctakes-distribution/src/main/bin/runPiperCreator.sh
+++ b/ctakes-distribution/src/main/bin/runPiperCreator.sh
@@ -1,4 +1,5 @@
-#
+#!/bin/sh
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,12 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 #    Starts a GUI that can facilitate creation of a pipeline.
-
 # Requires Java 17
 
-
 # Sets up environment for cTAKES
 . ${HOME}/setenv.sh
 
diff --git a/ctakes-distribution/src/main/bin/runPiperFile.bat 
b/ctakes-distribution/src/main/bin/runPiperFile.bat
index 42dc2f4..ffc2768 100644
--- a/ctakes-distribution/src/main/bin/runPiperFile.bat
+++ b/ctakes-distribution/src/main/bin/runPiperFile.bat
@@ -1,5 +1,4 @@
 @ECHO OFF
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -17,9 +16,8 @@
 :: specific language governing permissions and limitations
 :: under the License.
 
-
-::   Runs the pipeline in the piper file specified by -p {piperfile}
-::   with any other provided parameters.  Standard parameters are:
+::   Runs the pipeline in the piper file specified by -p {piperfile} with 
provided parameters.
+::   Standard parameters are:
 ::   -i , --inputDir {inputDirectory}
 ::   -o , --outputDir {outputDirectory}
 ::   -s , --subDir {subDirectory}  (for i/o)
@@ -35,10 +33,8 @@
 ::     cli PARAGRAPH_TYPES_PATH=t
 ::   and when executing this script use:
 ::      runPiperFile -p path/to/my/custom.piper -t path/to/my/custom.bsv  ...
-
 :: Requires Java 17
 
-
 :: The setenv script sets up the environment needed by cTAKES.
 @call %~sdp0\setenv.bat
 
diff --git a/ctakes-distribution/src/main/bin/runPiperFile.sh 
b/ctakes-distribution/src/main/bin/runPiperFile.sh
index 0bda7db..44f71ad 100644
--- a/ctakes-distribution/src/main/bin/runPiperFile.sh
+++ b/ctakes-distribution/src/main/bin/runPiperFile.sh
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -17,9 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
-#   Runs the pipeline in the piper file specified by -p (piperfile)
-#   with any other provided parameters.  Standard parameters are:
+#   Runs the pipeline in the piper file specified by -p {piperfile} with 
provided parameters.
+#   Standard parameters are:
 #     -i , --inputDir {inputDirectory}
 #     -o , --outputDir {outputDirectory}
 #     -s , --subDir {subDirectory}  (for i/o)
@@ -35,10 +34,8 @@
 #     cli PARAGRAPH_TYPES_PATH=t
 #   and when executing this script use:
 #      runPiperFile -p path/to/my/custom.piper -t path/to/my/custom.bsv  ...
-
 # Requires Java 17
 
-
 # Sets up environment for cTAKES
 . ${HOME}/setenv.sh
 
diff --git a/ctakes-distribution/src/main/bin/runPiperGUI.bat 
b/ctakes-distribution/src/main/bin/runPiperGUI.bat
index 8fe6583..a1aafe3 100644
--- a/ctakes-distribution/src/main/bin/runPiperGUI.bat
+++ b/ctakes-distribution/src/main/bin/runPiperGUI.bat
@@ -1,5 +1,4 @@
 @ECHO OFF
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -17,12 +16,9 @@
 :: specific language governing permissions and limitations
 :: under the License.
 
-
 ::   Starts a GUI that can run a pipeline.
-
 :: Requires Java 17
 
-
 :: The setenv script sets up the environment needed by cTAKES.
 @call %~sdp0\setenv.bat
 
diff --git a/ctakes-distribution/src/main/bin/runPiperGUI.sh 
b/ctakes-distribution/src/main/bin/runPiperGUI.sh
index 89bb75d..3f8ba68 100644
--- a/ctakes-distribution/src/main/bin/runPiperGUI.sh
+++ b/ctakes-distribution/src/main/bin/runPiperGUI.sh
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -18,10 +18,8 @@
 # under the License.
 
 #   Starts a GUI that can run a pipeline.
-
 # Requires Java 17
 
-
 # Sets up environment for cTAKES
 . ${HOME}/setenv.sh
 
diff --git a/ctakes-distribution/src/main/bin/runPiperSubmitter.bat 
b/ctakes-distribution/src/main/bin/runPiperSubmitter.bat
index 0dc7d35..1301a54 100644
--- a/ctakes-distribution/src/main/bin/runPiperSubmitter.bat
+++ b/ctakes-distribution/src/main/bin/runPiperSubmitter.bat
@@ -1,5 +1,4 @@
 @ECHO OFF
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -19,10 +18,8 @@
 
 ::   Starts a GUI that can run a pipeline.
 ::   Deprecated: Identical to the runPiperGUI script.
-
 :: Requires Java 17
 
-
 :: The setenv script sets up the environment needed by cTAKES.
 @call %~sdp0\setenv.bat
 
diff --git a/ctakes-distribution/src/main/bin/runPiperSubmitter.sh 
b/ctakes-distribution/src/main/bin/runPiperSubmitter.sh
index b22a4a2..def4489 100644
--- a/ctakes-distribution/src/main/bin/runPiperSubmitter.sh
+++ b/ctakes-distribution/src/main/bin/runPiperSubmitter.sh
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -19,7 +19,6 @@
 
 #   Starts a GUI that can run a pipeline.
 #   Deprecated: Identical to the runPiperGUI script.
-
 # Requires Java 17
 
 # Sets up environment for cTAKES
diff --git a/ctakes-distribution/src/main/bin/setenv.bat 
b/ctakes-distribution/src/main/bin/setenv.bat
index 33c4c96..ce0356a 100644
--- a/ctakes-distribution/src/main/bin/setenv.bat
+++ b/ctakes-distribution/src/main/bin/setenv.bat
@@ -1,5 +1,4 @@
 @ECHO OFF
-::
 :: Licensed to the Apache Software Foundation (ASF) under one
 :: or more contributor license agreements.  See the NOTICE file
 :: distributed with this work for additional information
@@ -17,12 +16,10 @@
 :: specific language governing permissions and limitations
 :: under the License.
 
-
 ::   Sets up the standard environment for cTAKES.
-
+::   This script is called by cTAKES run scripts.
 :: Requires Java 17
 
-
 :: Guess CTAKES_HOME if not defined
 set CURRENT_DIR=%cd%
 if not "%CTAKES_HOME%" == "" goto gotHome
diff --git a/ctakes-distribution/src/main/bin/setenv.sh 
b/ctakes-distribution/src/main/bin/setenv.sh
index 9db1d1e..0bf54b9 100644
--- a/ctakes-distribution/src/main/bin/setenv.sh
+++ b/ctakes-distribution/src/main/bin/setenv.sh
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -17,12 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 #   Sets up the standard environment for cTAKES.
-
+#   This script is called by cTAKES run scripts.
 # Requires Java 17
 
-
 PRG="$0"
 while [ -h "$PRG" ]; do
   ls=`ls -ld "$PRG"`
diff --git 
a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
 
b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
index 5e298e7..7b18df5 100644
--- 
a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
+++ 
b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/pipeline/CnlptNegation.piper
@@ -8,7 +8,7 @@
 #  That instance of cTAKES will run the third and final bit of the entire PBJ 
pipeline.
 #
 #  This piper will then launch a python PBJ pipeline that runs the negation 
model from the external project
-#  Clinical NLP Transformers (cnlp_transformers), which can be found at
+#  Clinical NLP Transformers (cnlp_transformers , cnlpt), which can be found at
 #  https://github.com/Machine-Learning-for-Medical-Language/cnlp_transformers
 #
 
@@ -25,6 +25,11 @@
 #  By default ctakes-pbj will be pip ed at the beginning of a run.  You can 
turn this off with:
 #  --pipPbj no
 #
+#  A non-standard command-line option is the specification of whether or not 
to pip the cnlpt package.
+#  By default cnlpt will be pip ed at the beginning of a run.  You can turn 
this off with:
+#  ++pipCnlpt no
+#  If used, that must be added at the end of your command.
+#
 
 //  Sets up required parameters, starts your Artemis Broker, pips the PBJ 
project.
 load PbjStarter
diff --git 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
index 505a399..5ff68e8 100644
--- 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
+++ 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
@@ -23,3 +23,7 @@ class CasAnnotator(ABC):
     # Called once at the end of the pipeline.
     def collection_process_complete(self):
         pass
+
+    # Called when an exception is thrown.
+    def handle_exception(self, thrower, exceptable, initializing=False):
+        pass
diff --git 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
index 3ae3de7..cb4e58d 100644
--- 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
+++ 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
@@ -24,3 +24,11 @@ class CollectionReader(ABC):
     @abstractmethod
     def start(self):
         pass
+
+    # Called to stop reading.
+    def stop(self):
+        pass
+
+    # Called when an exception is thrown.
+    def handle_exception(self):
+        pass
diff --git 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
index 28a8883..0b4d7b3 100644
--- 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
+++ 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
@@ -1,7 +1,11 @@
 import sys
+import time
+import threading
 from ctakes_pbj.component.collection_reader import CollectionReader
 from ctakes_pbj.pbj_tools import pbj_defaults
-from ctakes_pbj.pbj_tools.stomp_receiver import start_receiver
+# from ctakes_pbj.pbj_tools.stomp_receiver import start_receiver
+# from ctakes_pbj.pbj_tools.stomp_receiver import stop_receiver
+from ctakes_pbj.pbj_tools.stomp_receiver import StompReceiver
 
 
 class PBJReceiver(CollectionReader):
@@ -22,7 +26,7 @@ class PBJReceiver(CollectionReader):
 
     # Called once at the build of a pipeline.
     def declare_params(self, arg_parser):
-        arg_parser.add_arg('-rq', '--receive_queue')
+        arg_parser.add_arg('-rq', '--receive_queue', 
default=pbj_defaults.get_default_rcv_q())
         arg_parser.add_arg('-rh', '--receive_host', 
default=pbj_defaults.DEFAULT_HOST)
         arg_parser.add_arg('-rpt', '--receive_port', 
default=pbj_defaults.DEFAULT_PORT)
         arg_parser.add_arg('-ru', '--receive_user', 
default=pbj_defaults.DEFAULT_USER)
@@ -44,6 +48,30 @@ class PBJReceiver(CollectionReader):
     # Called start reading cas objects and pass them to the pipeline.
     def start(self):
         if not self.receiving:
+            # start_receiver(self.pipeline, self.queue, self.host, self.port,
+            #                self.password, self.username)
             self.receiving = True
-            start_receiver(self.pipeline, self.queue, self.host, self.port,
-                           self.password, self.username)
+            self.stomp_receiver = StompReceiver(self.pipeline, self.queue, 
self.host, self.port,
+                                                self.password, self.username, 
r_id='1')
+            # stomp_thread = 
threading.Thread(target=start_receiver(self.stomp_receiver))
+            # stomp_thread.start()
+            # start_receiver(self.stomp_receiver)
+            self.stomp_receiver.start_receiver()
+
+    # Called to stop reading.
+    def stop(self):
+        print(time.ctime(), "PBJ Receiver: Stopping Stomp receiver ...")
+        self.stomp_receiver.stop_receiver()
+
+    # Called when an exception is thrown.
+    def handle_exception(self, thrower, exceptable, initializing=False):
+        if self.receiving:
+            # self.stop()
+            # print(time.ctime(), "PBJ Receiver: Stopping Stomp receiver ...")
+            # stop_receiver()
+            # self.stomp_receiver.set_stop(True)
+            self.receiving = False
+            self.stomp_receiver.handle_exception()
+            # Stop stomp in a background thread, just in case it is slow to 
react.
+            # stop_thread = 
threading.Thread(target=self.stomp_receiver.handle_exception)
+            # stop_thread.start()
diff --git 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
index 87a6295..94649d0 100644
--- 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
+++ 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
@@ -19,7 +19,7 @@ class PBJSender(cas_annotator.CasAnnotator):
     # Called once at the build of a pipeline.
     def declare_params(self, arg_parser):
         # arg_parser.add_arg('send_queue')
-        arg_parser.add_arg('-sq', '--send_queue', default=DEFAULT_HOST)
+        arg_parser.add_arg('-sq', '--send_queue', default=get_default_send_q())
         arg_parser.add_arg('-sh', '--send_host', default=DEFAULT_HOST)
         arg_parser.add_arg('-spt', '--send_port', default=DEFAULT_PORT)
         arg_parser.add_arg('-su', '--send_user', default=DEFAULT_USER)
@@ -40,7 +40,7 @@ class PBJSender(cas_annotator.CasAnnotator):
 
     # Called once at the beginning of a pipeline.
     def initialize(self):
-        print(time.ctime((time.time())), "Starting PBJ Sender on", self.host, 
self.queue, "...")
+        print(time.ctime(), "Starting PBJ Sender on", self.host, self.queue, 
"...")
         # Use a heartbeat of 10 minutes  (in milliseconds)
         self.conn = stomp.Connection12([(self.host, self.port)],
                                        keepalive=True, heartbeats=(600000, 
600000))
@@ -48,7 +48,7 @@ class PBJSender(cas_annotator.CasAnnotator):
 
     # Called for every cas passed through the pipeline.
     def process(self, cas):
-        print(time.ctime((time.time())), "Sending processed information to",
+        print(time.ctime(), "Sending processed information to",
               self.host, self.queue, "...")
         xmi = cas.to_xmi()
         self.conn.send(self.queue, xmi)
@@ -57,14 +57,18 @@ class PBJSender(cas_annotator.CasAnnotator):
     def collection_process_complete(self):
         self.send_stop()
 
+    # Called when an exception is thrown.
+    def handle_exception(self, thrower, exceptable, initializing=False):
+        self.send_stop()
+
     def send_text(self, text):
         self.conn.send(self.queue, text)
 
     def send_stop(self):
-        print(time.ctime((time.time())), "Sending Stop code to", self.host, 
self.queue, "...")
+        print(time.ctime(), "Sending Stop code to", self.host, self.queue, 
"...", flush=True)
         self.conn.send(self.queue, STOP_MESSAGE)
         self.conn.disconnect()
-        print(time.ctime((time.time())), "Disconnected PBJ Sender on", 
self.host, self.queue)
+        print(time.ctime(), "Disconnected PBJ Sender on", self.host, 
self.queue)
 
     def set_host(self, host_name):
         self.host = host_name
diff --git 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
index f10a37f..eaa0b65 100644
--- 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
+++ 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
@@ -8,7 +8,6 @@ class ArgParser:
 
     def get_arg_parser(self):
         if self.arg_parser is None:
-            print('Creating arg_parser')
             self.arg_parser = argparse.ArgumentParser(
                 prog='ctakes-pbj',
                 description='Does wonderful stuff...',
@@ -20,5 +19,5 @@ class ArgParser:
         self.get_arg_parser().add_argument(*args, **kwargs)
 
     def get_args(self):
-        print('Parsing Arguments')
+        print('Parsing Arguments ...')
         return self.get_arg_parser().parse_args()
diff --git 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
index 6e8acb8..9fe4ec1 100644
--- 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
+++ 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
@@ -1,6 +1,19 @@
+import sys
+import os.path
+
 DEFAULT_HOST = 'localhost'
 DEFAULT_PORT = 61616
 DEFAULT_USER = 'guest'
 DEFAULT_PASS = 'guest'
 DEFAULT_OUT_DIR = 'pbj_output/'
 STOP_MESSAGE = "Apache cTAKES PBJ Stop Message."
+
+
+# The default receive queue is the name of the script with the prefix "to_"
+def get_default_rcv_q():
+    return "to_" + os.path.splitext(os.path.basename(sys.argv[0]))[0]
+
+
+# The default send queue is the name of the script with the prefix "from_"
+def get_default_send_q():
+    return "from_" + os.path.splitext(os.path.basename(sys.argv[0]))[0]
diff --git 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
index afa3e9a..d5a989a 100644
--- 
a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
+++ 
b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
@@ -1,4 +1,5 @@
 import time
+import threading
 from threading import Event
 
 import stomp
@@ -6,19 +7,43 @@ import stomp
 from ctakes_pbj.pbj_tools.pbj_defaults import STOP_MESSAGE
 from ctakes_pbj.type_system.type_system_loader import *
 
-exit_event = Event()
 
+# exit_event = Event()
 
-def start_receiver(pipeline, queue_name, host_name, port_name,
-                   password, username, r_id='1'):
-    StompReceiver(pipeline, queue_name, host_name, port_name, password, 
username, r_id)
-    while not exit_event.is_set():
-        exit_event.wait()
+
+# def start_receiver(pipeline, queue_name, host_name, port_name,
+#                    password, username, r_id='1'):
+#     StompReceiver(pipeline, queue_name, host_name, port_name, password, 
username, r_id)
+#     while not exit_event.is_set():
+#         exit_event.wait()
+
+# def start_receiver():
+#     while not exit_event.is_set():
+#         exit_event.wait()
+
+# def start_receiver(stomp_receiver):
+#     stomp_receiver.start_receiver()
+# while not exit_event.is_set():
+#     exit_event.wait()
+# wait_thread = threading.Thread(target=run_receiver(stomp_receiver))
+# wait_thread.start()
+
+
+# def run_receiver(stomp_receiver):
+#     stomp_receiver.start_receiver()
+#     while not exit_event.is_set():
+#         exit_event.wait()
+
+
+# def stop_receiver():
+#     exit_event.set()
 
 
 class StompReceiver(stomp.ConnectionListener):
 
     def __init__(self, pipeline, queue_name, host_name, port_name, password, 
username, r_id):
+        self.stop = False
+        self.conn = None
         self.source_queue = queue_name
         self.source_host = host_name
         self.source_port = port_name
@@ -27,7 +52,17 @@ class StompReceiver(stomp.ConnectionListener):
         self.username = username
         self.r_id = r_id
         self.typesystem = None
-        print(time.ctime((time.time())), "Starting Stomp Receiver on", self.source_host, self.source_queue, "...")
+        self.completed = False
+        # print(time.ctime((time.time())), "Starting Stomp Receiver on", self.source_host, self.source_queue, "...")
+        # # Use a heartbeat of 10 minutes  (in milliseconds)
+        # self.conn = stomp.Connection12([(self.source_host, self.source_port)],
+        #                                keepalive=True, heartbeats=(600000, 600000))
+        # self.conn.set_listener('Stomp_Receiver', self)
+        # self.stop = False
+        # self.__connect_and_subscribe()
+
+    def start_receiver(self):
+        print(time.ctime(), "Starting Stomp Receiver on", self.source_host, self.source_queue, "...")
         # Use a heartbeat of 10 minutes  (in milliseconds)
         self.conn = stomp.Connection12([(self.source_host, self.source_port)],
                                        keepalive=True, heartbeats=(600000, 600000))
@@ -36,8 +71,9 @@ class StompReceiver(stomp.ConnectionListener):
         self.__connect_and_subscribe()
 
     def __connect_and_subscribe(self):
-        self.conn.connect(self.username, self.password, wait=True)
-        self.conn.subscribe(destination=self.source_queue, id=self.r_id, ack='auto')
+        if not self.stop:
+            self.conn.connect(self.username, self.password, wait=True)
+            self.conn.subscribe(destination=self.source_queue, id=self.r_id, ack='auto')
         # self.conn.subscribe(destination=self.source_queue, id=self.id, ack='client')
 
     def set_typesystem(self, typesystem):
@@ -58,28 +94,40 @@ class StompReceiver(stomp.ConnectionListener):
         self.stop = stop
 
     def stop_receiver(self):
-        self.conn.unsubscribe(destination=self.source_queue, id=self.r_id)
+        self.stop = True
+        print(time.ctime(), "Disconnecting Stomp Receiver on",
+              self.source_host, self.source_queue, "...")
         self.conn.disconnect()
-        print(time.ctime((time.time())), "Disconnected Stomp Receiver on", self.source_host, self.source_queue)
-        self.pipeline.collection_process_complete()
-        exit_event.set()
+        print(time.ctime(), "Stomp Receiver disconnected.")
+        self.conn.unsubscribe(destination=self.source_queue, id=self.r_id)
+        print(time.ctime(), "Stomp Receiver unsubscribed.")
+        # self.conn.disconnect()
+        # print(time.ctime((time.time())), "Stomp Receiver disconnected.")
+        if self.completed:
+            self.pipeline.collection_process_complete()
 
     def on_message(self, frame):
         if frame.body == STOP_MESSAGE:
-            print(time.ctime((time.time())), "Received Stop code.")
-            self.stop = True
+            print(time.ctime(), "Received Stop code.")
+            # self.stop = True
             # time.sleep(3)
+            self.completed = True
             self.stop_receiver()
         else:
             if XMI_INDICATOR in frame.body:
                 cas = cassis.load_cas_from_xmi(frame.body, self.get_typesystem())
                 self.pipeline.process(cas)
             else:
-                print(time.ctime((time.time())), "Malformed Message:\n", frame.body)
+                print(time.ctime(), "Malformed Message:\n", frame.body)
 
     def on_disconnected(self):
-        if self.stop is False:
+        if not self.stop:
             self.__connect_and_subscribe()
 
     def on_error(self, frame):
-        print(time.ctime((time.time())), "Receiver Error:", frame.body)
+        print(time.ctime(), "Receiver Error:", frame.body)
+
+    # Called when an exception is thrown.
+    def handle_exception(self):
+        self.completed = False
+        self.stop_receiver()
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
index 0783453..19bf1b7 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
@@ -1,6 +1,14 @@
+import sys
+import time
+import traceback
+from traceback import TracebackException
+from threading import Event
+
 from ctakes_pbj.pbj_tools import pbj_defaults
 from ctakes_pbj.pbj_tools.arg_parser import ArgParser
 
+exit_event = Event()
+
 
 class PBJPipeline:
 
@@ -33,27 +41,82 @@ class PBJPipeline:
         # If get_args has already been called then added parameters will crash the tool.
         args = self.arg_parser.get_args()
         # Set the necessary parameters in the collection reader.
-        self.c_reader.init_params(args)
+        try:
+            self.c_reader.init_params(args)
+        except Exception as exceptable:
+            self.handle_exception(self.c_reader, exceptable, True)
         # For each annotator set the necessary parameters.
         for annotator in self.annotators:
             annotator.init_params(args)
         # For each annotator initialize resources, etc.
         for annotator in self.annotators:
-            annotator.initialize()
+            if exit_event.is_set():
+                break
+            try:
+                print(time.ctime(), "Initializing", type(annotator).__name__, "...", flush=True)
+                annotator.initialize()
+            except Exception as exceptable:
+                self.handle_exception(annotator, exceptable, True)
         self.initialized = True
 
     # Starts / Runs the pipeline.  This calls start on the collection reader.
     def run(self):
         if not self.initialized:
             self.initialize()
-        self.c_reader.start()
+        try:
+            print(time.ctime(), "Starting", type(self.c_reader).__name__, "...", flush=True)
+            self.c_reader.start()
+        except Exception as exceptable:
+            self.handle_exception(self.c_reader, exceptable)
+        # Block the calling thread until exit_event is set (no extra thread is started here).
+        # This lets the collection reader keep waiting for information.
+        while not exit_event.is_set():
+            exit_event.wait()
 
     # For a new cas, call each annotator to process that cas.
     def process(self, cas):
         for annotator in self.annotators:
-            annotator.process(cas)
+            if exit_event.is_set():
+                break
+            try:
+                print(time.ctime(), "Running", type(annotator).__name__, "...", flush=True)
+                annotator.process(cas)
+            except Exception as exceptable:
+                self.handle_exception(annotator, exceptable)
 
     # At the end of the corpus, call each annotator for cleanup, etc.
     def collection_process_complete(self):
+        print(time.ctime(), "Collection processing complete.")
+        for annotator in self.annotators:
+            if exit_event.is_set():
+                break
+            try:
+                print(time.ctime(), "Notifying", type(annotator).__name__, "of completion ...", flush=True)
+                annotator.collection_process_complete()
+            except Exception as exceptable:
+                self.handle_exception(annotator, exceptable)
+        print(time.ctime(), "Done.", flush=True)
+        exit_event.set()
+
+    def handle_exception(self, thrower, exceptable, initializing=False):
+        print(time.ctime(), "Exception thrown in",
+              type(thrower).__name__, ":",
+              type(exceptable).__name__, exceptable, flush=True)
+        # print(TracebackException.from_exception(exceptable, limit=-3, capture_locals=True))
+        traceback.print_exc(limit=3, chain=False)
+        # Print nice output regarding the exception.
+        try:
+            print(time.ctime(), "Notifying", type(self.c_reader).__name__, "of exception ...", flush=True)
+            self.c_reader.handle_exception(thrower, exceptable)
+        except Exception as exceptable_2:
+            traceback.print_exc(limit=3, chain=False)
+        # Distribute handling of exceptions.
         for annotator in self.annotators:
-            annotator.collection_process_complete()
+            try:
+                print(time.ctime(), "Notifying", type(annotator).__name__, "of exception ...", flush=True)
+                annotator.handle_exception(thrower, exceptable, initializing)
+            except Exception as exceptable_2:
+                traceback.print_exc(limit=3, chain=False)
+        print(time.ctime(), "Done.", flush=True)
+        # sys.exit(1)
+        exit_event.set()


Reply via email to