Repository: atlas Updated Branches: refs/heads/master 9a4ca16d7 -> f57fd7f0f
http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java ---------------------------------------------------------------------- diff --git a/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java b/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java index 237a2b0..7ad981e 100644 --- a/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java +++ b/omrs/src/main/java/org/apache/atlas/omrs/rest/server/OMRSRepositoryRESTServices.java @@ -104,8 +104,17 @@ public class OMRSRepositoryRESTServices public static void setLocalRepository(LocalOMRSRepositoryConnector localRepositoryConnector, String localServerURL) { - OMRSRepositoryRESTServices.localRepositoryConnector = localRepositoryConnector; - OMRSRepositoryRESTServices.localMetadataCollection = localRepositoryConnector.getMetadataCollection(); + try + { + OMRSRepositoryRESTServices.localRepositoryConnector = localRepositoryConnector; + OMRSRepositoryRESTServices.localMetadataCollection = localRepositoryConnector.getMetadataCollection(); + } + catch (Throwable error) + { + OMRSRepositoryRESTServices.localRepositoryConnector = null; + OMRSRepositoryRESTServices.localMetadataCollection = null; + } + OMRSRepositoryRESTServices.localServerURL = localServerURL; } @@ -433,7 +442,7 @@ public class OMRSRepositoryRESTServices * RepositoryErrorException - there is a problem communicating with the metadata repository or * UserNotAuthorizedException - the userId is not permitted to perform this operation. 
*/ - @RequestMapping(method = RequestMethod.GET, path = "/{userId}/types/typedefs/by-search-criteria") + @RequestMapping(method = RequestMethod.GET, path = "/{userId}/types/typedefs/by-property-value") public TypeDefListResponse searchForTypeDefs(@PathVariable String userId, @RequestParam String searchCriteria) @@ -1492,11 +1501,11 @@ public class OMRSRepositoryRESTServices /** - * Return a historical versionName of an entity - includes the header, classifications and properties of the entity. + * Return a historical version of an entity - includes the header, classifications and properties of the entity. * * @param userId - unique identifier for requesting user. * @param guid - String unique identifier for the entity. - * @param asOfTime - the time used to determine which versionName of the entity that is desired. + * @param asOfTime - the time used to determine which version of the entity that is desired. * @return EnityDetailResponse: * EntityDetail structure or * InvalidParameterException - the guid or date is null or the asOfTime property is for a future time or @@ -1926,9 +1935,12 @@ public class OMRSRepositoryRESTServices /** - * Return a list of entities matching the search criteria. + * Return a list of entities whose string based property values match the search criteria. The + * search criteria may include regex style wild cards. * * @param userId - unique identifier for requesting user. + * @param entityTypeGUID - GUID of the type of entity to search for. Null means all types will + * be searched (could be slow so not recommended). * @param searchCriteria - String expression of the characteristics of the required relationships. * @param fromEntityElement - the starting element number of the entities to return. 
* This is used when retrieving elements @@ -1955,19 +1967,20 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support satOfTime parameter or * UserNotAuthorizedException - the userId is not permitted to perform this operation. */ - @RequestMapping(method = RequestMethod.GET, path = "/{userId}/instances/entities/by-search-criteria") - - public EntityListResponse searchForEntities(@PathVariable String userId, - @RequestParam String searchCriteria, - @RequestParam(required = false) int fromEntityElement, - @RequestParam(required = false) List<InstanceStatus> limitResultsByStatus, - @RequestParam(required = false) List<String> limitResultsByClassification, - @RequestParam(required = false) Date asOfTime, - @RequestParam(required = false) String sequencingProperty, - @RequestParam(required = false) SequencingOrder sequencingOrder, - @RequestParam(required = false) int pageSize) + @RequestMapping(method = RequestMethod.GET, path = "/{userId}/instances/entities/by-property-value") + + public EntityListResponse fndEntitiesByPropertyValue(@PathVariable String userId, + @RequestParam(required = false) String entityTypeGUID, + @RequestParam String searchCriteria, + @RequestParam(required = false) int fromEntityElement, + @RequestParam(required = false) List<InstanceStatus> limitResultsByStatus, + @RequestParam(required = false) List<String> limitResultsByClassification, + @RequestParam(required = false) Date asOfTime, + @RequestParam(required = false) String sequencingProperty, + @RequestParam(required = false) SequencingOrder sequencingOrder, + @RequestParam(required = false) int pageSize) { - final String methodName = "searchForEntities"; + final String methodName = "findEntitiesByPropertyValue"; EntityListResponse response = new EntityListResponse(); @@ -1975,15 +1988,16 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - List<EntityDetail> entities = 
localMetadataCollection.searchForEntities(userId, - searchCriteria, - fromEntityElement, - limitResultsByStatus, - limitResultsByClassification, - asOfTime, - sequencingProperty, - sequencingOrder, - pageSize); + List<EntityDetail> entities = localMetadataCollection.findEntitiesByPropertyValue(userId, + entityTypeGUID, + searchCriteria, + fromEntityElement, + limitResultsByStatus, + limitResultsByClassification, + asOfTime, + sequencingProperty, + sequencingOrder, + pageSize); response.setEntities(entities); if (entities != null) { @@ -1991,7 +2005,7 @@ public class OMRSRepositoryRESTServices response.setPageSize(pageSize); if (entities.size() == pageSize) { - final String urlTemplate = "{0}/instances/entities/by-search-criteria?searchCriteria={1}?fromEntityElement={2}&limitResultsByStatus={3}&limitResultsByClassification={4}&asOfTime={5}&sequencingProperty={6}&sequencingOrder={7}&pageSize={8}"; + final String urlTemplate = "{0}/instances/entities/by-property-value?entityTypeGUID={1}&searchCriteria={2}&fromEntityElement={3}&limitResultsByStatus={4}&limitResultsByClassification={5}&asOfTime={6}&sequencingProperty={7}&sequencingOrder={8}&pageSize={9}"; response.setNextPageURL(formatNextPageURL(localServerURL + urlTemplate, userId, @@ -2157,7 +2171,7 @@ public class OMRSRepositoryRESTServices * * @param userId - unique identifier for requesting user. * @param guid - String unique identifier for the relationship. - * @param asOfTime - the time used to determine which versionName of the entity that is desired. + * @param asOfTime - the time used to determine which version of the entity that is desired. * @return RelationshipResponse: * a relationship structure or * InvalidParameterException - the guid or date is null or the asOfTime property is for a future time or @@ -2335,6 +2349,7 @@ public class OMRSRepositoryRESTServices * Return a list of relationships that match the search criteria. The results can be paged. 
* * @param userId - unique identifier for requesting user. + * @param relationshipTypeGUID - unique identifier of a relationship type (or null for all types of relationship). * @param searchCriteria - String expression of the characteristics of the required relationships. * @param fromRelationshipElement - Element number of the results to skip to when building the results list * to return. Zero means begin at the start of the results. This is used @@ -2359,18 +2374,19 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support satOfTime parameter or * UserNotAuthorizedException - the userId is not permitted to perform this operation. */ - @RequestMapping(method = RequestMethod.GET, path = "/{userId}/instances/relationships/by-search-criteria") - - public RelationshipListResponse searchForRelationships(@PathVariable String userId, - @RequestParam String searchCriteria, - @RequestParam(required = false) int fromRelationshipElement, - @RequestParam(required = false) List<InstanceStatus> limitResultsByStatus, - @RequestParam(required = false) Date asOfTime, - @RequestParam(required = false) String sequencingProperty, - @RequestParam(required = false) SequencingOrder sequencingOrder, - @RequestParam(required = false) int pageSize) + @RequestMapping(method = RequestMethod.GET, path = "/{userId}/instances/relationships/by-property-value") + + public RelationshipListResponse findRelationshipsByPropertyValue(@PathVariable String userId, + @RequestParam(required = false) String relationshipTypeGUID, + @RequestParam String searchCriteria, + @RequestParam(required = false) int fromRelationshipElement, + @RequestParam(required = false) List<InstanceStatus> limitResultsByStatus, + @RequestParam(required = false) Date asOfTime, + @RequestParam(required = false) String sequencingProperty, + @RequestParam(required = false) SequencingOrder sequencingOrder, + @RequestParam(required = false) int pageSize) { - final String methodName = 
"searchForRelationships"; + final String methodName = "findRelationshipsByPropertyValue"; RelationshipListResponse response = new RelationshipListResponse(); @@ -2378,14 +2394,15 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - List<Relationship> relationships = localMetadataCollection.searchForRelationships(userId, - searchCriteria, - fromRelationshipElement, - limitResultsByStatus, - asOfTime, - sequencingProperty, - sequencingOrder, - pageSize); + List<Relationship> relationships = localMetadataCollection.findRelationshipsByPropertyValue(userId, + relationshipTypeGUID, + searchCriteria, + fromRelationshipElement, + limitResultsByStatus, + asOfTime, + sequencingProperty, + sequencingOrder, + pageSize); response.setRelationships(relationships); if (relationships != null) { @@ -2393,7 +2410,7 @@ public class OMRSRepositoryRESTServices response.setPageSize(pageSize); if (response.getRelationships().size() == pageSize) { - final String urlTemplate = "{0}/instances/relationships/by-search-criteria?searchCriteria={1}?fromRelationshipElement={2}&limitResultsByStatus={3}&asOfTime={4}&sequencingProperty={5}&sequencingOrder={6}&pageSize={7}"; + final String urlTemplate = "{0}/instances/relationships/by-property-value?relationshipTypeGUID={1}&searchCriteria={2}?fromRelationshipElement={3}&limitResultsByStatus={4}&asOfTime={5}&sequencingProperty={6}&sequencingOrder={7}&pageSize={8}"; response.setNextPageURL(formatNextPageURL(localServerURL + urlTemplate, userId, @@ -4314,7 +4331,7 @@ public class OMRSRepositoryRESTServices * Save the entity as a reference copy. The id of the home metadata collection is already set up in the * entity. * - * @param serverName - unique identifier for requesting user. + * @param userId - unique identifier for requesting user. * @param entity - details of the entity to save. 
* @return VoidResponse: * void or @@ -4332,9 +4349,9 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support instance reference copies or * UserNotAuthorizedException - the userId is not permitted to perform this operation. */ - @RequestMapping(method = RequestMethod.PATCH, path = "/{serverName}/instances/entities/reference-copy") + @RequestMapping(method = RequestMethod.PATCH, path = "/{userId}/instances/entities/reference-copy") - public VoidResponse saveEntityReferenceCopy(@PathVariable String serverName, + public VoidResponse saveEntityReferenceCopy(@PathVariable String userId, @RequestParam EntityDetail entity) { final String methodName = "saveEntityReferenceCopy"; @@ -4345,7 +4362,7 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - localMetadataCollection.saveEntityReferenceCopy(serverName, entity); + localMetadataCollection.saveEntityReferenceCopy(userId, entity); } catch (RepositoryErrorException error) { @@ -4393,7 +4410,7 @@ public class OMRSRepositoryRESTServices * remove reference copies from the local cohort, repositories that have left the cohort, * or entities that have come from open metadata archives. * - * @param serverName - unique identifier for requesting server. + * @param userId - unique identifier for requesting server. * @param entityGUID - the unique identifier for the entity. * @param typeDefGUID - the guid of the TypeDef for the relationship - used to verify the relationship identity. * @param typeDefName - the name of the TypeDef for the relationship - used to verify the relationship identity. @@ -4409,9 +4426,9 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support instance reference copies or * UserNotAuthorizedException - the userId is not permitted to perform this operation. 
*/ - @RequestMapping(method = RequestMethod.PATCH, path = "/{serverName}/instances/entities/reference-copy/{entityGUID}/purge") + @RequestMapping(method = RequestMethod.PATCH, path = "/{userId}/instances/entities/reference-copy/{entityGUID}/purge") - public VoidResponse purgeEntityReferenceCopy(@PathVariable String serverName, + public VoidResponse purgeEntityReferenceCopy(@PathVariable String userId, @PathVariable String entityGUID, @RequestParam String typeDefGUID, @RequestParam String typeDefName, @@ -4425,7 +4442,7 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - localMetadataCollection.purgeEntityReferenceCopy(serverName, + localMetadataCollection.purgeEntityReferenceCopy(userId, entityGUID, typeDefGUID, typeDefName, @@ -4463,7 +4480,7 @@ public class OMRSRepositoryRESTServices * The local repository has requested that the repository that hosts the home metadata collection for the * specified entity sends out the details of this entity so the local repository can create a reference copy. * - * @param serverName - unique identifier for requesting server. + * @param userId - unique identifier for requesting server. * @param entityGUID - unique identifier of requested entity. * @param typeDefGUID - unique identifier of requested entity's TypeDef. * @param typeDefName - unique name of requested entity's TypeDef. @@ -4479,9 +4496,9 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support instance reference copies or * UserNotAuthorizedException - the userId is not permitted to perform this operation. 
*/ - @RequestMapping(method = RequestMethod.PATCH, path = "/{serverName}/instances/entities/reference-copy/{entityGUID}/refresh") + @RequestMapping(method = RequestMethod.PATCH, path = "/{userId}/instances/entities/reference-copy/{entityGUID}/refresh") - public VoidResponse refreshEntityReferenceCopy(@PathVariable String serverName, + public VoidResponse refreshEntityReferenceCopy(@PathVariable String userId, @PathVariable String entityGUID, @RequestParam String typeDefGUID, @RequestParam String typeDefName, @@ -4495,7 +4512,7 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - localMetadataCollection.refreshEntityReferenceCopy(serverName, + localMetadataCollection.refreshEntityReferenceCopy(userId, entityGUID, typeDefGUID, typeDefName, @@ -4533,7 +4550,7 @@ public class OMRSRepositoryRESTServices * Save the relationship as a reference copy. The id of the home metadata collection is already set up in the * relationship. * - * @param serverName - unique identifier for requesting serverName. + * @param userId - unique identifier for requesting user. * @param relationship - relationship to save. * @return VoidResponse: * void or @@ -4553,9 +4570,9 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support instance reference copies or * UserNotAuthorizedException - the userId is not permitted to perform this operation. 
*/ - @RequestMapping(method = RequestMethod.PATCH, path = "/{serverName}/instances/relationships/reference-copy") + @RequestMapping(method = RequestMethod.PATCH, path = "/{userId}/instances/relationships/reference-copy") - public VoidResponse saveRelationshipReferenceCopy(@PathVariable String serverName, + public VoidResponse saveRelationshipReferenceCopy(@PathVariable String userId, @RequestParam Relationship relationship) { final String methodName = "saveRelationshipReferenceCopy"; @@ -4566,7 +4583,7 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - localMetadataCollection.saveRelationshipReferenceCopy(serverName, relationship); + localMetadataCollection.saveRelationshipReferenceCopy(userId, relationship); } catch (RepositoryErrorException error) { @@ -4620,7 +4637,7 @@ public class OMRSRepositoryRESTServices * remove reference copies from the local cohort, repositories that have left the cohort, * or relationships that have come from open metadata archives. * - * @param serverName - unique identifier for requesting server. + * @param userId - unique identifier for requesting server. * @param relationshipGUID - the unique identifier for the relationship. * @param typeDefGUID - the guid of the TypeDef for the relationship - used to verify the relationship identity. * @param typeDefName - the name of the TypeDef for the relationship - used to verify the relationship identity. @@ -4636,9 +4653,9 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support instance reference copies or * UserNotAuthorizedException - the userId is not permitted to perform this operation. 
*/ - @RequestMapping(method = RequestMethod.PATCH, path = "/{serverName}/instances/relationships/reference-copy/{relationshipGUID}/purge") + @RequestMapping(method = RequestMethod.PATCH, path = "/{userId}/instances/relationships/reference-copy/{relationshipGUID}/purge") - public VoidResponse purgeRelationshipReferenceCopy(@PathVariable String serverName, + public VoidResponse purgeRelationshipReferenceCopy(@PathVariable String userId, @PathVariable String relationshipGUID, @RequestParam String typeDefGUID, @RequestParam String typeDefName, @@ -4652,7 +4669,7 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - localMetadataCollection.purgeRelationshipReferenceCopy(serverName, + localMetadataCollection.purgeRelationshipReferenceCopy(userId, relationshipGUID, typeDefGUID, typeDefName, @@ -4691,7 +4708,7 @@ public class OMRSRepositoryRESTServices * specified relationship sends out the details of this relationship so the local repository can create a * reference copy. * - * @param serverName - unique identifier for requesting server. + * @param userId - unique identifier for requesting server. * @param relationshipGUID - unique identifier of the relationship. * @param typeDefGUID - the guid of the TypeDef for the relationship - used to verify the relationship identity. * @param typeDefName - the name of the TypeDef for the relationship - used to verify the relationship identity. @@ -4707,9 +4724,9 @@ public class OMRSRepositoryRESTServices * FunctionNotSupportedException - the repository does not support instance reference copies or * UserNotAuthorizedException - the userId is not permitted to perform this operation. 
*/ - @RequestMapping(method = RequestMethod.PATCH, path = "/{serverName}/instances/relationships/reference-copy/{relationshipGUID}/refresh") + @RequestMapping(method = RequestMethod.PATCH, path = "/{userId}/instances/relationships/reference-copy/{relationshipGUID}/refresh") - public VoidResponse refreshRelationshipReferenceCopy(@PathVariable String serverName, + public VoidResponse refreshRelationshipReferenceCopy(@PathVariable String userId, @PathVariable String relationshipGUID, @RequestParam String typeDefGUID, @RequestParam String typeDefName, @@ -4723,7 +4740,7 @@ public class OMRSRepositoryRESTServices { validateLocalRepository(methodName); - localMetadataCollection.refreshRelationshipReferenceCopy(serverName, + localMetadataCollection.refreshRelationshipReferenceCopy(userId, relationshipGUID, typeDefGUID, typeDefName, @@ -5148,8 +5165,11 @@ public class OMRSRepositoryRESTServices * * @param response - REST Response * @param error returned response. + * @param exceptionClassName - class name of the exception to recreate */ - private void captureCheckedException(OMRSRESTAPIResponse response, OMRSCheckedExceptionBase error, String exceptionClassName) + private void captureCheckedException(OMRSRESTAPIResponse response, + OMRSCheckedExceptionBase error, + String exceptionClassName) { response.setRelatedHTTPCode(error.getReportedHTTPCode()); response.setExceptionClassName(exceptionClassName); http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java ---------------------------------------------------------------------- diff --git a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java index 0fec97c..e58d97d 100644 --- a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java +++ b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopic.java @@ -21,7 +21,7 @@ package 
org.apache.atlas.omrs.topicconnectors; import org.apache.atlas.omrs.eventmanagement.events.v1.OMRSEventV1; /** - * OMRSTopic defines the interface to the messaging Topic for OMRS Events. It implemented by the OMTSTopicConnector. + * OMRSTopic defines the interface to the messaging Topic for OMRS Events. It is implemented by the OMRSTopicConnector. */ public interface OMRSTopic { http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java ---------------------------------------------------------------------- diff --git a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java index 8498620..f0572ea 100644 --- a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java +++ b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/OMRSTopicConnector.java @@ -19,6 +19,8 @@ package org.apache.atlas.omrs.topicconnectors; import org.apache.atlas.ocf.ConnectorBase; import org.apache.atlas.ocf.ffdc.ConnectorCheckedException; +import org.apache.atlas.ocf.properties.Connection; +import org.apache.atlas.ocf.properties.Endpoint; import org.apache.atlas.omrs.auditlog.OMRSAuditCode; import org.apache.atlas.omrs.auditlog.OMRSAuditLog; import org.apache.atlas.omrs.auditlog.OMRSAuditingComponent; @@ -27,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.List; /** @@ -45,13 +48,22 @@ import java.util.ArrayList; * </li> * </ul> */ -public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTopic +public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTopic, Runnable { ArrayList<OMRSTopicListener> topicListeners = new ArrayList<>(); private static final Logger log = LoggerFactory.getLogger(OMRSTopicConnector.class); private static final OMRSAuditLog auditLog = new
OMRSAuditLog(OMRSAuditingComponent.OMRS_TOPIC_CONNECTOR); + private static final String defaultThreadName = "OMRSTopicListener"; + private static final String defaultTopicName = "OMRSTopic"; + + private volatile boolean keepRunning = false; + + private String listenerThreadName = defaultThreadName; + private String topicName = defaultTopicName; + private int sleepTime = 100; + /** * Simple constructor */ @@ -62,6 +74,68 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo /** + * Return the name of the topic for this connector. + * + * @return String topic name. + */ + public String getTopicName() + { + return topicName; + } + + /** + * This is the method called by the listener thread when it starts. + */ + public void run() + { + OMRSAuditCode auditCode = OMRSAuditCode.OMRS_TOPIC_LISTENER_START; + auditLog.logRecord(listenerThreadName, + auditCode.getLogMessageId(), + auditCode.getSeverity(), + auditCode.getFormattedLogMessage(topicName), + this.getConnection().toString(), + auditCode.getSystemAction(), + auditCode.getUserAction()); + + while (keepRunning) + { + try + { + List<OMRSEventV1> receivedEvents = this.checkForEvents(); + + if (receivedEvents != null) + { + for (OMRSEventV1 event : receivedEvents) + { + if (event != null) + { + this.distributeEvent(event); + } + } + } + else + { + Thread.sleep(sleepTime); + } + } + catch (InterruptedException wakeUp) + { + + } + } + + auditCode = OMRSAuditCode.OMRS_TOPIC_LISTENER_SHUTDOWN; + auditLog.logRecord(listenerThreadName, + auditCode.getLogMessageId(), + auditCode.getSeverity(), + auditCode.getFormattedLogMessage(topicName), + this.getConnection().toString(), + auditCode.getSystemAction(), + auditCode.getUserAction()); + } + + + /** * Pass an event that has been received on the topic to each of the registered listeners. 
* * @param event - OMRSEvent to distribute @@ -76,7 +150,7 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo } catch (Throwable error) { - final String actionDescription = "Initialize Repository Operational Services"; + final String actionDescription = "distributeEvent"; OMRSAuditCode auditCode = OMRSAuditCode.EVENT_PROCESSING_ERROR; auditLog.logRecord(actionDescription, @@ -92,6 +166,17 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo /** + * Look to see if there is one or more new events to process. + * + * @return a list of received events or null + */ + protected List<OMRSEventV1> checkForEvents() + { + return null; + } + + + /** * Register a listener object. This object will be supplied with all of the events received on the topic. * * @param topicListener - object implementing the OMRSTopicListener interface @@ -113,6 +198,22 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo public void start() throws ConnectorCheckedException { super.start(); + + keepRunning = true; + + if (super.connection != null) + { + Endpoint endpoint = super.connection.getEndpoint(); + + if (endpoint != null) + { + topicName = endpoint.getAddress(); + listenerThreadName = defaultThreadName + ": " + topicName; + } + } + + Thread listenerThread = new Thread(this, listenerThreadName); + listenerThread.start(); } @@ -124,5 +225,7 @@ public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTo public void disconnect() throws ConnectorCheckedException { super.disconnect(); + + keepRunning = false; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java ---------------------------------------------------------------------- diff --git a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java
b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java new file mode 100644 index 0000000..f969923 --- /dev/null +++ b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicConnector.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.omrs.topicconnectors.inmemory; + +import org.apache.atlas.ocf.ffdc.ConnectorCheckedException; +import org.apache.atlas.omrs.eventmanagement.events.v1.OMRSEventV1; +import org.apache.atlas.omrs.topicconnectors.OMRSTopicConnector; + +import java.util.ArrayList; +import java.util.List; + + +/** + * InMemoryOMRSTopicConnector provides a concrete implementation of the OMRSTopicConnector that + * uses an in-memory list as the event/messaging infrastructure. 
+ */ +public class InMemoryOMRSTopicConnector extends OMRSTopicConnector +{ + private volatile List<OMRSEventV1> inMemoryOMRSTopic = new ArrayList<>(); + + public InMemoryOMRSTopicConnector() + { + super(); + } + + /** + * Supports putting events to the in memory OMRS Topic + * + * @param newEvent - event to publish + */ + private synchronized void putEvent(OMRSEventV1 newEvent) + { + inMemoryOMRSTopic.add(newEvent); + } + + + /** + * Returns all of the events found on the in memory OMRS Topic. + * + * @return list of received events. + */ + private synchronized List<OMRSEventV1> getEvents() + { + List<OMRSEventV1> receivedEvents = inMemoryOMRSTopic; + inMemoryOMRSTopic = new ArrayList<>(); + + return receivedEvents; + } + + + /** + * Sends the supplied event to the topic. + * + * @param event - OMRSEvent object containing the event properties. + */ + public void sendEvent(OMRSEventV1 event) + { + this.putEvent(event); + + } + + + /** + * Look to see if there is one or more new events to process. + * + * @return a list of received events (possibly empty, never null) + */ + protected List<OMRSEventV1> checkForEvents() + { + return this.getEvents(); + } + + + /** + * Indicates that the connector is completely configured and can begin processing. + * + * @throws ConnectorCheckedException - there is a problem within the connector. + */ + public void start() throws ConnectorCheckedException + { + super.start(); + } + + + /** + * Free up any resources held since the connector is no longer needed. + * + * @throws ConnectorCheckedException - there is a problem within the connector. 
+ */ + public void disconnect() throws ConnectorCheckedException + { + super.disconnect(); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java ---------------------------------------------------------------------- diff --git a/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java new file mode 100644 index 0000000..e2d641a --- /dev/null +++ b/omrs/src/main/java/org/apache/atlas/omrs/topicconnectors/inmemory/InMemoryOMRSTopicProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.omrs.topicconnectors.inmemory; + +import org.apache.atlas.ocf.ConnectorProviderBase; +import org.apache.atlas.omrs.topicconnectors.kafka.KafkaOMRSTopicConnector; + + +/** + * InMemoryOMRSTopicProvider provides implementation of the connector provider for the InMemoryOMRSTopicConnector. 
+ */ +public class InMemoryOMRSTopicProvider extends ConnectorProviderBase +{ + /** + * Constructor used to initialize the ConnectorProviderBase with the Java class name of the specific + * OMRS Connector implementation. + */ + public InMemoryOMRSTopicProvider() + { + Class connectorClass = InMemoryOMRSTopicConnector.class; + + super.setConnectorClassName(connectorClass.getName()); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/f57fd7f0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a6d1268..9c9d746 100644 --- a/pom.xml +++ b/pom.xml @@ -738,6 +738,9 @@ <module>om-fwk-ocf</module> <module>omrs</module> <module>omag-api</module> + <module>omag-server</module> + <module>omas-connectedasset</module> + <module>omas-assetconsumer</module> <module>intg</module> <module>common</module> <module>server-api</module>
