Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 a7bcd029f -> fd629982f
ATLAS-2827: fix to handle failure in saving indexable string property of large size - #3 (cherry picked from commit eae9761871af9aeecc0fdc2248b7c8555407d0a6) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/fd629982 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/fd629982 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/fd629982 Branch: refs/heads/branch-0.8 Commit: fd629982f9fb5ebc157d2f03cb24562806ae87bd Parents: a7bcd02 Author: Madhan Neethiraj <mad...@apache.org> Authored: Thu Sep 6 19:33:52 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Fri Sep 7 00:29:02 2018 -0700 ---------------------------------------------------------------------- .../store/graph/v1/AtlasEntityStoreV1.java | 5 +++ .../store/graph/v1/EntityGraphMapper.java | 4 ++- .../java/org/apache/atlas/RequestContextV1.java | 38 ++++++++++++++++++++ .../notification/NotificationHookConsumer.java | 2 ++ 4 files changed, 48 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/fd629982/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 2bfef78..449a958 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -583,6 +583,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream); EntityGraphDiscoveryContext discoveryContext = 
graphDiscoverer.discoverEntities(); EntityMutationContext context = new EntityMutationContext(discoveryContext); + RequestContextV1 requestContext = RequestContextV1.get(); for (String guid : discoveryContext.getReferencedGuids()) { AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid); @@ -604,6 +605,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute entity.setGuid(guidVertex); + + requestContext.recordEntityGuidUpdate(entity, guid); } context.addUpdated(guid, entity, entityType, vertex); @@ -625,6 +628,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { entity.setGuid(generatedGuid); + requestContext.recordEntityGuidUpdate(entity, guid); + context.addCreated(guid, entity, entityType, vertex); } http://git-wip-us.apache.org/repos/asf/atlas/blob/fd629982/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index d38fbb3..448d167 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -459,11 +459,13 @@ public class EntityGraphMapper { } if (trimmedLength < value.length()) { - LOG.warn("Indexed-String-Attribute: {} length is {} characters, trimming to {}", ctx.getAttribute().getQualifiedName(), value.length(), trimmedLength); + LOG.warn("Length of indexed attribute {} is {} characters, longer than safe-limit {}; trimming to {} - attempt #{}", ctx.getAttribute().getQualifiedName(), value.length(), INDEXED_STR_SAFE_LEN, trimmedLength, requestContext.getAttemptCount()); String checksumSuffix = 
":" + DigestUtils.shaHex(value); // Storing SHA checksum in case verification is needed after retrieval ret = value.substring(0, trimmedLength - checksumSuffix.length()) + checksumSuffix; + } else { + LOG.warn("Length of indexed attribute {} is {} characters, longer than safe-limit {}", ctx.getAttribute().getQualifiedName(), value.length(), INDEXED_STR_SAFE_LEN); } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/fd629982/server-api/src/main/java/org/apache/atlas/RequestContextV1.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java index a6d5820..bf6df46 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java @@ -24,9 +24,11 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -41,6 +43,7 @@ public class RequestContextV1 { private final Map<String, AtlasEntity> entityCacheV2 = new HashMap<>(); private final Metrics metrics = new Metrics(); private final long requestTime = System.currentTimeMillis(); + private List<EntityGuidPair> entityGuidInRequest = null; private String user; @@ -72,6 +75,10 @@ public class RequestContextV1 { instance.deletedEntities.clear(); instance.entityCacheV2.clear(); + if (instance.entityGuidInRequest != null) { + instance.entityGuidInRequest.clear(); + } + synchronized (ACTIVE_REQUESTS) { ACTIVE_REQUESTS.remove(instance); } @@ -162,4 +169,35 @@ public class RequestContextV1 { public static Metrics getMetrics() { return get().metrics; } + + + public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) { + if (entityGuidInRequest 
== null) { + entityGuidInRequest = new ArrayList<>(); + } + + entityGuidInRequest.add(new EntityGuidPair(entity, guidInRequest)); + } + + public void resetEntityGuidUpdates() { + if (entityGuidInRequest != null) { + for (EntityGuidPair entityGuidPair : entityGuidInRequest) { + entityGuidPair.resetEntityGuid(); + } + } + } + + public class EntityGuidPair { + private final AtlasEntity entity; + private final String guid; + + public EntityGuidPair(AtlasEntity entity, String guid) { + this.entity = entity; + this.guid = guid; + } + + public void resetEntityGuid() { + entity.setGuid(guid); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/fd629982/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 4ad9b75..475be9a 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -494,6 +494,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl break; } catch (Throwable e) { + RequestContextV1.get().resetEntityGuidUpdates(); + LOG.warn("Error handling message", e); try { LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);