http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index f241681..891d7ac 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -19,34 +19,50 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.Inject; import com.google.inject.Singleton; import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.service.Service; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.web.filters.AuditFilter; +import org.apache.atlas.web.service.ServiceState; +import org.apache.atlas.web.util.DateTimeHelper; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.atlas.notification.hook.HookNotification.*; + /** * Consumer of notifications from hooks e.g., hive hook etc. */ @Singleton public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); + private static final String LOCALHOST = "localhost"; private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); @@ -57,7 +73,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; - private final LocalAtlasClient atlasClient; + private final AtlasEntityStore atlasEntityStore; + private final ServiceState serviceState; + private final AtlasInstanceConverter instanceConverter; + private final AtlasTypeRegistry typeRegistry; private final int maxRetries; private final int failedMsgCacheSize; private final int consumerRetryInterval; @@ -68,10 +87,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private List<HookConsumer> consumers; @Inject - public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) - throws AtlasException { + public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, + ServiceState serviceState, AtlasInstanceConverter instanceConverter, + AtlasTypeRegistry typeRegistry) throws AtlasException { this.notificationInterface = notificationInterface; - this.atlasClient = atlasClient; + this.atlasEntityStore = atlasEntityStore; + this.serviceState = serviceState; + this.instanceConverter = instanceConverter; + this.typeRegistry = typeRegistry; + this.applicationProperties = ApplicationProperties.get(); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); @@ -208,48 +232,78 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } @VisibleForTesting - void handleMessage(HookNotification.HookNotificationMessage message) throws - AtlasServiceException, AtlasException { + void handleMessage(HookNotificationMessage message) throws AtlasServiceException, AtlasException { + String messageUser = message.getUser(); + // Used for intermediate conversions during create and update + AtlasEntity.AtlasEntitiesWithExtInfo entities; for (int numRetries = 0; numRetries < maxRetries; numRetries++) { - LOG.debug("Running attempt {}", numRetries); + if (LOG.isDebugEnabled()) { + LOG.debug("Running attempt {}", numRetries); + } try { - atlasClient.setUser(message.getUser()); switch (message.getType()) { - case ENTITY_CREATE: - HookNotification.EntityCreateRequest createRequest = - (HookNotification.EntityCreateRequest) message; - atlasClient.createEntity(createRequest.getEntities()); - break; - - case ENTITY_PARTIAL_UPDATE: - HookNotification.EntityPartialUpdateRequest partialUpdateRequest = - (HookNotification.EntityPartialUpdateRequest) message; - atlasClient.updateEntity(partialUpdateRequest.getTypeName(), - partialUpdateRequest.getAttribute(), - partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity()); - break; - - case ENTITY_DELETE: - HookNotification.EntityDeleteRequest deleteRequest = - (HookNotification.EntityDeleteRequest) message; - atlasClient.deleteEntity(deleteRequest.getTypeName(), - deleteRequest.getAttribute(), - deleteRequest.getAttributeValue()); - break; - - case ENTITY_FULL_UPDATE: - HookNotification.EntityUpdateRequest updateRequest = - (HookNotification.EntityUpdateRequest) message; - atlasClient.updateEntities(updateRequest.getEntities()); - break; - - default: - throw new IllegalStateException("Unhandled exception!"); + case ENTITY_CREATE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityCreate via hook"); + } + EntityCreateRequest createRequest = (EntityCreateRequest) message; + audit(messageUser, AtlasClient.API.CREATE_ENTITY); + + entities = instanceConverter.getEntities(createRequest.getEntities()); + + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); + break; + + case ENTITY_PARTIAL_UPDATE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityPartialUpdate via hook"); + } + final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message; + audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL); + + Referenceable referenceable = partialUpdateRequest.getEntity(); + entities = instanceConverter.getEntities(Collections.singletonList(referenceable)); + // There should only be one root entity after the conversion + AtlasEntity entity = entities.getEntities().get(0); + // Need to set the attributes explicitly here as the qualified name might have changed during update + entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true); + break; + + case ENTITY_DELETE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityDelete via hook"); + } + final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message; + audit(messageUser, AtlasClient.API.DELETE_ENTITY); + + try { + AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); + atlasEntityStore.deleteByUniqueAttributes(type, + new HashMap<String, Object>() {{ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }}); + } catch (ClassCastException cle) { + LOG.error("Failed to do a partial update on Entity"); + } + break; + + case ENTITY_FULL_UPDATE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityFullUpdate via hook"); + } + EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; + audit(messageUser, AtlasClient.API.UPDATE_ENTITY); + + entities = instanceConverter.getEntities(updateRequest.getEntities()); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); + break; + + default: + throw new IllegalStateException("Unhandled exception!"); } break; } catch (Throwable e) { - LOG.warn("Error handling message{}", e.getMessage()); + LOG.warn("Error handling message: {}", e.getMessage()); try{ LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); Thread.sleep(consumerRetryInterval); @@ -272,7 +326,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void recordFailedMessages() { //logging failed messages - for (HookNotification.HookNotificationMessage message : failedMessages) { + for (HookNotificationMessage message : failedMessages) { FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); } failedMessages.clear(); @@ -285,7 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl boolean serverAvailable(Timer timer) { try { - while (!atlasClient.isServerReady()) { + while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) { try { LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS); @@ -311,4 +365,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumer.close(); } } + + private void audit(String messageUser, AtlasClient.API api) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> audit({},{})", messageUser, api); + } + + AuditFilter.audit(messageUser, THREADNAME_PREFIX, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, + DateTimeHelper.formatDateUTC(new Date())); + } }
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasAbstractFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasAbstractFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasAbstractFormatConverter.java deleted file mode 100644 index f1f3d18..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasAbstractFormatConverter.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - - -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.type.AtlasTypeRegistry; - - -public abstract class AtlasAbstractFormatConverter implements AtlasFormatConverter { - protected final AtlasFormatConverters converterRegistry; - protected final AtlasTypeRegistry typeRegistry; - protected final TypeCategory typeCategory; - - protected AtlasAbstractFormatConverter(AtlasFormatConverters converterRegistry, AtlasTypeRegistry typeRegistry, TypeCategory typeCategory) { - this.converterRegistry = converterRegistry; - this.typeRegistry = typeRegistry; - this.typeCategory = typeCategory; - } - - @Override - public TypeCategory getTypeCategory() { - return typeCategory; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasArrayFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasArrayFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasArrayFormatConverter.java deleted file mode 100644 index aa14aff..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasArrayFormatConverter.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.type.AtlasArrayType; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -public class AtlasArrayFormatConverter extends AtlasAbstractFormatConverter { - - public AtlasArrayFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - super(registry, typeRegistry, TypeCategory.ARRAY); - } - - @Override - public Collection fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - Collection ret = null; - - if (v1Obj != null) { - if (v1Obj instanceof List) { - ret = new ArrayList(); - } else if (v1Obj instanceof Set) { - ret = new LinkedHashSet(); - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "List or Set", - v1Obj.getClass().getCanonicalName()); - } - - AtlasArrayType arrType = (AtlasArrayType) type; - AtlasType elemType = arrType.getElementType(); - AtlasFormatConverter elemConverter = converterRegistry.getConverter(elemType.getTypeCategory()); - Collection v1List = (Collection) v1Obj; - - for (Object v1Elem : v1List) { - Object convertedVal = elemConverter.fromV1ToV2(v1Elem, elemType, ctx); - - ret.add(convertedVal); - } - } - - return ret; - } - - @Override - public Collection fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - Collection ret = null; - - if (v2Obj != null) { - if (v2Obj instanceof List) { - ret = new ArrayList(); - } else if (v2Obj instanceof Set) { - ret = new LinkedHashSet(); - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "List or Set", - v2Obj.getClass().getCanonicalName()); - } - - AtlasArrayType arrType = (AtlasArrayType) type; - AtlasType elemType = arrType.getElementType(); - AtlasFormatConverter elemConverter = converterRegistry.getConverter(elemType.getTypeCategory()); - Collection v2List = (Collection) v2Obj; - - for (Object v2Elem : v2List) { - Object convertedVal = elemConverter.fromV2ToV1(v2Elem, elemType, ctx); - - ret.add(convertedVal); - } - } - - return ret; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasClassificationFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasClassificationFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasClassificationFormatConverter.java deleted file mode 100644 index dc740f5..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasClassificationFormatConverter.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.instance.AtlasClassification; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.IStruct; -import org.apache.commons.collections.MapUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class AtlasClassificationFormatConverter extends AtlasStructFormatConverter { - private static final Logger LOG = LoggerFactory.getLogger(AtlasClassificationFormatConverter.class); - - public AtlasClassificationFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - super(registry, typeRegistry, TypeCategory.CLASSIFICATION); - } - - @Override - public AtlasClassification fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - AtlasClassification ret = null; - - if (v1Obj != null) { - AtlasClassificationType classificationType = (AtlasClassificationType)type; - - if (v1Obj instanceof Map) { - final Map v1Map = (Map) v1Obj; - final Map v1Attribs = (Map) v1Map.get(ATTRIBUTES_PROPERTY_KEY); - - if (MapUtils.isNotEmpty(v1Attribs)) { - ret = new AtlasClassification(type.getTypeName(), fromV1ToV2(classificationType, v1Attribs, ctx)); - } else { - ret = new AtlasClassification(type.getTypeName()); - } - } else if (v1Obj instanceof IStruct) { - IStruct struct = (IStruct) v1Obj; - Map<String, Object> v1Attribs = null; - - try { - v1Attribs = struct.getValuesMap(); - } catch (AtlasException excp) { - LOG.error("IStruct.getValuesMap() failed", excp); - } - - ret = new AtlasClassification(type.getTypeName(), fromV1ToV2(classificationType, v1Attribs, ctx)); - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "Map or IStruct", - v1Obj.getClass().getCanonicalName()); - } - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java deleted file mode 100644 index cb1390d..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.instance.AtlasClassification; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.Status; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.typesystem.persistence.Id.EntityState; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityFormatConverter.class); - - public AtlasEntityFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - super(registry, typeRegistry, TypeCategory.ENTITY); - } - - @Override - public Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException { - AtlasObjectId ret = null; - - if (v1Obj != null) { - AtlasEntityType entityType = (AtlasEntityType) type; - - if (v1Obj instanceof Id) { - Id id = (Id) v1Obj; - - ret = new AtlasObjectId(id._getId(), id.getTypeName()); - } else if (v1Obj instanceof IReferenceableInstance) { - IReferenceableInstance entRef = (IReferenceableInstance) v1Obj; - - ret = new AtlasObjectId(entRef.getId()._getId(), entRef.getTypeName()); - - if (!context.entityExists(ret.getGuid())) { - Map<String, Object> v1Attribs = null; - - try { - v1Attribs = entRef.getValuesMap(); - } catch (AtlasException excp) { - LOG.error("IReferenceableInstance.getValuesMap() failed", excp); - } - - AtlasEntity entity = new AtlasEntity(entRef.getTypeName(), - super.fromV1ToV2(entityType, v1Attribs, context)); - entity.setGuid(entRef.getId()._getId()); - entity.setStatus(convertState(entRef.getId().getState())); - entity.setCreatedBy(entRef.getSystemAttributes().createdBy); - entity.setCreateTime(entRef.getSystemAttributes().createdTime); - entity.setUpdatedBy(entRef.getSystemAttributes().modifiedBy); - entity.setUpdateTime(entRef.getSystemAttributes().modifiedTime); - entity.setVersion(new Long(entRef.getId().version)); - - if (CollectionUtils.isNotEmpty(entRef.getTraits())) { - List<AtlasClassification> classifications = new ArrayList<>(); - AtlasFormatConverter traitConverter = converterRegistry.getConverter(TypeCategory.CLASSIFICATION); - - for (String traitName : entRef.getTraits()) { - IStruct trait = entRef.getTrait(traitName); - AtlasType classifiType = typeRegistry.getType(traitName); - AtlasClassification classification = (AtlasClassification) traitConverter.fromV1ToV2(trait, classifiType, context); - - classifications.add(classification); - } - - entity.setClassifications(classifications); - } - - context.addEntity(entity); - } - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "IReferenceableInstance", - v1Obj.getClass().getCanonicalName()); - } - } - return ret; - } - - private AtlasEntity.Status convertState(EntityState state){ - Status status = Status.ACTIVE; - if(state != null && state.equals(EntityState.DELETED)){ - status = Status.DELETED; - } - LOG.debug("Setting state to {}", state); - return status; - } - - @Override - public Object fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext context) throws AtlasBaseException { - Object ret = null; - - if (v2Obj != null) { - AtlasEntityType entityType = (AtlasEntityType) type; - - if (v2Obj instanceof Map) { - Map v2Map = (Map) v2Obj; - String idStr = (String)v2Map.get(AtlasObjectId.KEY_GUID); - String typeName = type.getTypeName(); - - if (StringUtils.isEmpty(idStr)) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND); - } - - final Map v2Attribs = (Map) v2Map.get(ATTRIBUTES_PROPERTY_KEY); - - if (MapUtils.isEmpty(v2Attribs)) { - ret = new Id(idStr, 0, typeName); - } else { - ret = new Referenceable(idStr, typeName, super.fromV2ToV1(entityType, v2Attribs, context)); - } - } else if (v2Obj instanceof AtlasEntity) { - AtlasEntity entity = (AtlasEntity) v2Obj; - - ret = new Referenceable(entity.getGuid(), entity.getTypeName(), - fromV2ToV1(entityType, entity.getAttributes(), context)); - - } else if (v2Obj instanceof AtlasObjectId) { // transient-id - AtlasEntity entity = context.getById(((AtlasObjectId) v2Obj).getGuid()); - if ( entity == null) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Could not find entity ", - v2Obj.toString()); - } - ret = this.fromV2ToV1(entity, typeRegistry.getType(((AtlasObjectId) v2Obj).getTypeName()), context); - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "Map or AtlasEntity or String", - v2Obj.getClass().getCanonicalName()); - } - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEnumFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEnumFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEnumFormatConverter.java deleted file mode 100644 index 6d8e3ae..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEnumFormatConverter.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; - - -public class AtlasEnumFormatConverter extends AtlasAbstractFormatConverter { - public AtlasEnumFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - super(registry, typeRegistry, TypeCategory.ENUM); - } - - @Override - public Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - return type.getNormalizedValue(v1Obj); - } - - @Override - public Object fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - return type.getNormalizedValue(v2Obj); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverter.java deleted file mode 100644 index a7157a3..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverter.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.type.AtlasType; - -import java.util.HashMap; -import java.util.Map; - -public interface AtlasFormatConverter { - Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException; - - Object fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext context) throws AtlasBaseException; - - TypeCategory getTypeCategory(); - - public static class ConverterContext { - - private Map<String, AtlasEntity> entities = null; - - public void addEntity(AtlasEntity entity) { - if (entities == null) { - entities = new HashMap<>(); - } - entities.put(entity.getGuid(), entity); - } - - public AtlasEntity getById(String guid) { - if( entities != null) { - return entities.get(guid); - } - - return null; - } - - public boolean entityExists(String guid) { return entities != null && entities.containsKey(guid); } - - public Map<String, AtlasEntity> getEntities() { - return entities; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverters.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverters.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverters.java deleted file mode 100644 index 968d74f..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverters.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - -import com.google.inject.Inject; -import com.google.inject.Singleton; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.type.AtlasTypeRegistry; - -import java.util.HashMap; -import java.util.Map; - -@Singleton -public class AtlasFormatConverters { - - private final Map<TypeCategory, AtlasFormatConverter> registry = new HashMap<>(); - - @Inject - public AtlasFormatConverters(AtlasTypeRegistry typeRegistry) { - registerConverter(new AtlasPrimitiveFormatConverter(this, typeRegistry)); - registerConverter(new AtlasEnumFormatConverter(this, typeRegistry)); - registerConverter(new AtlasStructFormatConverter(this, typeRegistry)); - registerConverter(new AtlasClassificationFormatConverter(this, typeRegistry)); - registerConverter(new AtlasEntityFormatConverter(this, typeRegistry)); - registerConverter(new AtlasArrayFormatConverter(this, typeRegistry)); - registerConverter(new AtlasMapFormatConverter(this, typeRegistry)); - } - - private void registerConverter(AtlasFormatConverter converter) { - registry.put(converter.getTypeCategory(), converter); - - if (converter.getTypeCategory() == TypeCategory.ENTITY) { - registry.put(TypeCategory.OBJECT_ID_TYPE, converter); - } - } - - public AtlasFormatConverter getConverter(TypeCategory typeCategory) throws AtlasBaseException { - AtlasFormatConverter ret = registry.get(typeCategory); - - if (ret == null) { - throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, - "Could not find the converter for this type " + typeCategory); - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java deleted file mode 100644 index b1dae56..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - -import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; -import org.apache.atlas.CreateUpdateEntitiesResult; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.instance.AtlasClassification; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.model.instance.EntityMutations; -import org.apache.atlas.model.instance.GuidMapping; -import org.apache.atlas.services.MetadataService; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.ITypedReferenceableInstance; -import org.apache.atlas.typesystem.ITypedStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.typesystem.exception.TraitNotFoundException; -import org.apache.atlas.typesystem.exception.TypeNotFoundException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Iterator; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import java.util.Map; - -@Singleton -public class AtlasInstanceRestAdapters { - - private static final Logger LOG = LoggerFactory.getLogger(AtlasInstanceRestAdapters.class); - - @Inject - private AtlasTypeRegistry typeRegistry; - - @Inject - private AtlasFormatConverters instanceFormatters; - - @Inject - private MetadataService metadataService; - - public ITypedReferenceableInstance[] getITypedReferenceables(Collection<AtlasEntity> entities) throws AtlasBaseException { - ITypedReferenceableInstance[] entitiesInOldFormat = new ITypedReferenceableInstance[entities.size()]; - - AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext(); - for(Iterator<AtlasEntity> i = entities.iterator(); i.hasNext(); ) { - ctx.addEntity(i.next()); - } - - Iterator<AtlasEntity> entityIterator = entities.iterator(); - for (int i = 0; i < entities.size(); i++) { - ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next(), ctx); - entitiesInOldFormat[i] = typedInstance; - } - return entitiesInOldFormat; - } - - public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity, AtlasFormatConverter.ConverterContext ctx) throws AtlasBaseException { - Referenceable ref = getReferenceable(entity, ctx); - - try { - return metadataService.getTypedReferenceableInstance(ref); - } catch (AtlasException e) { - LOG.error("Exception while getting a typed reference for the entity ", e); - throw toAtlasBaseException(e); - } - } - - public Referenceable getReferenceable(AtlasEntity entity, final AtlasFormatConverter.ConverterContext ctx) throws AtlasBaseException { - AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.ENTITY); - AtlasType entityType = typeRegistry.getType(entity.getTypeName()); - Referenceable ref = (Referenceable)converter.fromV2ToV1(entity, entityType, ctx); - - return ref; - } - - public ITypedStruct getTrait(AtlasClassification classification) throws AtlasBaseException { - AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.CLASSIFICATION); - AtlasType classificationType = typeRegistry.getType(classification.getTypeName()); - Struct trait = (Struct)converter.fromV2ToV1(classification, classificationType, new AtlasFormatConverter.ConverterContext()); - - try { - return metadataService.createTraitInstance(trait); - } catch (AtlasException e) { - LOG.error("Exception while getting a typed reference for the entity ", e); - throw toAtlasBaseException(e); - } - } - - public AtlasClassification getClassification(IStruct classification) throws AtlasBaseException { - AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.CLASSIFICATION); - AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName()); - if (classificationType == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.CLASSIFICATION.name(), classification.getTypeName()); - } - AtlasClassification ret = (AtlasClassification)converter.fromV1ToV2(classification, classificationType, new AtlasFormatConverter.ConverterContext()); - - return ret; - } - - public Map<String, AtlasEntity> getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException { - - AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.ENTITY); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName()); - - if (entityType == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), referenceable.getTypeName()); - } - - AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext(); - - converter.fromV1ToV2(referenceable, entityType, ctx); - - return ctx.getEntities(); - } - - public static EntityMutationResponse toEntityMutationResponse(AtlasClient.EntityResult entityResult) { - - CreateUpdateEntitiesResult result = new CreateUpdateEntitiesResult(); - result.setEntityResult(entityResult); - return toEntityMutationResponse(result); - } - - public static EntityMutationResponse toEntityMutationResponse(CreateUpdateEntitiesResult result) { - EntityMutationResponse response = new EntityMutationResponse(); - for (String guid : result.getCreatedEntities()) { - AtlasEntityHeader header = new AtlasEntityHeader(); - header.setGuid(guid); - response.addEntity(EntityMutations.EntityOperation.CREATE, header); - } - - for (String guid : result.getUpdatedEntities()) { - AtlasEntityHeader header = new AtlasEntityHeader(); - header.setGuid(guid); - response.addEntity(EntityMutations.EntityOperation.UPDATE, header); - } - - for (String guid : result.getDeletedEntities()) { - AtlasEntityHeader header = new AtlasEntityHeader(); - header.setGuid(guid); - response.addEntity(EntityMutations.EntityOperation.DELETE, header); - } - GuidMapping guidMapping = result.getGuidMapping(); - if(guidMapping != null) { - response.setGuidAssignments(guidMapping.getGuidAssignments()); - } - return response; - } - - public static AtlasBaseException toAtlasBaseException(AtlasException e) { - if ( e instanceof EntityNotFoundException || e instanceof TraitNotFoundException) { - return new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, e); - } - - if ( e instanceof TypeNotFoundException) { - return new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, e); - } - - return new AtlasBaseException(e); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasMapFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasMapFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasMapFormatConverter.java deleted file mode 100644 index 6967c4f..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasMapFormatConverter.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.type.AtlasMapType; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; - -import java.util.HashMap; -import java.util.Map; - -public class AtlasMapFormatConverter extends AtlasAbstractFormatConverter { - - public AtlasMapFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - super(registry, typeRegistry, TypeCategory.MAP); - } - - @Override - public Map fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - Map ret = null; - - if (v1Obj != null) { - if (v1Obj instanceof Map) { - AtlasMapType mapType = (AtlasMapType)type; - AtlasType keyType = mapType.getKeyType(); - AtlasType valueType = mapType.getValueType(); - AtlasFormatConverter keyConverter = converterRegistry.getConverter(keyType.getTypeCategory()); - AtlasFormatConverter valueConverter = converterRegistry.getConverter(valueType.getTypeCategory()); - Map v1Map = (Map)v1Obj; - - ret = new HashMap<>(); - - for (Object key : v1Map.keySet()) { - Object value = v1Map.get(key); - - Object v2Key = keyConverter.fromV1ToV2(key, keyType, ctx); - Object v2Value = valueConverter.fromV1ToV2(value, valueType, ctx); - - ret.put(v2Key, v2Value); - } - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "Map", v1Obj.getClass().getCanonicalName()); - } - - } - - return ret; - } - - @Override - public Map fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - Map ret = null; - - if (v2Obj != null) { - if (v2Obj instanceof Map) { - AtlasMapType mapType = (AtlasMapType)type; - AtlasType keyType = mapType.getKeyType(); - AtlasType valueType = mapType.getValueType(); - AtlasFormatConverter keyConverter = converterRegistry.getConverter(keyType.getTypeCategory()); - AtlasFormatConverter valueConverter = converterRegistry.getConverter(valueType.getTypeCategory()); - Map v1Map = (Map)v2Obj; - - ret = new HashMap<>(); - - for (Object key : v1Map.keySet()) { - Object value = v1Map.get(key); - - Object v2Key = keyConverter.fromV2ToV1(key, keyType, ctx); - Object v2Value = valueConverter.fromV2ToV1(value, valueType, ctx); - - ret.put(v2Key, v2Value); - } - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "Map", v2Obj.getClass().getCanonicalName()); - } - } - - return ret; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasObjectIdConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasObjectIdConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasObjectIdConverter.java deleted file mode 100644 index 11a020d..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasObjectIdConverter.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; - -import java.util.Map; - -public class -AtlasObjectIdConverter extends AtlasAbstractFormatConverter { - - public AtlasObjectIdConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - this(registry, typeRegistry, TypeCategory.OBJECT_ID_TYPE); - } - - protected AtlasObjectIdConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry, TypeCategory typeCategory) { - super(registry, typeRegistry, typeCategory); - } - - @Override - public Object fromV1ToV2(Object v1Obj, AtlasType type, AtlasFormatConverter.ConverterContext converterContext) throws AtlasBaseException { - Object ret = null; - - if (v1Obj != null) { - if (v1Obj instanceof Id) { - Id id = (Id) v1Obj; - ret = new AtlasObjectId(id._getId(), id.getTypeName()); - } else if (v1Obj instanceof IReferenceableInstance) { - IReferenceableInstance entity = (IReferenceableInstance) v1Obj; - ret = new AtlasObjectId(entity.getId()._getId(), entity.getTypeName()); - } - } - return ret; - } - - @Override - public Object fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext converterContext) throws AtlasBaseException { - Id ret = null; - - if (v2Obj != null) { - - if (v2Obj instanceof Map) { - Map v2Map = (Map) v2Obj; - String idStr = (String)v2Map.get(AtlasObjectId.KEY_GUID); - String typeName = type.getTypeName(); - - if (StringUtils.isEmpty(idStr)) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND); - } - - ret = new Id(idStr, 0, typeName); - } else if (v2Obj instanceof AtlasObjectId) { // transient-id - ret = new Id(((AtlasObjectId) v2Obj).getGuid(), 0, type.getTypeName()); - } else if (v2Obj instanceof AtlasEntity) { - AtlasEntity entity = (AtlasEntity) v2Obj; - ret = new Id(((AtlasObjectId) v2Obj).getGuid(), 0, type.getTypeName()); - } else { - throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, type.getTypeCategory().name()); - } - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasPrimitiveFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasPrimitiveFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasPrimitiveFormatConverter.java deleted file mode 100644 index 2b70c5e..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasPrimitiveFormatConverter.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; - - -public class AtlasPrimitiveFormatConverter extends AtlasAbstractFormatConverter { - public AtlasPrimitiveFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - super(registry, typeRegistry, TypeCategory.PRIMITIVE); - } - - @Override - public Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - return type.getNormalizedValue(v1Obj); - } - - @Override - public Object fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext ctx) throws AtlasBaseException { - return type.getNormalizedValue(v2Obj); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java deleted file mode 100644 index 920b48b..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.adapters; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.instance.AtlasStruct; -import org.apache.atlas.model.typedef.AtlasStructDef; -import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; -import org.apache.atlas.type.*; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Struct; -import org.apache.commons.collections.MapUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { - private static final Logger LOG = LoggerFactory.getLogger(AtlasStructFormatConverter.class); - - public static final String ATTRIBUTES_PROPERTY_KEY = "attributes"; - - public AtlasStructFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry) { - this(registry, typeRegistry, TypeCategory.STRUCT); - } - - protected AtlasStructFormatConverter(AtlasFormatConverters registry, AtlasTypeRegistry typeRegistry, TypeCategory typeCategory) { - super(registry, typeRegistry, typeCategory); - } - - @Override - public Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext converterContext) throws AtlasBaseException { - AtlasStruct ret = null; - - if (v1Obj != null) { - AtlasStructType structType = (AtlasStructType)type; - - if (v1Obj instanceof Map) { - final Map v1Map = (Map) v1Obj; - final Map v1Attribs = (Map) v1Map.get(ATTRIBUTES_PROPERTY_KEY); - - if (MapUtils.isNotEmpty(v1Attribs)) { - ret = new AtlasStruct(type.getTypeName(), fromV1ToV2(structType, v1Attribs, converterContext)); - } else { - ret = new AtlasStruct(type.getTypeName()); - } - } else if (v1Obj instanceof IStruct) { - IStruct struct = (IStruct) v1Obj; - Map<String, Object> v1Attribs = null; - - try { - v1Attribs = struct.getValuesMap(); - } catch (AtlasException excp) { - LOG.error("IStruct.getValuesMap() failed", excp); - } - - ret = new AtlasStruct(type.getTypeName(), fromV1ToV2(structType, v1Attribs, converterContext)); - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "Map or IStruct", v1Obj.getClass().getCanonicalName()); - } - } - - return ret; - } - - @Override - public Object fromV2ToV1(Object v2Obj, AtlasType type, ConverterContext converterContext) throws AtlasBaseException { - Struct ret = null; - - if (v2Obj != null) { - AtlasStructType structType = (AtlasStructType)type; - - if (v2Obj instanceof Map) { - final Map v2Map = (Map) v2Obj; - final Map v2Attribs; - - if (v2Map.containsKey(ATTRIBUTES_PROPERTY_KEY)) { - v2Attribs = (Map) v2Map.get(ATTRIBUTES_PROPERTY_KEY); - } else { - v2Attribs = v2Map; - } - - if (MapUtils.isNotEmpty(v2Attribs)) { - ret = new Struct(type.getTypeName(), fromV2ToV1(structType, v2Attribs, converterContext)); - } else { - ret = new Struct(type.getTypeName()); - } - } else if (v2Obj instanceof AtlasStruct) { - AtlasStruct struct = (AtlasStruct) v2Obj; - - ret = new Struct(type.getTypeName(), fromV2ToV1(structType, struct.getAttributes(), converterContext)); - } else { - throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "Map or AtlasStruct", v2Obj.getClass().getCanonicalName()); - } - } - - return ret; - } - - protected Map<String, Object> fromV2ToV1(AtlasStructType structType, Map attributes, ConverterContext context) throws AtlasBaseException { - Map<String, Object> ret = null; - - if (MapUtils.isNotEmpty(attributes)) { - ret = new HashMap<>(); - - for (AtlasStructType.AtlasAttribute attr : structType.getAllAttributes().values()) { - AtlasType attrType = attr.getAttributeType(); - - if (attrType == null) { - LOG.warn("ignored attribute {}.{}: failed to find AtlasType", structType.getTypeName(), attr.getName()); - continue; - } - - Object v2Value = attributes.get(attr.getName()); - Object v1Value = null; - - AtlasFormatConverter attrConverter = null; - if (attrType.getTypeCategory() == TypeCategory.OBJECT_ID_TYPE && !attr.isOwnedRef()) { - attrConverter = new AtlasObjectIdConverter(converterRegistry, typeRegistry); - v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context); - } else { - attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); - v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context); - } - ret.put(attr.getName(), v1Value); - } - } - - return ret; - } - - protected Map<String, Object> fromV1ToV2(AtlasStructType structType, Map attributes, ConverterContext context) throws AtlasBaseException { - Map<String, Object> ret = null; - - if (MapUtils.isNotEmpty(attributes)) { - ret = new HashMap<>(); - - for (AtlasStructType.AtlasAttribute attr : structType.getAllAttributes().values()) { - AtlasType attrType = attr.getAttributeType(); - - if (attrType == null) { - LOG.warn("ignored attribute {}.{}: failed to find AtlasType", structType.getTypeName(), attr.getName()); - continue; - } - - AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); - Object v1Value = attributes.get(attr.getName()); - Object v2Value = attrConverter.fromV1ToV2(v1Value, attrType, context); - - ret.put(attr.getAttributeDef().getName(), v2Value); - } - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 2c2c16d..c8c0099 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -60,7 +60,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.*; -import static org.apache.atlas.web.adapters.AtlasInstanceRestAdapters.toAtlasBaseException; +import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException; /** * Jersey Resource for admin operations. http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 8518e12..2f7ba20 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -23,11 +23,12 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.ClassificationAssociateRequest; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v1.EntityStream; @@ -37,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedStruct; -import org.apache.atlas.web.adapters.AtlasInstanceRestAdapters; import org.apache.atlas.web.util.Servlets; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -46,7 +46,15 @@ import org.apache.commons.lang3.StringUtils; import javax.inject.Inject; import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import java.util.ArrayList; @@ -54,7 +62,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.atlas.web.adapters.AtlasInstanceRestAdapters.toAtlasBaseException; +import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException; /** * REST for a single entity @@ -66,14 +74,15 @@ public class EntityREST { public static final String PREFIX_ATTR = "attr:"; private final AtlasTypeRegistry typeRegistry; - private final AtlasInstanceRestAdapters restAdapters; + private final AtlasInstanceConverter restAdapters; private final MetadataService metadataService; private final AtlasEntityStore entitiesStore; @Inject - public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceRestAdapters restAdapters, MetadataService metadataService, AtlasEntityStore entitiesStore) { + public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter, + MetadataService metadataService, AtlasEntityStore entitiesStore) { this.typeRegistry = typeRegistry; - this.restAdapters = restAdapters; + this.restAdapters = instanceConverter; this.metadataService = metadataService; this.entitiesStore = entitiesStore; } @@ -437,4 +446,4 @@ public class EntityREST { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java b/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java deleted file mode 100644 index c5616df..0000000 --- a/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas; - -import com.google.inject.Inject; -import com.sun.jersey.api.client.ClientResponse; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.web.resources.EntityResource; -import org.apache.atlas.web.service.ServiceState; -import org.apache.commons.lang.RandomStringUtils; -import org.codehaus.jettison.json.JSONObject; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.atlas.AtlasClient.ENTITIES; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyListOf; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -@Guice(modules= RepositoryMetadataModule.class) -public class LocalAtlasClientTest { - @Mock - private EntityResource mockEntityResource; - - @Inject - private EntityResource entityResource; - - @Mock - private ServiceState serviceState; - - @BeforeMethod - public void setup() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testCreateEntity() throws Exception { - Response response = mock(Response.class); - when(mockEntityResource.submit(any(HttpServletRequest.class))).thenReturn(response); - final String guid = random(); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put(ENTITIES, new JSONObject( - new AtlasClient.EntityResult(Arrays.asList(guid), null, null).toString()).get(ENTITIES)); - }}); - - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - List<String> results = atlasClient.createEntity(new Referenceable(random())); - assertEquals(results.size(), 1); - assertEquals(results.get(0), guid); - } - - @Test - public void testException() throws Exception { - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - - Response response = mock(Response.class); - when(mockEntityResource.submit(any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response)); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put("stackTrace", "stackTrace"); - }}); - when(response.getStatus()).thenReturn(Response.Status.BAD_REQUEST.getStatusCode()); - try { - atlasClient.createEntity(new Referenceable(random())); - fail("Expected AtlasServiceException"); - } catch(AtlasServiceException e) { - assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST); - } - - when(mockEntityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(), - any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response)); - when(response.getStatus()).thenReturn(Response.Status.NOT_FOUND.getStatusCode()); - try { - atlasClient.updateEntity(random(), random(), random(), new Referenceable(random())); - fail("Expected AtlasServiceException"); - } catch(AtlasServiceException e) { - assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND); - } - - } - - @Test - public void testIsServerReady() throws Exception { - when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - assertTrue(atlasClient.isServerReady()); - - when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE); - assertFalse(atlasClient.isServerReady()); - } - - @Test - public void testUpdateEntity() throws Exception { - final String guid = random(); - Response response = mock(Response.class); - when(mockEntityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(), - any(HttpServletRequest.class))).thenReturn(response); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put(ENTITIES, new JSONObject( - new AtlasClient.EntityResult(null, Arrays.asList(guid), null).toString()).get(ENTITIES)); - }}); - - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - AtlasClient.EntityResult - entityResult = atlasClient.updateEntity(random(), random(), random(), new Referenceable(random())); - assertEquals(entityResult.getUpdateEntities(), Arrays.asList(guid)); - } - - @Test - public void testDeleteEntity() throws Exception { - final String guid = random(); - Response response = mock(Response.class); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put(ENTITIES, new JSONObject( - new AtlasClient.EntityResult(null, null, Arrays.asList(guid)).toString()).get(ENTITIES)); - }}); - - when(mockEntityResource.deleteEntities(anyListOf(String.class), anyString(), anyString(), anyString())).thenReturn(response); - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - AtlasClient.EntityResult entityResult = atlasClient.deleteEntity(random(), random(), random()); - assertEquals(entityResult.getDeletedEntities(), Arrays.asList(guid)); - } - - private String random() { - return RandomStringUtils.randomAlphanumeric(10); - } - - @Test - @Inject - public void testGetLocationURI() { - final String guid = "123"; - URI uri = entityResource.getLocationURI(new ArrayList<String>() {{ add(guid); }}); - uri.getRawPath().equals(AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS + "/" + AtlasClient.API.GET_ENTITY.getPath() + "/" + guid); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 873e562..13747b2 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -22,19 +22,30 @@ import com.google.inject.Inject; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.LocalAtlasClient; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.kafka.KafkaNotification; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v1.EntityStream; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.web.service.ServiceState; import org.apache.commons.lang.RandomStringUtils; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; @Guice(modules = NotificationModule.class) public class NotificationHookConsumerKafkaTest { @@ -45,10 +56,28 @@ public class NotificationHookConsumerKafkaTest { @Inject private NotificationInterface notificationInterface; + + @Mock + private AtlasEntityStore atlasEntityStore; + + @Mock + private ServiceState serviceState; + + @Mock + private AtlasInstanceConverter instanceConverter; + + @Mock + private AtlasTypeRegistry typeRegistry; + private KafkaNotification kafkaNotification; @BeforeTest - public void setup() throws AtlasException, InterruptedException { + public void setup() throws AtlasException, InterruptedException, AtlasBaseException { + MockitoAnnotations.initMocks(this); + AtlasType mockType = mock(AtlasType.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); + AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); + when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); kafkaNotification = startKafkaServer(); } @@ -58,25 +87,25 @@ public class NotificationHookConsumerKafkaTest { } @Test - public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException { + public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException { try { produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); NotificationConsumer<HookNotification.HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false); - LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class); NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(kafkaNotification, localAtlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user1"); - + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); + // produce another message, and make sure it moves ahead. If commit succeeded, this would work. produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user2"); + verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); + reset(atlasEntityStore); } finally { kafkaNotification.close(); @@ -90,20 +119,19 @@ public class NotificationHookConsumerKafkaTest { NotificationConsumer<HookNotification.HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true); - LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class); NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(kafkaNotification, localAtlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user3"); + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity())); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user3"); + verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); } finally { kafkaNotification.close();
