Repository: atlas
Updated Branches:
  refs/heads/master 9a4ca16d7 -> f57fd7f0f


http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java
----------------------------------------------------------------------
diff --git 
a/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java
 
b/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java
index 237a2b0..7ad981e 100644
--- 
a/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java
+++ 
b/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java
@@ -104,8 +104,17 @@ public class OMRSRepositoryRESTServices
     public static void setLocalRepository(LocalOMRSRepositoryConnector    
localRepositoryConnector,
                                           String                          
localServerURL)
     {
-        OMRSRepositoryRESTServices.localRepositoryConnector = 
localRepositoryConnector;
-        OMRSRepositoryRESTServices.localMetadataCollection = 
localRepositoryConnector.getMetadataCollection();
+        try
+        {
+            OMRSRepositoryRESTServices.localRepositoryConnector = 
localRepositoryConnector;
+            OMRSRepositoryRESTServices.localMetadataCollection = 
localRepositoryConnector.getMetadataCollection();
+        }
+        catch (Throwable error)
+        {
+            OMRSRepositoryRESTServices.localRepositoryConnector = null;
+            OMRSRepositoryRESTServices.localMetadataCollection = null;
+        }
+
         OMRSRepositoryRESTServices.localServerURL = localServerURL;
     }
 
@@ -433,7 +442,7 @@ public class OMRSRepositoryRESTServices
      * RepositoryErrorException - there is a problem communicating with the 
metadata repository or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.GET, path = 
"/{userId}/types/typedefs/by-search-criteria")
+    @RequestMapping(method = RequestMethod.GET, path = 
"/{userId}/types/typedefs/by-property-value")
 
     public TypeDefListResponse searchForTypeDefs(@PathVariable String userId,
                                                  @RequestParam String 
searchCriteria)
@@ -1492,11 +1501,11 @@ public class OMRSRepositoryRESTServices
 
 
     /**
-     * Return a historical versionName of an entity - includes the header, 
classifications and properties of the entity.
+     * Return a historical version of an entity - includes the header, 
classifications and properties of the entity.
      *
      * @param userId - unique identifier for requesting user.
      * @param guid - String unique identifier for the entity.
-     * @param asOfTime - the time used to determine which versionName of the 
entity that is desired.
+     * @param asOfTime - the time used to determine which version of the 
entity that is desired.
      * @return EnityDetailResponse:
      * EntityDetail structure or
      * InvalidParameterException - the guid or date is null or the asOfTime 
property is for a future time or
@@ -1926,9 +1935,12 @@ public class OMRSRepositoryRESTServices
 
 
     /**
-     * Return a list of entities matching the search criteria.
+     * Return a list of entities whose string based property values match the 
search criteria.  The
+     * search criteria may include regex style wild cards.
      *
      * @param userId - unique identifier for requesting user.
+     * @param entityTypeGUID - GUID of the type of entity to search for. Null 
means all types will
+     *                       be searched (could be slow so not recommended).
      * @param searchCriteria - String expression of the characteristics of the 
required relationships.
      * @param fromEntityElement - the starting element number of the entities 
to return.
      *                                This is used when retrieving elements
@@ -1955,19 +1967,20 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
satOfTime parameter or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.GET, path = 
"/{userId}/instances/entities/by-search-criteria")
-
-    public  EntityListResponse searchForEntities(@PathVariable                 
  String                          userId,
-                                                 @RequestParam                 
  String                          searchCriteria,
-                                                 @RequestParam(required = 
false) int                             fromEntityElement,
-                                                 @RequestParam(required = 
false) List<InstanceStatus>            limitResultsByStatus,
-                                                 @RequestParam(required = 
false) List<String>                    limitResultsByClassification,
-                                                 @RequestParam(required = 
false) Date                            asOfTime,
-                                                 @RequestParam(required = 
false) String                          sequencingProperty,
-                                                 @RequestParam(required = 
false) SequencingOrder                 sequencingOrder,
-                                                 @RequestParam(required = 
false) int                             pageSize)
+    @RequestMapping(method = RequestMethod.GET, path = 
"/{userId}/instances/entities/by-property-value")
+
+    public  EntityListResponse fndEntitiesByPropertyValue(@PathVariable        
           String                  userId,
+                                                          
@RequestParam(required = false) String                  entityTypeGUID,
+                                                          @RequestParam        
           String                  searchCriteria,
+                                                          
@RequestParam(required = false) int                     fromEntityElement,
+                                                          
@RequestParam(required = false) List<InstanceStatus>    limitResultsByStatus,
+                                                          
@RequestParam(required = false) List<String>            
limitResultsByClassification,
+                                                          
@RequestParam(required = false) Date                    asOfTime,
+                                                          
@RequestParam(required = false) String                  sequencingProperty,
+                                                          
@RequestParam(required = false) SequencingOrder         sequencingOrder,
+                                                          
@RequestParam(required = false) int                     pageSize)
     {
-        final  String   methodName = "searchForEntities";
+        final  String   methodName = "findEntitiesByPropertyValue";
 
         EntityListResponse response = new EntityListResponse();
 
@@ -1975,15 +1988,16 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            List<EntityDetail>  entities = 
localMetadataCollection.searchForEntities(userId,
-                                                                               
      searchCriteria,
-                                                                               
      fromEntityElement,
-                                                                               
      limitResultsByStatus,
-                                                                               
      limitResultsByClassification,
-                                                                               
      asOfTime,
-                                                                               
      sequencingProperty,
-                                                                               
      sequencingOrder,
-                                                                               
      pageSize);
+            List<EntityDetail>  entities = 
localMetadataCollection.findEntitiesByPropertyValue(userId,
+                                                                               
                entityTypeGUID,
+                                                                               
                searchCriteria,
+                                                                               
                fromEntityElement,
+                                                                               
                limitResultsByStatus,
+                                                                               
                limitResultsByClassification,
+                                                                               
                asOfTime,
+                                                                               
                sequencingProperty,
+                                                                               
                sequencingOrder,
+                                                                               
                pageSize);
             response.setEntities(entities);
             if (entities != null)
             {
@@ -1991,7 +2005,7 @@ public class OMRSRepositoryRESTServices
                 response.setPageSize(pageSize);
                 if (entities.size() == pageSize)
                 {
-                    final String urlTemplate = 
"{0}/instances/entities/by-search-criteria?searchCriteria={1}?fromEntityElement={2}&limitResultsByStatus={3}&limitResultsByClassification={4}&asOfTime={5}&sequencingProperty={6}&sequencingOrder={7}&pageSize={8}";
+                    final String urlTemplate = 
"{0}/instances/entities/by-property-value?entityTypeGUID={1}&searchCriteria={2}&fromEntityElement={3}&limitResultsByStatus={4}&limitResultsByClassification={5}&asOfTime={6}&sequencingProperty={7}&sequencingOrder={8}&pageSize={9}";
 
                     response.setNextPageURL(formatNextPageURL(localServerURL + 
urlTemplate,
                                                               userId,
@@ -2157,7 +2171,7 @@ public class OMRSRepositoryRESTServices
      *
      * @param userId - unique identifier for requesting user.
      * @param guid - String unique identifier for the relationship.
-     * @param asOfTime - the time used to determine which versionName of the 
entity that is desired.
+     * @param asOfTime - the time used to determine which version of the 
entity that is desired.
      * @return RelationshipResponse:
      * a relationship structure or
      * InvalidParameterException - the guid or date is null or the asOfTime 
property is for a future time or
@@ -2335,6 +2349,7 @@ public class OMRSRepositoryRESTServices
      * Return a list of relationships that match the search criteria.  The 
results can be paged.
      *
      * @param userId - unique identifier for requesting user.
+     * @param relationshipTypeGUID - unique identifier of a relationship type 
(or null for all types of relationship).
      * @param searchCriteria - String expression of the characteristics of the 
required relationships.
      * @param fromRelationshipElement - Element number of the results to skip 
to when building the results list
      *                                to return.  Zero means begin at the 
start of the results.  This is used
@@ -2359,18 +2374,19 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
satOfTime parameter or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.GET, path = 
"/{userId}/instances/relationships/by-search-criteria")
-
-    public  RelationshipListResponse searchForRelationships(@PathVariable      
             String                    userId,
-                                                            @RequestParam      
             String                    searchCriteria,
-                                                            
@RequestParam(required = false) int                       
fromRelationshipElement,
-                                                            
@RequestParam(required = false) List<InstanceStatus>      limitResultsByStatus,
-                                                            
@RequestParam(required = false) Date                      asOfTime,
-                                                            
@RequestParam(required = false) String                    sequencingProperty,
-                                                            
@RequestParam(required = false) SequencingOrder           sequencingOrder,
-                                                            
@RequestParam(required = false) int                       pageSize)
+    @RequestMapping(method = RequestMethod.GET, path = 
"/{userId}/instances/relationships/by-property-value")
+
+    public  RelationshipListResponse 
findRelationshipsByPropertyValue(@PathVariable                   String         
           userId,
+                                                                      
@RequestParam(required = false) String                    relationshipTypeGUID,
+                                                                      
@RequestParam                   String                    searchCriteria,
+                                                                      
@RequestParam(required = false) int                       
fromRelationshipElement,
+                                                                      
@RequestParam(required = false) List<InstanceStatus>      limitResultsByStatus,
+                                                                      
@RequestParam(required = false) Date                      asOfTime,
+                                                                      
@RequestParam(required = false) String                    sequencingProperty,
+                                                                      
@RequestParam(required = false) SequencingOrder           sequencingOrder,
+                                                                      
@RequestParam(required = false) int                       pageSize)
     {
-        final  String   methodName = "searchForRelationships";
+        final  String   methodName = "findRelationshipsByPropertyValue";
 
         RelationshipListResponse response = new RelationshipListResponse();
 
@@ -2378,14 +2394,15 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            List<Relationship>  relationships = 
localMetadataCollection.searchForRelationships(userId,
-                                                                               
                searchCriteria,
-                                                                               
                fromRelationshipElement,
-                                                                               
                limitResultsByStatus,
-                                                                               
                asOfTime,
-                                                                               
                sequencingProperty,
-                                                                               
                sequencingOrder,
-                                                                               
                pageSize);
+            List<Relationship>  relationships = 
localMetadataCollection.findRelationshipsByPropertyValue(userId,
+                                                                               
                          relationshipTypeGUID,
+                                                                               
                          searchCriteria,
+                                                                               
                          fromRelationshipElement,
+                                                                               
                          limitResultsByStatus,
+                                                                               
                          asOfTime,
+                                                                               
                          sequencingProperty,
+                                                                               
                          sequencingOrder,
+                                                                               
                          pageSize);
             response.setRelationships(relationships);
             if (relationships != null)
             {
@@ -2393,7 +2410,7 @@ public class OMRSRepositoryRESTServices
                 response.setPageSize(pageSize);
                 if (response.getRelationships().size() == pageSize)
                 {
-                    final String urlTemplate = 
"{0}/instances/relationships/by-search-criteria?searchCriteria={1}?fromRelationshipElement={2}&limitResultsByStatus={3}&asOfTime={4}&sequencingProperty={5}&sequencingOrder={6}&pageSize={7}";
+                    final String urlTemplate = 
"{0}/instances/relationships/by-property-value?relationshipTypeGUID={1}&searchCriteria={2}?fromRelationshipElement={3}&limitResultsByStatus={4}&asOfTime={5}&sequencingProperty={6}&sequencingOrder={7}&pageSize={8}";
 
                     response.setNextPageURL(formatNextPageURL(localServerURL + 
urlTemplate,
                                                               userId,
@@ -4314,7 +4331,7 @@ public class OMRSRepositoryRESTServices
      * Save the entity as a reference copy.  The id of the home metadata 
collection is already set up in the
      * entity.
      *
-     * @param serverName - unique identifier for requesting user.
+     * @param userId - unique identifier for requesting user.
      * @param entity - details of the entity to save.
      * @return VoidResponse:
      * void or
@@ -4332,9 +4349,9 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
instance reference copies or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{serverName}/instances/entities/reference-copy")
+    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{userId}/instances/entities/reference-copy")
 
-    public VoidResponse saveEntityReferenceCopy(@PathVariable String         
serverName,
+    public VoidResponse saveEntityReferenceCopy(@PathVariable String         
userId,
                                                 @RequestParam EntityDetail   
entity)
     {
         final  String   methodName = "saveEntityReferenceCopy";
@@ -4345,7 +4362,7 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            localMetadataCollection.saveEntityReferenceCopy(serverName, 
entity);
+            localMetadataCollection.saveEntityReferenceCopy(userId, entity);
         }
         catch (RepositoryErrorException  error)
         {
@@ -4393,7 +4410,7 @@ public class OMRSRepositoryRESTServices
      * remove reference copies from the local cohort, repositories that have 
left the cohort,
      * or entities that have come from open metadata archives.
      *
-     * @param serverName - unique identifier for requesting server.
+     * @param userId - unique identifier for requesting user.
      * @param entityGUID - the unique identifier for the entity.
      * @param typeDefGUID - the guid of the TypeDef for the relationship - 
used to verify the relationship identity.
      * @param typeDefName - the name of the TypeDef for the relationship - 
used to verify the relationship identity.
@@ -4409,9 +4426,9 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
instance reference copies or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{serverName}/instances/entities/reference-copy/{entityGUID}/purge")
+    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{userId}/instances/entities/reference-copy/{entityGUID}/purge")
 
-    public VoidResponse purgeEntityReferenceCopy(@PathVariable String   
serverName,
+    public VoidResponse purgeEntityReferenceCopy(@PathVariable String   userId,
                                                  @PathVariable String   
entityGUID,
                                                  @RequestParam String   
typeDefGUID,
                                                  @RequestParam String   
typeDefName,
@@ -4425,7 +4442,7 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            localMetadataCollection.purgeEntityReferenceCopy(serverName,
+            localMetadataCollection.purgeEntityReferenceCopy(userId,
                                                              entityGUID,
                                                              typeDefGUID,
                                                              typeDefName,
@@ -4463,7 +4480,7 @@ public class OMRSRepositoryRESTServices
      * The local repository has requested that the repository that hosts the 
home metadata collection for the
      * specified entity sends out the details of this entity so the local 
repository can create a reference copy.
      *
-     * @param serverName - unique identifier for requesting server.
+     * @param userId - unique identifier for requesting user.
      * @param entityGUID - unique identifier of requested entity.
      * @param typeDefGUID - unique identifier of requested entity's TypeDef.
      * @param typeDefName - unique name of requested entity's TypeDef.
@@ -4479,9 +4496,9 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
instance reference copies or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{serverName}/instances/entities/reference-copy/{entityGUID}/refresh")
+    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{userId}/instances/entities/reference-copy/{entityGUID}/refresh")
 
-    public VoidResponse refreshEntityReferenceCopy(@PathVariable String   
serverName,
+    public VoidResponse refreshEntityReferenceCopy(@PathVariable String   
userId,
                                                    @PathVariable String   
entityGUID,
                                                    @RequestParam String   
typeDefGUID,
                                                    @RequestParam String   
typeDefName,
@@ -4495,7 +4512,7 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            localMetadataCollection.refreshEntityReferenceCopy(serverName,
+            localMetadataCollection.refreshEntityReferenceCopy(userId,
                                                                entityGUID,
                                                                typeDefGUID,
                                                                typeDefName,
@@ -4533,7 +4550,7 @@ public class OMRSRepositoryRESTServices
      * Save the relationship as a reference copy.  The id of the home metadata 
collection is already set up in the
      * relationship.
      *
-     * @param serverName - unique identifier for requesting serverName.
+     * @param userId - unique identifier for requesting user.
      * @param relationship - relationship to save.
      * @return VoidResponse:
      * void or
@@ -4553,9 +4570,9 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
instance reference copies or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{serverName}/instances/relationships/reference-copy")
+    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{userId}/instances/relationships/reference-copy")
 
-    public VoidResponse saveRelationshipReferenceCopy(@PathVariable String     
    serverName,
+    public VoidResponse saveRelationshipReferenceCopy(@PathVariable String     
    userId,
                                                       @RequestParam 
Relationship   relationship)
     {
         final  String   methodName = "saveRelationshipReferenceCopy";
@@ -4566,7 +4583,7 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            localMetadataCollection.saveRelationshipReferenceCopy(serverName, 
relationship);
+            localMetadataCollection.saveRelationshipReferenceCopy(userId, 
relationship);
         }
         catch (RepositoryErrorException  error)
         {
@@ -4620,7 +4637,7 @@ public class OMRSRepositoryRESTServices
      * remove reference copies from the local cohort, repositories that have 
left the cohort,
      * or relationships that have come from open metadata archives.
      *
-     * @param serverName - unique identifier for requesting server.
+     * @param userId - unique identifier for requesting user.
      * @param relationshipGUID - the unique identifier for the relationship.
      * @param typeDefGUID - the guid of the TypeDef for the relationship - 
used to verify the relationship identity.
      * @param typeDefName - the name of the TypeDef for the relationship - 
used to verify the relationship identity.
@@ -4636,9 +4653,9 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
instance reference copies or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{serverName}/instances/relationships/reference-copy/{relationshipGUID}/purge")
+    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{userId}/instances/relationships/reference-copy/{relationshipGUID}/purge")
 
-    public VoidResponse purgeRelationshipReferenceCopy(@PathVariable String   
serverName,
+    public VoidResponse purgeRelationshipReferenceCopy(@PathVariable String   
userId,
                                                        @PathVariable String   
relationshipGUID,
                                                        @RequestParam String   
typeDefGUID,
                                                        @RequestParam String   
typeDefName,
@@ -4652,7 +4669,7 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            localMetadataCollection.purgeRelationshipReferenceCopy(serverName,
+            localMetadataCollection.purgeRelationshipReferenceCopy(userId,
                                                                    
relationshipGUID,
                                                                    typeDefGUID,
                                                                    typeDefName,
@@ -4691,7 +4708,7 @@ public class OMRSRepositoryRESTServices
      * specified relationship sends out the details of this relationship so 
the local repository can create a
      * reference copy.
      *
-     * @param serverName - unique identifier for requesting server.
+     * @param userId - unique identifier for requesting user.
      * @param relationshipGUID - unique identifier of the relationship.
      * @param typeDefGUID - the guid of the TypeDef for the relationship - 
used to verify the relationship identity.
      * @param typeDefName - the name of the TypeDef for the relationship - 
used to verify the relationship identity.
@@ -4707,9 +4724,9 @@ public class OMRSRepositoryRESTServices
      * FunctionNotSupportedException - the repository does not support 
instance reference copies or
      * UserNotAuthorizedException - the userId is not permitted to perform 
this operation.
      */
-    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{serverName}/instances/relationships/reference-copy/{relationshipGUID}/refresh")
+    @RequestMapping(method = RequestMethod.PATCH, path = 
"/{userId}/instances/relationships/reference-copy/{relationshipGUID}/refresh")
 
-    public VoidResponse refreshRelationshipReferenceCopy(@PathVariable String 
serverName,
+    public VoidResponse refreshRelationshipReferenceCopy(@PathVariable String 
userId,
                                                          @PathVariable String 
relationshipGUID,
                                                          @RequestParam String 
typeDefGUID,
                                                          @RequestParam String 
typeDefName,
@@ -4723,7 +4740,7 @@ public class OMRSRepositoryRESTServices
         {
             validateLocalRepository(methodName);
 
-            
localMetadataCollection.refreshRelationshipReferenceCopy(serverName,
+            localMetadataCollection.refreshRelationshipReferenceCopy(userId,
                                                                      
relationshipGUID,
                                                                      
typeDefGUID,
                                                                      
typeDefName,
@@ -5148,8 +5165,11 @@ public class OMRSRepositoryRESTServices
      *
      * @param response - REST Response
      * @param error returned response.
+     * @param exceptionClassName - class name of the exception to recreate
      */
-    private void captureCheckedException(OMRSRESTAPIResponse response, 
OMRSCheckedExceptionBase error, String exceptionClassName)
+    private void captureCheckedException(OMRSRESTAPIResponse      response,
+                                         OMRSCheckedExceptionBase error,
+                                         String                   
exceptionClassName)
     {
         response.setRelatedHTTPCode(error.getReportedHTTPCode());
         response.setExceptionClassName(exceptionClassName);

http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java
----------------------------------------------------------------------
diff --git 
a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java 
b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java
index 0fec97c..e58d97d 100644
--- a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java
+++ b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java
@@ -21,7 +21,7 @@ package org.apache.atlas.omrs.topicconnectors;
 import org.apache.atlas.omrs.eventmanagement.events.v1.OMRSEventV1;
 
 /**
- * OMRSTopic defines the interface to the messaging Topic for OMRS Events.  It 
implemented by the OMTSTopicConnector.
+ * OMRSTopic defines the interface to the messaging Topic for OMRS Events.  It 
is implemented by the OMRSTopicConnector.
  */
 public interface OMRSTopic
 {

http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java
----------------------------------------------------------------------
diff --git 
a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java
 
b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java
index 8498620..f0572ea 100644
--- 
a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java
+++ 
b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java
@@ -19,6 +19,8 @@ package org.apache.atlas.omrs.topicconnectors;
 
 import org.apache.atlas.ocf.ConnectorBase;
 import org.apache.atlas.ocf.ffdc.ConnectorCheckedException;
+import org.apache.atlas.ocf.properties.Connection;
+import org.apache.atlas.ocf.properties.Endpoint;
 import org.apache.atlas.omrs.auditlog.OMRSAuditCode;
 import org.apache.atlas.omrs.auditlog.OMRSAuditLog;
 import org.apache.atlas.omrs.auditlog.OMRSAuditingComponent;
@@ -27,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.List;
 
 
 /**
@@ -45,13 +48,22 @@ import java.util.ArrayList;
  *     </li>
  * </ul>
  */
-public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTopic
+public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTopic, Runnable
 {
     ArrayList<OMRSTopicListener> topicListeners     = new  ArrayList<>();
 
     private static final Logger       log      = LoggerFactory.getLogger(OMRSTopicConnector.class);
     private static final OMRSAuditLog auditLog = new OMRSAuditLog(OMRSAuditingComponent.OMRS_TOPIC_CONNECTOR);
 
+    private static final String       defaultThreadName = "OMRSTopicListener";
+    private static final String       defaultTopicName  = "OMRSTopic";
+
+    private volatile boolean keepRunning = false;
+
+    private String   listenerThreadName = defaultThreadName;
+    private String   topicName = defaultTopicName;
+    private int      sleepTime = 100;
+
     /**
      * Simple constructor
      */
@@ -62,6 +74,68 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo
 
 
     /**
+     * Return the name of the topic for this connector.
+     *
+     * @return String topic name.
+     */
+    public String getTopicName()
+    {
+        return topicName;
+    }
+
+    /**
+     * This is the method called by the listener thread when it starts.
+     */
+    public void run()
+    {
+        OMRSAuditCode auditCode = OMRSAuditCode.OMRS_TOPIC_LISTENER_START;
+        auditLog.logRecord(listenerThreadName,
+                           auditCode.getLogMessageId(),
+                           auditCode.getSeverity(),
+                           auditCode.getFormattedLogMessage(topicName),
+                           this.getConnection().toString(),
+                           auditCode.getSystemAction(),
+                           auditCode.getUserAction());
+
+        while (keepRunning)
+        {
+            try
+            {
+                List<OMRSEventV1> receivedEvents = this.checkForEvents();
+
+                if (receivedEvents != null)
+                {
+                    for (OMRSEventV1  event : receivedEvents)
+                    {
+                        if (event != null)
+                        {
+                            this.distributeEvent(event);
+                        }
+                    }
+                }
+                else
+                {
+                    Thread.sleep(sleepTime);
+                }
+            }
+            catch (InterruptedException   wakeUp)
+            {
+
+            }
+        }
+
+        auditCode = OMRSAuditCode.OMRS_TOPIC_LISTENER_SHUTDOWN;
+        auditLog.logRecord(listenerThreadName,
+                           auditCode.getLogMessageId(),
+                           auditCode.getSeverity(),
+                           auditCode.getFormattedLogMessage(topicName),
+                           this.getConnection().toString(),
+                           auditCode.getSystemAction(),
+                           auditCode.getUserAction());
+    }
+
+
+    /**
      * Pass an event that has been received on the topic to each of the registered listeners.
      *
      * @param event - OMRSEvent to distribute
@@ -76,7 +150,7 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo
             }
             catch (Throwable  error)
             {
-                final String   actionDescription = "Initialize Repository Operational Services";
+                final String   actionDescription = "distributeEvent";
 
                 OMRSAuditCode auditCode = OMRSAuditCode.EVENT_PROCESSING_ERROR;
                 auditLog.logRecord(actionDescription,
@@ -92,6 +166,17 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo
 
 
     /**
+     * Look to see if there is one of more new events to process.
+     *
+     * @return a list of received events or null
+     */
+    protected List<OMRSEventV1> checkForEvents()
+    {
+        return null;
+    }
+
+
+    /**
      * Register a listener object.  This object will be supplied with all of the events received on the topic.
      *
      * @param topicListener - object implementing the OMRSTopicListener interface
@@ -113,6 +198,22 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo
     public void start() throws ConnectorCheckedException
     {
         super.start();
+
+        keepRunning = true;
+
+        if (super.connection != null)
+        {
+            Endpoint endpoint = super.connection.getEndpoint();
+
+            if (endpoint != null)
+            {
+                topicName = endpoint.getAddress();
+                listenerThreadName = defaultThreadName + ": " + topicName;
+            }
+        }
+
+        Thread listenerThread = new Thread(this, listenerThreadName);
+        listenerThread.start();
     }
 
 
@@ -124,5 +225,7 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo
     public  void disconnect() throws ConnectorCheckedException
     {
         super.disconnect();
+
+        keepRunning = false;
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java
----------------------------------------------------------------------
diff --git a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java
new file mode 100644
index 0000000..f969923
--- /dev/null
+++ b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.omrs.topicconnectors.inmemory;
+
+import org.apache.atlas.ocf.ffdc.ConnectorCheckedException;
+import org.apache.atlas.omrs.eventmanagement.events.v1.OMRSEventV1;
+import org.apache.atlas.omrs.topicconnectors.OMRSTopicConnector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * InMemoryOMRSTopicConnector provides a concrete implementation of the OMRSTopicConnector that
+ * uses an in-memory list as the event/messaging infrastructure.
+ */
+public class InMemoryOMRSTopicConnector extends OMRSTopicConnector
+{
+    private volatile List<OMRSEventV1>   inMemoryOMRSTopic = new ArrayList<>();
+
+    public InMemoryOMRSTopicConnector()
+    {
+        super();
+    }
+
+    /**
+     * Supports putting events to the in memory OMRS Topic
+     *
+     * @param newEvent - event to publish
+     */
+    private synchronized void putEvent(OMRSEventV1  newEvent)
+    {
+        inMemoryOMRSTopic.add(newEvent);
+    }
+
+
+    /**
+     * Returns all of the events found on the in memory OMRS Topic.
+     *
+     * @return list of received events.
+     */
+    private synchronized List<OMRSEventV1> getEvents()
+    {
+        List<OMRSEventV1>   receivedEvents = inMemoryOMRSTopic;
+        inMemoryOMRSTopic = new ArrayList<>();
+
+        return receivedEvents;
+    }
+
+
+    /**
+     * Sends the supplied event to the topic.
+     *
+     * @param event - OMRSEvent object containing the event properties.
+     */
+    public void sendEvent(OMRSEventV1 event)
+    {
+        this.putEvent(event);
+
+    }
+
+
+    /**
+     * Look to see if there is one of more new events to process.
+     *
+     * @return a list of received events or null
+     */
+    protected List<OMRSEventV1> checkForEvents()
+    {
+        return this.getEvents();
+    }
+
+
+    /**
+     * Indicates that the connector is completely configured and can begin processing.
+     *
+     * @throws ConnectorCheckedException - there is a problem within the connector.
+     */
+    public void start() throws ConnectorCheckedException
+    {
+        super.start();
+    }
+
+
+    /**
+     * Free up any resources held since the connector is no longer needed.
+     *
+     * @throws ConnectorCheckedException - there is a problem within the connector.
+     */
+    public  void disconnect() throws ConnectorCheckedException
+    {
+        super.disconnect();
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java
----------------------------------------------------------------------
diff --git a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java
new file mode 100644
index 0000000..e2d641a
--- /dev/null
+++ b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.omrs.topicconnectors.inmemory;
+
+import org.apache.atlas.ocf.ConnectorProviderBase;
+import org.apache.atlas.omrs.topicconnectors.kafka.KafkaOMRSTopicConnector;
+
+
+/**
+ * InMemoryOMRSTopicProvider provides implementation of the connector provider for the InMemoryOMRSTopicConnector.
+ */
+public class InMemoryOMRSTopicProvider extends ConnectorProviderBase
+{
+    /**
+     * Constructor used to initialize the ConnectorProviderBase with the Java class name of the specific
+     * OMRS Connector implementation.
+     */
+    public InMemoryOMRSTopicProvider()
+    {
+        Class    connectorClass = InMemoryOMRSTopicConnector.class;
+
+        super.setConnectorClassName(connectorClass.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6d1268..9c9d746 100644
--- a/pom.xml
+++ b/pom.xml
@@ -738,6 +738,9 @@
         <module>om-fwk-ocf</module>
         <module>omrs</module>
         <module>omag-api</module>
+        <module>omag-server</module>
+        <module>omas-connectedasset</module>
+        <module>omas-assetconsumer</module>
         <module>intg</module>
         <module>common</module>
         <module>server-api</module>

Reply via email to