Repository: incubator-atlas Updated Branches: refs/heads/master 57f4f79d6 -> 4b8b9e22f
ATLAS-1472: updated type-registry to handle simultaneous updates from multiple threads Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/4b8b9e22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/4b8b9e22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/4b8b9e22 Branch: refs/heads/master Commit: 4b8b9e22f8e279f739899bc50e20ffdae6142586 Parents: 57f4f79 Author: Madhan Neethiraj <[email protected]> Authored: Fri Jan 27 03:22:26 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Sat Jan 28 17:47:43 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasErrorCode.java | 1 + .../apache/atlas/type/AtlasTypeRegistry.java | 117 ++++++++- .../org/apache/atlas/type/AtlasTypeUtil.java | 12 +- .../org/apache/atlas/model/ModelTestUtil.java | 36 ++- .../apache/atlas/type/TestAtlasEntityType.java | 51 ++-- .../atlas/type/TestAtlasTypeRegistry.java | 185 ++++++++++++++- .../store/graph/AtlasTypeDefGraphStore.java | 236 ++++++------------- .../graph/v1/AtlasTypeDefGraphStoreV1.java | 8 +- .../util/AtlasRepositoryConfiguration.java | 21 ++ 9 files changed, 450 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index e7dbb1c..0fb16c6 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -76,6 +76,7 @@ public enum AtlasErrorCode { INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"), INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"), INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"), + FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"), INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(400, "ATLAS40018E", "Instance {0} with unique attribute {1} does not exist"), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java index 3de0215..3f3ea59 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java @@ -38,6 +38,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX; @@ -51,15 +53,20 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUF @Singleton public class AtlasTypeRegistry { private static final Logger LOG = LoggerFactory.getLogger(AtlasStructType.class); + private static final int DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS = 15; - protected RegistryData registryData; + protected RegistryData registryData; + private final TypeRegistryUpdateSynchronizer updateSynchronizer; public AtlasTypeRegistry() { - registryData = new RegistryData(); + registryData = new RegistryData(); + updateSynchronizer = new TypeRegistryUpdateSynchronizer(this); } + // used only by AtlasTransientTypeRegistry protected AtlasTypeRegistry(AtlasTypeRegistry other) { - registryData = new RegistryData(other.registryData); + registryData = new RegistryData(other.registryData); + updateSynchronizer = other.updateSynchronizer; } public Collection<String> getAllTypeNames() { return registryData.allTypes.getAllTypeNames(); } @@ -195,14 +202,19 @@ public class AtlasTypeRegistry { public AtlasEntityType getEntityTypeByName(String name) { return registryData.entityDefs.getTypeByName(name); } - public AtlasTransientTypeRegistry createTransientTypeRegistry() { - return new AtlasTransientTypeRegistry(this); + public AtlasTransientTypeRegistry lockTypeRegistryForUpdate() throws AtlasBaseException { + return lockTypeRegistryForUpdate(DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS); } - public void commitTransientTypeRegistry(AtlasTransientTypeRegistry transientTypeRegistry) { - this.registryData = transientTypeRegistry.registryData; + public AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException { + return updateSynchronizer.lockTypeRegistryForUpdate(lockMaxWaitTimeInSeconds); } + public void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry transientTypeRegistry, boolean commitUpdates) { + updateSynchronizer.releaseTypeRegistryForUpdate(transientTypeRegistry, commitUpdates); + } + + static class RegistryData { final TypeCache allTypes; final TypeDefCache<AtlasEnumDef, AtlasEnumType> enumDefs; @@ -519,12 +531,16 @@ public class AtlasTypeRegistry { public List<AtlasBaseTypeDef> getDeleteedTypes() { return deletedTypes; } - private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) { + private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) throws AtlasBaseException{ if (LOG.isDebugEnabled()) { LOG.debug("==> AtlasTypeRegistry.addTypeWithNoRefResolve({})", typeDef); } if (typeDef != null) { + if (this.isRegisteredType(typeDef.getName())) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, typeDef.getName()); + } + if (typeDef.getClass().equals(AtlasEnumDef.class)) { AtlasEnumDef enumDef = (AtlasEnumDef) typeDef; @@ -552,7 +568,7 @@ public class AtlasTypeRegistry { } } - private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) { + private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> AtlasTypeRegistry.addTypesWithNoRefResolve(length={})", (typeDefs == null ? 0 : typeDefs.size())); @@ -681,6 +697,89 @@ public class AtlasTypeRegistry { } } } + + static class TypeRegistryUpdateSynchronizer { + private final AtlasTypeRegistry typeRegistry; + private final ReentrantLock typeRegistryUpdateLock; + private AtlasTransientTypeRegistry typeRegistryUnderUpdate = null; + private String lockedByThread = null; + + TypeRegistryUpdateSynchronizer(AtlasTypeRegistry typeRegistry) { + this.typeRegistry = typeRegistry; + this.typeRegistryUpdateLock = new ReentrantLock(); + } + + AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException { + LOG.debug("==> lockTypeRegistryForUpdate()"); + + boolean alreadyLockedByCurrentThread = typeRegistryUpdateLock.isHeldByCurrentThread(); + + if (!alreadyLockedByCurrentThread) { + if (LOG.isDebugEnabled()) { + LOG.debug("lockTypeRegistryForUpdate(): waiting for lock to be released by thread {}", lockedByThread); + } + } else { + LOG.warn("lockTypeRegistryForUpdate(): already locked. currentLockCount={}", + typeRegistryUpdateLock.getHoldCount()); + } + + try { + boolean isLocked = typeRegistryUpdateLock.tryLock(lockMaxWaitTimeInSeconds, TimeUnit.SECONDS); + + if (!isLocked) { + throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK); + } + } catch (InterruptedException excp) { + throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK, excp); + } + + if (!alreadyLockedByCurrentThread) { + if (LOG.isDebugEnabled()) { + LOG.debug("lockTypeRegistryForUpdate(): wait over..got the lock"); + } + + typeRegistryUnderUpdate = new AtlasTransientTypeRegistry(typeRegistry); + lockedByThread = Thread.currentThread().getName(); + } + + LOG.debug("<== lockTypeRegistryForUpdate()"); + + return typeRegistryUnderUpdate; + } + + void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry ttr, boolean commitUpdates) { + LOG.debug("==> releaseTypeRegistryForUpdate()"); + + if (typeRegistryUpdateLock.isHeldByCurrentThread()) { + try { + if (typeRegistryUnderUpdate != ttr) { + LOG.error("releaseTypeRegistryForUpdate(): incorrect typeRegistry returned for release" + + ": found=" + ttr + "; expected=" + typeRegistryUnderUpdate, + new Exception().fillInStackTrace()); + } else if (typeRegistryUpdateLock.getHoldCount() == 1) { + if (ttr != null && commitUpdates) { + typeRegistry.registryData = ttr.registryData; + } + } + + if (typeRegistryUpdateLock.getHoldCount() == 1) { + lockedByThread = null; + typeRegistryUnderUpdate = null; + } else { + LOG.warn("releaseTypeRegistryForUpdate(): pendingReleaseCount={}", typeRegistryUpdateLock.getHoldCount() - 1); + } + } finally { + typeRegistryUpdateLock.unlock(); + } + } else { + LOG.error("releaseTypeRegistryForUpdate(): current thread does not hold the lock", + new Exception().fillInStackTrace()); + } + + LOG.debug("<== releaseTypeRegistryForUpdate()"); + } + + } } class TypeCache { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java index 089bebee..e4f1eea 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java @@ -17,7 +17,6 @@ */ package org.apache.atlas.type; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.atlas.AtlasErrorCode; @@ -36,7 +35,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -52,21 +50,19 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_S import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_KEY_VAL_SEP; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_PREFIX; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUFFIX; -import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_REF_ATTRIBUTE; -import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_MAPPED_FROM_REF; /** * Utility methods for AtlasType/AtlasTypeDef. */ public class AtlasTypeUtil { private static final Set<String> ATLAS_BUILTIN_TYPENAMES = new HashSet<>(); - private static final String NAME_REGEX = "[a-zA-Z][a-zA-Z0-9_ ]*"; + private static final String NAME_REGEX = "[a-zA-Z][a-zA-Z0-9_ ]*"; private static final String TRAIT_NAME_REGEX = "[a-zA-Z][a-zA-Z0-9_ .]*"; - private static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX); + private static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX); private static final Pattern TRAIT_NAME_PATTERN = Pattern.compile(TRAIT_NAME_REGEX); - private static final String InvalidTypeNameErrorMessage = "Names must consist of a letter followed by a sequence of letter, number, or '_' characters."; - private static final String InvalidTraitTypeNameErrorMessage = "Names must consist of a leter followed by a sequence of letters, numbers, '.', or '_' characters."; + private static final String InvalidTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_' ] characters."; + private static final String InvalidTraitTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_', '.' ] characters."; static { Collections.addAll(ATLAS_BUILTIN_TYPENAMES, AtlasBaseTypeDef.ATLAS_BUILTIN_TYPES); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java ---------------------------------------------------------------------- diff --git a/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java b/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java index 6d3c312..5c72470 100644 --- a/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java +++ b/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java @@ -158,16 +158,21 @@ public final class ModelTestUtil { ret.setDefaultValue(ret.getElementDefs().get(idxDefault).getValue()); } + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + try { - AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry(); + ttr = typesRegistry.lockTypeRegistryForUpdate(); ttr.addType(ret); - typesRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { LOG.error("failed to create enum-def", excp); ret = null; + } finally { + typesRegistry.releaseTypeRegistryForUpdate(ttr, commit); } return ret; @@ -186,16 +191,21 @@ public final class ModelTestUtil { ret.setDescription(ret.getName()); ret.setAttributeDefs(newAttributeDefsWithAllBuiltInTypes(PREFIX_ATTRIBUTE_NAME)); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + try { - AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry(); + ttr = typesRegistry.lockTypeRegistryForUpdate(); ttr.addType(ret); - typesRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { LOG.error("failed to create struct-def", excp); ret = null; + } finally { + typesRegistry.releaseTypeRegistryForUpdate(ttr, commit); } return ret; @@ -228,16 +238,21 @@ public final class ModelTestUtil { } } + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + try { - AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry(); + ttr = typesRegistry.lockTypeRegistryForUpdate(); ttr.addType(ret); - typesRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { LOG.error("failed to create entity-def", excp); ret = null; + } finally { + typesRegistry.releaseTypeRegistryForUpdate(ttr, commit); } return ret; @@ -279,16 +294,21 @@ public final class ModelTestUtil { } } + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + try { - AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry(); + ttr = typesRegistry.lockTypeRegistryForUpdate(); ttr.addType(ret); - typesRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { LOG.error("failed to create classification-def", excp); ret = null; + } finally { + typesRegistry.releaseTypeRegistryForUpdate(ttr, commit); } return ret; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java ---------------------------------------------------------------------- diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java index 710840f..4e15edd 100644 --- a/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java +++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java @@ -124,21 +124,25 @@ public class TestAtlasEntityType { @Test public void testForeignKeyConstraintValid() { - AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - List<AtlasEntityDef> entityDefs = new ArrayList<>(); - String failureMsg = null; + AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + List<AtlasEntityDef> entityDefs = new ArrayList<>(); + String failureMsg = null; entityDefs.add(createTableEntityDef()); entityDefs.add(createColumnEntityDef()); try { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + ttr = typeRegistry.lockTypeRegistryForUpdate(); ttr.addTypes(entityDefs); - typeRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNull(failureMsg, "failed to create types my_table and my_column"); } @@ -151,55 +155,68 @@ public class TestAtlasEntityType { entityDefs.add(createTableEntityDef()); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + try { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + ttr = typeRegistry.lockTypeRegistryForUpdate(); ttr.addTypes(entityDefs); - typeRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in mappedFromRef"); } @Test public void testForeignKeyConstraintInValidMappedFromRef2() { - AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - List<AtlasEntityDef> entityDefs = new ArrayList<>(); - String failureMsg = null; + AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + List<AtlasEntityDef> entityDefs = new ArrayList<>(); + String failureMsg = null; entityDefs.add(createTableEntityDefWithMissingRefAttribute()); entityDefs.add(createColumnEntityDef()); try { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + ttr = typeRegistry.lockTypeRegistryForUpdate(); ttr.addTypes(entityDefs); - typeRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNotNull(failureMsg, "expected invalid constraint failure - missing refAttribute in mappedFromRef"); } @Test public void testForeignKeyConstraintInValidForeignKey() { - AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - List<AtlasEntityDef> entityDefs = new ArrayList<>(); - String failureMsg = null; + AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + List<AtlasEntityDef> entityDefs = new ArrayList<>(); + String failureMsg = null; entityDefs.add(createColumnEntityDef()); try { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + ttr = typeRegistry.lockTypeRegistryForUpdate(); ttr.addTypes(entityDefs); - typeRegistry.commitTransientTypeRegistry(ttr); + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in foreignKey"); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java ---------------------------------------------------------------------- diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java index 60a09a1..f93a2e8 100644 --- a/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java +++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java @@ -19,11 +19,8 @@ package org.apache.atlas.type; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.typedef.AtlasBaseTypeDef; -import org.apache.atlas.model.typedef.AtlasClassificationDef; -import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.*; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; -import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry; import org.testng.annotations.Test; @@ -31,6 +28,10 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.testng.Assert.*; @@ -82,17 +83,23 @@ public class TestAtlasTypeRegistry { typesDef.getClassificationDefs().add(classifiL2_4); AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; String failureMsg = null; try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + ttr.addTypes(typesDef); + + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNull(failureMsg); - typeRegistry.commitTransientTypeRegistry(ttr); validateSuperTypes(typeRegistry, "L0", new HashSet<String>()); validateSuperTypes(typeRegistry, "L1-1", new HashSet<>(Arrays.asList("L0"))); @@ -126,13 +133,20 @@ public class TestAtlasTypeRegistry { classifiDef1.addSuperType(classifiDef1.getName()); AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; String failureMsg = null; try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + ttr.addType(classifiDef1); + + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNotNull(failureMsg, "expected invalid supertype failure"); } @@ -178,13 +192,20 @@ public class TestAtlasTypeRegistry { typesDef.getClassificationDefs().add(classifiL2_4); AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; String failureMsg = null; try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + ttr.addTypes(typesDef); + + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNotNull(failureMsg, "expected invalid supertype failure"); } @@ -235,18 +256,23 @@ public class TestAtlasTypeRegistry { typesDef.getEntityDefs().add(entL2_4); AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; String failureMsg = null; try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + ttr.addTypes(typesDef); + + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNull(failureMsg); - typeRegistry.commitTransientTypeRegistry(ttr); - validateSuperTypes(typeRegistry, "L0", new HashSet<String>()); validateSuperTypes(typeRegistry, "L1-1", new HashSet<>(Arrays.asList("L0"))); validateSuperTypes(typeRegistry, "L1-2", new HashSet<>(Arrays.asList("L0"))); @@ -279,13 +305,20 @@ public class TestAtlasTypeRegistry { entDef1.addSuperType(entDef1.getName()); AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; String failureMsg = null; try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + ttr.addType(entDef1); + + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNotNull(failureMsg, "expected invalid supertype failure"); } @@ -331,17 +364,143 @@ public class TestAtlasTypeRegistry { typesDef.getEntityDefs().add(entL2_4); AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; String failureMsg = null; try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + ttr.addTypes(typesDef); + + commit = true; } catch (AtlasBaseException excp) { failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); } assertNotNull(failureMsg, "expected invalid supertype failure"); } + @Test + public void testNestedUpdates() { + AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commit = false; + String failureMsg = null; + AtlasClassificationDef testTag1 = new AtlasClassificationDef("testTag1"); + AtlasClassificationDef testTag2 = new AtlasClassificationDef("testTag2"); + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + + ttr.addType(testTag1); + + // changes should not be seen in typeRegistry until lock is released + assertFalse(typeRegistry.isRegisteredType(testTag1.getName()), + "type added should be seen in typeRegistry only after commit"); + + boolean isNestedUpdateSuccess = addType(typeRegistry, testTag2); + + assertTrue(isNestedUpdateSuccess); + + // changes made in nested commit, inside addType(), should not be seen in typeRegistry until lock is released here + assertFalse(typeRegistry.isRegisteredType(testTag2.getName()), + "type added within nested commit should be seen in typeRegistry only after outer commit"); + + commit = true; + } catch (AtlasBaseException excp) { + failureMsg = excp.getMessage(); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commit); + } + assertNull(failureMsg); + assertTrue(typeRegistry.isRegisteredType(testTag1.getName())); + assertTrue(typeRegistry.isRegisteredType(testTag2.getName())); + } + + @Test + public void testParallelUpdates() { + final int numOfThreads = 3; + final int numOfTypesPerKind = 30; + final String enumTypePrefix = "testEnum-"; + final String structTypePrefix = "testStruct-"; + final String classificationPrefix = "testTag-"; + final String entityTypePrefix = "testEntity-"; + + ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); + + final AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + + // update typeRegistry simultaneously in multiple threads + for (int threadIdx = 0; threadIdx < numOfThreads; threadIdx++) { + executor.submit(new Callable<Object>() { + @Override + public Object call() throws Exception { + for (int i = 0; i < numOfTypesPerKind; i++) { + addType(typeRegistry, new AtlasEnumDef(enumTypePrefix + i)); + } + + for (int i = 0; i < numOfTypesPerKind; i++) { + addType(typeRegistry, new AtlasStructDef(structTypePrefix + i)); + } + + for (int i = 0; i < numOfTypesPerKind; i++) { + addType(typeRegistry, new AtlasClassificationDef(classificationPrefix + i)); + } + + for (int i = 0; i < numOfTypesPerKind; i++) { + addType(typeRegistry, new AtlasEntityDef(entityTypePrefix + i)); + } + + return null; + } + }); + } + + executor.shutdown(); + + try { + boolean isCompleted = executor.awaitTermination(60, TimeUnit.SECONDS); + + assertTrue(isCompleted, "threads did not complete updating types"); + } catch (InterruptedException excp) { + // ignore? + } + + // verify that all types added are present in the typeRegistry + for (int i = 0; i < numOfTypesPerKind; i++) { + String enumType = enumTypePrefix + i; + String structType = structTypePrefix + i; + String classificationType = classificationPrefix + i; + String entityType = entityTypePrefix + i; + + assertNotNull(typeRegistry.getEnumDefByName(enumType), enumType + ": enum not found"); + assertNotNull(typeRegistry.getStructDefByName(structType), structType + ": struct not found"); + assertNotNull(typeRegistry.getClassificationDefByName(classificationType), classificationType + ": classification not found"); + assertNotNull(typeRegistry.getEntityDefByName(entityType), entityType + ": entity not found"); + } + } + + private boolean addType(AtlasTypeRegistry typeRegistry, AtlasBaseTypeDef typeDef) { + boolean ret = false; + AtlasTransientTypeRegistry ttr = null; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(); + + ttr.addType(typeDef); + + ret = true; + } catch (AtlasBaseException excp) { + // ignore + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, ret); + } + + return ret; + } + private void validateSuperTypes(AtlasTypeRegistry typeRegistry, String typeName, Set<String> expectedSuperTypes) { AtlasType type = null; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index f7c2931..433d09c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -43,6 +43,7 @@ import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry; +import org.apache.atlas.util.AtlasRepositoryConfiguration; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.Predicate; import org.apache.commons.collections.Transformer; @@ -65,14 +66,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStore.class); - private final AtlasTypeRegistry typeRegistry; - + private final AtlasTypeRegistry typeRegistry; private final Set<TypeDefChangeListener> typeDefChangeListeners; + private final int typeUpdateLockMaxWaitTimeSeconds; protected AtlasTypeDefGraphStore(AtlasTypeRegistry typeRegistry, Set<TypeDefChangeListener> typeDefChangeListeners) { - this.typeRegistry = typeRegistry; - this.typeDefChangeListeners = typeDefChangeListeners; + this.typeRegistry = typeRegistry; + this.typeDefChangeListeners = typeDefChangeListeners; + this.typeUpdateLockMaxWaitTimeSeconds = AtlasRepositoryConfiguration.getTypeUpdateLockMaxWaitTimeInSeconds(); } protected abstract AtlasEnumDefStore getEnumDefStore(AtlasTypeRegistry typeRegistry); @@ -85,16 +87,23 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ @Override public void init() throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; - AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(), - getStructDefStore(ttr).getAll(), - getClassificationDefStore(ttr).getAll(), - getEntityDefStore(ttr).getAll()); + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds); - ttr.addTypes(typesDef); + AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(), + getStructDefStore(ttr).getAll(), + getClassificationDefStore(ttr).getAll(), + getEntityDefStore(ttr).getAll()); - typeRegistry.commitTransientTypeRegistry(ttr); + ttr.addTypes(typesDef); + + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + } bootstrapTypes(); } @@ -102,7 +111,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ @Override @GraphTransaction public AtlasEnumDef createEnumDef(AtlasEnumDef enumDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.addType(enumDef); @@ -110,22 +119,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - updateTypeRegistryPostCommit(ttr); - return ret; } @Override @GraphTransaction public List<AtlasEnumDef> getAllEnumDefs() throws AtlasBaseException { - List<AtlasEnumDef> ret = null; - Collection<AtlasEnumDef> enumDefs = typeRegistry.getAllEnumDefs(); - ret = CollectionUtils.isNotEmpty(enumDefs) ? - new ArrayList<>(enumDefs) : Collections.<AtlasEnumDef>emptyList(); - - return ret; + return CollectionUtils.isNotEmpty(enumDefs) ? new ArrayList<>(enumDefs) : Collections.<AtlasEnumDef>emptyList(); } @Override @@ -151,70 +153,53 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ @Override @GraphTransaction public AtlasEnumDef updateEnumDefByName(String name, AtlasEnumDef enumDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByName(name, enumDef); - AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getEnumDefStore(ttr).updateByName(name, enumDef); } @Override @GraphTransaction public AtlasEnumDef updateEnumDefByGuid(String guid, AtlasEnumDef enumDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByGuid(guid, enumDef); - AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getEnumDefStore(ttr).updateByGuid(guid, enumDef); } @Override @GraphTransaction public void deleteEnumDefByName(String name) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasEnumDef byName = typeRegistry.getEnumDefByName(name); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByName(name); getEnumDefStore(ttr).deleteByName(name); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public void deleteEnumDefByGuid(String guid) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasEnumDef byGuid = typeRegistry.getEnumDefByGuid(guid); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByGuid(guid); getEnumDefStore(ttr).deleteByGuid(guid); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public AtlasEnumDefs searchEnumDefs(SearchFilter filter) throws AtlasBaseException { - AtlasEnumDefs search = getEnumDefStore(typeRegistry).search(filter); - return search; + return getEnumDefStore(typeRegistry).search(filter); } @Override @GraphTransaction public AtlasStructDef createStructDef(AtlasStructDef structDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.addType(structDef); @@ -222,31 +207,26 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - updateTypeRegistryPostCommit(ttr); - return ret; } @Override @GraphTransaction public List<AtlasStructDef> getAllStructDefs() throws AtlasBaseException { - List<AtlasStructDef> ret = null; - Collection<AtlasStructDef> structDefs = typeRegistry.getAllStructDefs(); - ret = CollectionUtils.isNotEmpty(structDefs) ? - new ArrayList<>(structDefs) : Collections.<AtlasStructDef>emptyList(); - - return ret; + return CollectionUtils.isNotEmpty(structDefs) ? new ArrayList<>(structDefs) : Collections.<AtlasStructDef>emptyList(); } @Override @GraphTransaction public AtlasStructDef getStructDefByName(String name) throws AtlasBaseException { AtlasStructDef ret = typeRegistry.getStructDefByName(name); + if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); } + return ret; } @@ -254,81 +234,65 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ @GraphTransaction public AtlasStructDef getStructDefByGuid(String guid) throws AtlasBaseException { AtlasStructDef ret = typeRegistry.getStructDefByGuid(guid); + if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); } + return ret; } @Override @GraphTransaction public AtlasStructDef updateStructDefByName(String name, AtlasStructDef structDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByName(name, structDef); - AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getStructDefStore(ttr).updateByName(name, structDef); } @Override @GraphTransaction public AtlasStructDef updateStructDefByGuid(String guid, AtlasStructDef structDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByGuid(guid, structDef); - AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getStructDefStore(ttr).updateByGuid(guid, structDef); } @Override @GraphTransaction public void deleteStructDefByName(String name) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasStructDef byName = typeRegistry.getStructDefByName(name); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByName(name); getStructDefStore(ttr).deleteByName(name, null); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public void deleteStructDefByGuid(String guid) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasStructDef byGuid = typeRegistry.getStructDefByGuid(guid); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByGuid(guid); getStructDefStore(ttr).deleteByGuid(guid, null); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public AtlasStructDefs searchStructDefs(SearchFilter filter) throws AtlasBaseException { - AtlasStructDefs search = getStructDefStore(typeRegistry).search(filter); - - return search; + return getStructDefStore(typeRegistry).search(filter); } @Override @GraphTransaction public AtlasClassificationDef createClassificationDef(AtlasClassificationDef classificationDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.addType(classificationDef); @@ -336,22 +300,16 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - updateTypeRegistryPostCommit(ttr); - return ret; } @Override @GraphTransaction public List<AtlasClassificationDef> getAllClassificationDefs() throws AtlasBaseException { - List<AtlasClassificationDef> ret = null; - Collection<AtlasClassificationDef> classificationDefs = typeRegistry.getAllClassificationDefs(); - ret = CollectionUtils.isNotEmpty(classificationDefs) ? - new ArrayList<>(classificationDefs) : Collections.<AtlasClassificationDef>emptyList(); - - return ret; + return CollectionUtils.isNotEmpty(classificationDefs) ? new ArrayList<>(classificationDefs) + : Collections.<AtlasClassificationDef>emptyList(); } @Override @@ -362,6 +320,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); } + return ret; } @@ -369,9 +328,11 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ @GraphTransaction public AtlasClassificationDef getClassificationDefByGuid(String guid) throws AtlasBaseException { AtlasClassificationDef ret = typeRegistry.getClassificationDefByGuid(guid); + if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); } + return ret; } @@ -379,72 +340,54 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ @GraphTransaction public AtlasClassificationDef updateClassificationDefByName(String name, AtlasClassificationDef classificationDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByName(name, classificationDef); - AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getClassificationDefStore(ttr).updateByName(name, classificationDef); } @Override @GraphTransaction public AtlasClassificationDef updateClassificationDefByGuid(String guid, AtlasClassificationDef classificationDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByGuid(guid, classificationDef); - AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getClassificationDefStore(ttr).updateByGuid(guid, classificationDef); } @Override @GraphTransaction public void deleteClassificationDefByName(String name) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasClassificationDef byName = typeRegistry.getClassificationDefByName(name); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByName(name); getClassificationDefStore(ttr).deleteByName(name, null); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public void deleteClassificationDefByGuid(String guid) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasClassificationDef byGuid = typeRegistry.getClassificationDefByGuid(guid); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByGuid(guid); getClassificationDefStore(ttr).deleteByGuid(guid, null); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public AtlasClassificationDefs searchClassificationDefs(SearchFilter filter) throws AtlasBaseException { - AtlasClassificationDefs search = getClassificationDefStore(typeRegistry).search(filter); - - return search; + return getClassificationDefStore(typeRegistry).search(filter); } @Override @GraphTransaction public AtlasEntityDef createEntityDef(AtlasEntityDef entityDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.addType(entityDef); @@ -452,31 +395,26 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - updateTypeRegistryPostCommit(ttr); - return ret; } @Override @GraphTransaction public List<AtlasEntityDef> getAllEntityDefs() throws AtlasBaseException { - List<AtlasEntityDef> ret = null; - Collection<AtlasEntityDef> entityDefs = typeRegistry.getAllEntityDefs(); - ret = CollectionUtils.isNotEmpty(entityDefs) ? - new ArrayList<>(entityDefs) : Collections.<AtlasEntityDef>emptyList(); - - return ret; + return CollectionUtils.isNotEmpty(entityDefs) ? new ArrayList<>(entityDefs) : Collections.<AtlasEntityDef>emptyList(); } @Override @GraphTransaction public AtlasEntityDef getEntityDefByName(String name) throws AtlasBaseException { AtlasEntityDef ret = typeRegistry.getEntityDefByName(name); + if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); } + return ret; } @@ -484,74 +422,58 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ @GraphTransaction public AtlasEntityDef getEntityDefByGuid(String guid) throws AtlasBaseException { AtlasEntityDef ret = typeRegistry.getEntityDefByGuid(guid); + if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); } + return ret; } @Override @GraphTransaction public AtlasEntityDef updateEntityDefByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByName(name, entityDef); - AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getEntityDefStore(ttr).updateByName(name, entityDef); } @Override @GraphTransaction public AtlasEntityDef updateEntityDefByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypeByGuid(guid, entityDef); - AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef); - - updateTypeRegistryPostCommit(ttr); - - return ret; + return getEntityDefStore(ttr).updateByGuid(guid, entityDef); } @Override @GraphTransaction public void deleteEntityDefByName(String name) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasEntityDef byName = typeRegistry.getEntityDefByName(name); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByName(name); getEntityDefStore(ttr).deleteByName(name, null); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public void deleteEntityDefByGuid(String guid) throws AtlasBaseException { - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); - - AtlasEntityDef byGuid = typeRegistry.getEntityDefByGuid(guid); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.removeTypeByGuid(guid); getEntityDefStore(ttr).deleteByGuid(guid, null); - - updateTypeRegistryPostCommit(ttr); } @Override @GraphTransaction public AtlasEntityDefs searchEntityDefs(SearchFilter filter) throws AtlasBaseException { - AtlasEntityDefs search = getEntityDefStore(typeRegistry).search(filter); - - return search; + return getEntityDefStore(typeRegistry).search(filter); } @Override @@ -567,7 +489,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasTypesDef ret = new AtlasTypesDef(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.addTypes(typesDef); @@ -644,8 +566,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ } } - updateTypeRegistryPostCommit(ttr); - if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})", CollectionUtils.size(typesDef.getEnumDefs()), @@ -670,7 +590,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasTypesDef ret = new AtlasTypesDef(); - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.updateTypes(typesDef); @@ -703,8 +623,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ } } - updateTypeRegistryPostCommit(ttr); - if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})", CollectionUtils.size(typesDef.getEnumDefs()), @@ -728,7 +646,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ CollectionUtils.size(typesDef.getEntityDefs())); } - AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit(); ttr.addTypes(typesDef); @@ -817,8 +735,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ } } - updateTypeRegistryPostCommit(ttr); - if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})", CollectionUtils.size(typesDef.getEnumDefs()), @@ -934,8 +850,12 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ storeInitializer.initializeStore(this, typeRegistry, typesDirName); } - private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) { + private AtlasTransientTypeRegistry lockTypeRegistryAndReleasePostCommit() throws AtlasBaseException { + AtlasTransientTypeRegistry ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds); + new TypeRegistryUpdateHook(ttr); + + return ttr; } private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook { @@ -953,9 +873,9 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess); } - if (isSuccess) { - typeRegistry.commitTransientTypeRegistry(ttr); + typeRegistry.releaseTypeRegistryForUpdate(ttr, isSuccess); + if (isSuccess) { notifyListeners(ttr); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java index 287ef09..88197ac 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java @@ -68,7 +68,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { Set<TypeDefChangeListener> typeDefChangeListeners) { super(typeRegistry, typeDefChangeListeners); - LOG.info("==> AtlasTypeDefGraphStoreV1()"); + LOG.debug("==> AtlasTypeDefGraphStoreV1()"); try { init(); @@ -76,7 +76,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { LOG.error("failed to initialize types from graph store", excp); } - LOG.info("<== AtlasTypeDefGraphStoreV1()"); + LOG.debug("<== AtlasTypeDefGraphStoreV1()"); } @Override @@ -101,11 +101,11 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { @Override public void init() throws AtlasBaseException { - LOG.info("==> AtlasTypeDefGraphStoreV1.init()"); + LOG.debug("==> AtlasTypeDefGraphStoreV1.init()"); super.init(); - LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); + LOG.debug("<== AtlasTypeDefGraphStoreV1.init()"); } AtlasGraph getAtlasGraph() { return atlasGraph; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java index 6655085..71c7ff8 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java @@ -48,6 +48,10 @@ public class AtlasRepositoryConfiguration { private static List<String> skippedOperations = null; public static final String SEPARATOR = ":"; + private static final String CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = "atlas.server.type.update.lock.max.wait.time.seconds"; + private static final Integer DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = Integer.valueOf(15); + private static Integer typeUpdateLockMaxWaitTimeInSeconds = null; + @SuppressWarnings("unchecked") public static Class<? extends TypeCache> getTypeCache() { // Get the type cache implementation class from Atlas configuration. @@ -155,4 +159,21 @@ public class AtlasRepositoryConfiguration { skippedOperations = null; } + public static int getTypeUpdateLockMaxWaitTimeInSeconds() { + Integer ret = typeUpdateLockMaxWaitTimeInSeconds; + + if (ret == null) { + try { + Configuration config = ApplicationProperties.get(); + + ret = config.getInteger(CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS, DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS); + + typeUpdateLockMaxWaitTimeInSeconds = ret; + } catch (AtlasException e) { + // ignore + } + } + + return ret == null ? DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS : ret; + } }
