ATLAS-2637: migration-import updates for changes in collection attribute storage
Signed-off-by: Madhan Neethiraj <mad...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/47ec9f7a Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/47ec9f7a Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/47ec9f7a Branch: refs/heads/master Commit: 47ec9f7a8cc02346ac167849374bf088f55be74a Parents: a0269b9 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Tue May 22 18:24:57 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Tue May 22 23:04:32 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasConstants.java | 2 +- .../atlas/repository/graphdb/AtlasGraph.java | 3 +- .../graphdb/janus/AtlasJanusGraph.java | 262 +- .../graphdb/janus/AtlasJanusGraphDatabase.java | 10 +- .../janus/migration/AtlasGraphSONReader.java | 100 +- .../janus/migration/ElementProcessors.java | 423 ++ .../janus/migration/GraphSONUtility.java | 107 +- .../janus/migration/JsonNodeProcessManager.java | 1 + .../janus/migration/MappedElementCache.java | 44 - .../janus/migration/PostProcessManager.java | 100 +- .../migration/RelationshipCacheGenerator.java | 98 + .../janus/migration/RelationshipTypeCache.java | 37 - .../migration/TypesWithCollectionsFinder.java | 123 + .../postProcess/PostProcessListProperty.java | 68 + .../graphdb/janus/migration/BaseUtils.java | 107 +- .../GraphSONUtilityPostProcessTest.java | 95 - .../janus/migration/GraphSONUtilityTest.java | 266 +- .../janus/migration/JsonNodeParsersTest.java | 16 +- .../janus/migration/MappedElementCacheTest.java | 26 +- .../migration/PostProcessListPropertyTest.java | 142 + .../janus/src/test/resources/col-2-legacy.json | 73 + .../janus/src/test/resources/col-3-legacy.json | 86 + .../src/test/resources/edge-legacy-col.json | 27 + .../src/test/resources/edge-legacy-col2.json | 27 + .../src/test/resources/edge-legacy-col3.json | 27 + 
.../src/test/resources/edge-legacy-col4.json | 27 + .../src/test/resources/edge-legacy-process.json | 27 + .../src/test/resources/edge-legacy-tag.json | 27 + .../src/test/resources/lineage-v-98312.json | 79 + .../src/test/resources/table-v-147504.json | 83 +- .../janus/src/test/resources/tag-163856752.json | 35 + .../apache/atlas/store/AtlasTypeDefStore.java | 6 - .../test/java/org/apache/atlas/TestUtilsV2.java | 1 - .../migration/DataMigrationService.java | 48 +- .../migration/RelationshipCacheGenerator.java | 73 - .../graph/v1/AtlasTypeDefGraphStoreV1.java | 7 - .../migration/ComplexAttributesTest.java | 61 + .../migration/HiveParititionTest.java | 13 +- .../repository/migration/HiveStocksTest.java | 3 +- .../migration/MigrationBaseAsserts.java | 28 +- .../RelationshipCacheGeneratorTest.java | 96 + .../migration/RelationshipMappingTest.java | 87 - .../TypesWithCollectionsFinderTest.java | 84 + .../store/graph/v1/AtlasEntityTestBase.java | 5 +- .../complex-attr_db/atlas-migration-data.json | 5569 ++++++++++++++++++ .../atlas-migration-typesdef.json | 2303 ++++++++ .../parts_db/atlas-migration-data.json | 3990 ++++++++++++- 47 files changed, 14107 insertions(+), 815 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/common/src/main/java/org/apache/atlas/AtlasConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java index 2b92e6e..2b9f411 100644 --- a/common/src/main/java/org/apache/atlas/AtlasConstants.java +++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java @@ -35,6 +35,6 @@ public final class AtlasConstants { public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30; public static final String DEFAULT_TYPE_VERSION = "1.0"; - public static final String ATLAS_MIGRATION_MODE_FILENAME = 
"atlas.migration.mode.filename"; + public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.data.filename"; public static final String ATLAS_SERVICES_ENABLED = "atlas.services.enabled"; } http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java ---------------------------------------------------------------------- diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java index 607baf6..e5316d8 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java @@ -30,6 +30,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.groovy.GroovyExpression; import org.apache.atlas.model.impexp.MigrationStatus; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; /** * Represents a graph. 
@@ -320,7 +321,7 @@ public interface AtlasGraph<V, E> { */ boolean isMultiProperty(String name); - void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException; + void importLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException; MigrationStatus getMigrationStatus(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index c0b9c17..b4d6b33 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -34,10 +34,10 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.GremlinVersion; -import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager; import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery; import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider; import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider; @@ -84,26 +84,20 @@ import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_P */ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> { - private final 
ConvertGremlinValueFunction GREMLIN_VALUE_CONVERSION_FUNCTION = new ConvertGremlinValueFunction(); - private static Configuration APPLICATION_PROPERTIES = null; - private final class ConvertGremlinValueFunction implements Function<Object, Object> { - @Override - public Object apply(Object input) { - return convertGremlinValue(input); - } - } - - private final Set<String> multiProperties; + private final ConvertGremlinValueFunction GREMLIN_VALUE_CONVERSION_FUNCTION = new ConvertGremlinValueFunction(); + private final Set<String> multiProperties = new HashSet<>(); public AtlasJanusGraph() { //determine multi-properties once at startup JanusGraphManagement mgmt = null; + try { mgmt = AtlasJanusGraphDatabase.getGraphInstance().openManagement(); + Iterable<PropertyKey> keys = mgmt.getRelationTypes(PropertyKey.class); - multiProperties = new HashSet<>(); + for (PropertyKey key : keys) { if (key.cardinality() != Cardinality.SINGLE) { multiProperties.add(key.name()); @@ -118,13 +112,13 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE @Override public AtlasEdge<AtlasJanusVertex, AtlasJanusEdge> addEdge(AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> outVertex, - AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> inVertex, - String edgeLabel) { - + AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> inVertex, + String edgeLabel) { try { - Vertex oV = outVertex.getV().getWrappedElement(); - Vertex iV = inVertex.getV().getWrappedElement(); - Edge edge = oV.addEdge(edgeLabel, iV); + Vertex oV = outVertex.getV().getWrappedElement(); + Vertex iV = inVertex.getV().getWrappedElement(); + Edge edge = oV.addEdge(edgeLabel, iV); + return GraphDbObjectFactory.createEdge(this, edge); } catch (SchemaViolationException e) { throw new AtlasSchemaViolationException(e); @@ -139,42 +133,43 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE @Override public AtlasEdge<AtlasJanusVertex, AtlasJanusEdge> getEdge(String edgeId) { Iterator<Edge> 
it = getGraph().edges(edgeId); - Edge e = getSingleElement(it, edgeId); + Edge e = getSingleElement(it, edgeId); + return GraphDbObjectFactory.createEdge(this, e); } @Override public void removeEdge(AtlasEdge<AtlasJanusVertex, AtlasJanusEdge> edge) { - Edge wrapped = edge.getE().getWrappedElement(); - wrapped.remove(); + wrapped.remove(); } @Override public void removeVertex(AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> vertex) { Vertex wrapped = vertex.getV().getWrappedElement(); + wrapped.remove(); } @Override public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> getEdges() { - Iterator<Edge> edges = getGraph().edges(); - return wrapEdges(edges); + return wrapEdges(edges); } @Override public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> getVertices() { - Iterator<Vertex> vertices = getGraph().vertices(); + return wrapVertices(vertices); } @Override public AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> addVertex() { Vertex result = getGraph().addVertex(); + return GraphDbObjectFactory.createVertex(this, result); } @@ -221,73 +216,32 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE return getIndexKeys(Vertex.class); } - private Set<String> getIndexKeys(Class<? 
extends Element> janusGraphElementClass) { - - JanusGraphManagement mgmt = getGraph().openManagement(); - Iterable<JanusGraphIndex> indices = mgmt.getGraphIndexes(janusGraphElementClass); - Set<String> result = new HashSet<String>(); - for (JanusGraphIndex index : indices) { - result.add(index.name()); - } - mgmt.commit(); - return result; - - } - @Override public AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> getVertex(String vertexId) { - Iterator<Vertex> it = getGraph().vertices(vertexId); - Vertex vertex = getSingleElement(it, vertexId); - return GraphDbObjectFactory.createVertex(this, vertex); - } + Iterator<Vertex> it = getGraph().vertices(vertexId); + Vertex vertex = getSingleElement(it, vertexId); - public static <T> T getSingleElement(Iterator<T> it, String id) { - if (!it.hasNext()) { - return null; - } - T element = it.next(); - if (it.hasNext()) { - throw new RuntimeException("Multiple items were found with the id " + id); - } - return element; + return GraphDbObjectFactory.createVertex(this, vertex); } @Override public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> getVertices(String key, Object value) { AtlasGraphQuery<AtlasJanusVertex, AtlasJanusEdge> query = query(); - query.has(key, value); - return query.vertices(); - } - private Object convertGremlinValue(Object rawValue) { + query.has(key, value); - if (rawValue instanceof Vertex) { - return GraphDbObjectFactory.createVertex(this, (Vertex) rawValue); - } else if (rawValue instanceof Edge) { - return GraphDbObjectFactory.createEdge(this, (Edge) rawValue); - } else if (rawValue instanceof Map) { - Map<String,Object> rowValue = (Map<String,Object>)rawValue; - return Maps.transformValues(rowValue, GREMLIN_VALUE_CONVERSION_FUNCTION); - } else if (rawValue instanceof ImmutablePath) { - ImmutablePath path = (ImmutablePath) rawValue; - return convertGremlinValue(path.objects()); - } - else if (rawValue instanceof List) { - return Lists.transform((List)rawValue, 
GREMLIN_VALUE_CONVERSION_FUNCTION); - } else if (rawValue instanceof Collection) { - throw new UnsupportedOperationException("Unhandled collection type: " + rawValue.getClass()); - } - return rawValue; + return query.vertices(); } @Override public GremlinVersion getSupportedGremlinVersion() { - return GremlinVersion.THREE; } + @Override public void clear() { JanusGraph graph = getGraph(); + if (graph.isOpen()) { // only a shut down graph can be cleared graph.close(); @@ -295,7 +249,8 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE try { JanusGraphFactory.drop(graph); - } catch (BackendException ignoreEx) {} + } catch (BackendException ignoreEx) { + } } private JanusGraph getGraph() { @@ -304,25 +259,27 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE @Override public void exportToGson(OutputStream os) throws IOException { - - GraphSONMapper mapper = getGraph().io(IoCore.graphson()).mapper().create(); + GraphSONMapper mapper = getGraph().io(IoCore.graphson()).mapper().create(); GraphSONWriter.Builder builder = GraphSONWriter.build(); + builder.mapper(mapper); + GraphSONWriter writer = builder.create(); + writer.writeGraph(os, getGraph()); } @Override public GremlinGroovyScriptEngine getGremlinScriptEngine() { - Set<String> extraImports = new HashSet<String>(); - extraImports.add(java.util.function.Function.class.getName()); - + Set<String> extraImports = new HashSet<String>(); Set<String> extraStaticImports = new HashSet<String>(); + + extraImports.add(java.util.function.Function.class.getName()); extraStaticImports.add(P.class.getName() + ".*"); extraStaticImports.add(__.class.getName() + ".*"); - CompilerCustomizerProvider provider = new DefaultImportCustomizerProvider(extraImports, extraStaticImports); - GremlinGroovyScriptEngine scriptEngine = new GremlinGroovyScriptEngine(provider); + CompilerCustomizerProvider provider = new DefaultImportCustomizerProvider(extraImports, extraStaticImports); 
+ GremlinGroovyScriptEngine scriptEngine = new GremlinGroovyScriptEngine(provider); return scriptEngine; } @@ -341,38 +298,20 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE @Override public Object executeGremlinScript(String query, boolean isPath) throws AtlasBaseException { Object result = executeGremlinScript(query); - return convertGremlinValue(result); - } - - private Object executeGremlinScript(String gremlinQuery) throws AtlasBaseException { - GremlinGroovyScriptEngine scriptEngine = getGremlinScriptEngine(); - - try { - Bindings bindings = scriptEngine.createBindings(); - - bindings.put("graph", getGraph()); - bindings.put("g", getGraph().traversal()); - - Object result = scriptEngine.eval(gremlinQuery, bindings); - return result; - } catch (ScriptException e) { - throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e, gremlinQuery); - } finally { - releaseGremlinScriptEngine(scriptEngine); - } + return convertGremlinValue(result); } @Override - public Object executeGremlinScript(ScriptEngine scriptEngine, - Map<? extends String, ? extends Object> userBindings, String query, boolean isPath) - throws ScriptException { + public Object executeGremlinScript(ScriptEngine scriptEngine, Map<? extends String, ? extends Object> userBindings, + String query, boolean isPath) throws ScriptException { Bindings bindings = scriptEngine.createBindings(); bindings.putAll(userBindings); bindings.put("g", getGraph().traversal()); Object result = scriptEngine.eval(query, bindings); + return convertGremlinValue(result); } @@ -402,39 +341,42 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE return expr; } - public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterator<? extends Vertex> it) { - Iterable<? 
extends Vertex> iterable = new IteratorToIterableAdapter<>(it); - return wrapVertices(iterable); + @Override + public boolean isMultiProperty(String propertyName) { + return multiProperties.contains(propertyName); } - public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) { + @Override + public void importLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException { + AtlasJanusGraphDatabase.loadLegacyGraphSON(typeRegistry, fs); + } - return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)).collect(Collectors.toList()); + @Override + public MigrationStatus getMigrationStatus() { + return AtlasJanusGraphDatabase.getMigrationStatus(); + } + public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) { + return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)).collect(Collectors.toList()); } public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterator<? extends Edge> it) { Iterable<? extends Edge> iterable = new IteratorToIterableAdapter<>(it); + return wrapEdges(iterable); } public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterable<? 
extends Edge> it) { - return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createEdge(AtlasJanusGraph.this, input)).collect(Collectors.toList()); - - } - - @Override - public boolean isMultiProperty(String propertyName) { - return multiProperties.contains(propertyName); } public void addMultiProperties(Set<String> names) { multiProperties.addAll(names); } - public String getIndexQueryPrefix() { - String ret; + + private String getIndexQueryPrefix() { + final String ret; initApplicationProperties(); @@ -447,6 +389,82 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE return ret; } + private Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterator<? extends Vertex> it) { + Iterable<? extends Vertex> iterable = new IteratorToIterableAdapter<>(it); + + return wrapVertices(iterable); + } + + private static <T> T getSingleElement(Iterator<T> it, String id) { + if (!it.hasNext()) { + return null; + } + + T element = it.next(); + + if (it.hasNext()) { + throw new RuntimeException("Multiple items were found with the id " + id); + } + + return element; + } + + private Object convertGremlinValue(Object rawValue) { + if (rawValue instanceof Vertex) { + return GraphDbObjectFactory.createVertex(this, (Vertex) rawValue); + } else if (rawValue instanceof Edge) { + return GraphDbObjectFactory.createEdge(this, (Edge) rawValue); + } else if (rawValue instanceof Map) { + Map<String,Object> rowValue = (Map<String,Object>)rawValue; + + return Maps.transformValues(rowValue, GREMLIN_VALUE_CONVERSION_FUNCTION); + } else if (rawValue instanceof ImmutablePath) { + ImmutablePath path = (ImmutablePath) rawValue; + + return convertGremlinValue(path.objects()); + } else if (rawValue instanceof List) { + return Lists.transform((List)rawValue, GREMLIN_VALUE_CONVERSION_FUNCTION); + } else if (rawValue instanceof Collection) { + throw new UnsupportedOperationException("Unhandled collection type: " + 
rawValue.getClass()); + } + + return rawValue; + } + + private Set<String> getIndexKeys(Class<? extends Element> janusGraphElementClass) { + JanusGraphManagement mgmt = getGraph().openManagement(); + Iterable<JanusGraphIndex> indices = mgmt.getGraphIndexes(janusGraphElementClass); + Set<String> result = new HashSet<String>(); + + for (JanusGraphIndex index : indices) { + result.add(index.name()); + } + + mgmt.commit(); + + return result; + + } + + private Object executeGremlinScript(String gremlinQuery) throws AtlasBaseException { + GremlinGroovyScriptEngine scriptEngine = getGremlinScriptEngine(); + + try { + Bindings bindings = scriptEngine.createBindings(); + + bindings.put("graph", getGraph()); + bindings.put("g", getGraph().traversal()); + + Object result = scriptEngine.eval(gremlinQuery, bindings); + + return result; + } catch (ScriptException e) { + throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e, gremlinQuery); + } finally { + releaseGremlinScriptEngine(scriptEngine); + } + } + private void initApplicationProperties() { if (APPLICATION_PROPERTIES == null) { try { @@ -457,13 +475,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE } } - @Override - public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException { - AtlasJanusGraphDatabase.loadLegacyGraphSON(relationshipCache, fs); - } - @Override - public MigrationStatus getMigrationStatus() { - return AtlasJanusGraphDatabase.getMigrationStatus(); + private final class ConvertGremlinValueFunction implements Function<Object, Object> { + @Override + public Object apply(Object input) { + return convertGremlinValue(input); + } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java ---------------------------------------------------------------------- diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java index 16aecd5..c9d6067 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java @@ -26,10 +26,12 @@ import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.GraphDatabase; import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader; import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager; +import org.apache.atlas.repository.graphdb.janus.migration.ElementProcessors; import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer; import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer; import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer; import org.apache.atlas.runner.LocalSolrRunner; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.commons.configuration.Configuration; @@ -47,7 +49,6 @@ import java.io.InputStream; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Map; /** * Default implementation for Graph Provider that doles out JanusGraph. 
@@ -231,7 +232,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, return ret; } - public static void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException { + public static void loadLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException { AtlasPerfTracer perf = null; try { @@ -242,9 +243,10 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, } AtlasGraphSONReader legacyGraphSONReader = AtlasGraphSONReader.build(). - relationshipCache(relationshipCache). + relationshipCache(new ElementProcessors(typeRegistry)). schemaDB(getGraphInstance()). - bulkLoadingDB(getBulkLoadingGraphInstance()).create(); + bulkLoadingDB(getBulkLoadingGraphInstance()). + create(); legacyGraphSONReader.readGraph(fs); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java index ae119b0..2d5bd8a 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java @@ -37,44 +37,46 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; public final class AtlasGraphSONReader { private static final Logger LOG = LoggerFactory.getLogger(AtlasGraphSONReader.class); - private final ObjectMapper mapper; - private final RelationshipTypeCache relationshipCache; - private final 
Graph graph; - private final Graph bulkLoadGraph; - private final int numWorkers; - private final int batchSize; - private final long suppliedStartIndex; - private final String[] propertiesToPostProcess; - private GraphSONUtility graphSONUtility; - private ReaderStatusManager readerStatusManager; - private AtomicLong counter; - - private AtlasGraphSONReader(ObjectMapper mapper, Map<String, String> relationshipLookup, Graph graph, - Graph bulkLoadGraph, String[] propertiesToPostProcess, int numWorkers, int batchSize, long suppliedStartIndex) { + private static String APPLICATION_PROPERTY_MIGRATION_START_INDEX = "atlas.migration.mode.start.index"; + private static String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers"; + private static String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size"; + + private final ObjectMapper mapper; + private final ElementProcessors relationshipCache; + private final Graph graph; + private final Graph bulkLoadGraph; + private final int numWorkers; + private final int batchSize; + private final long suppliedStartIndex; + private final GraphSONUtility graphSONUtility; + private ReaderStatusManager readerStatusManager; + private AtomicLong counter; + + private AtlasGraphSONReader(ObjectMapper mapper, ElementProcessors relationshipLookup, Graph graph, + Graph bulkLoadGraph, int numWorkers, int batchSize, long suppliedStartIndex) { this.mapper = mapper; - this.relationshipCache = new RelationshipTypeCache(relationshipLookup); + this.relationshipCache = relationshipLookup; this.graph = graph; this.bulkLoadGraph = bulkLoadGraph; this.numWorkers = numWorkers; this.batchSize = batchSize; this.suppliedStartIndex = suppliedStartIndex; - this.propertiesToPostProcess = propertiesToPostProcess; + this.graphSONUtility = new GraphSONUtility(relationshipCache); } public void readGraph(final InputStream inputStream) throws IOException { - counter = new AtomicLong(0); - graphSONUtility = new 
GraphSONUtility(relationshipCache); + counter = new AtomicLong(0); final long startIndex = initStatusManager(); final JsonFactory factory = mapper.getFactory(); LOG.info("AtlasGraphSONReader.readGraph: numWorkers: {}: batchSize: {}: startIndex: {}", numWorkers, batchSize, startIndex); + try (JsonParser parser = factory.createParser(inputStream)) { if (parser.nextToken() != JsonToken.START_OBJECT) { throw new IOException("Expected data to start with an Object"); @@ -88,6 +90,7 @@ public final class AtlasGraphSONReader { switch (fieldName) { case GraphSONTokensTP2.MODE: parser.nextToken(); + final String mode = parser.getText(); if (!mode.equals("EXTENDED")) { @@ -180,8 +183,7 @@ public final class AtlasGraphSONReader { LOG.info("postProcess: Starting... : counter at: {}", counter.get()); try { - PostProcessManager.WorkItemsManager wim = PostProcessManager.create(bulkLoadGraph, graphSONUtility, - propertiesToPostProcess, batchSize, numWorkers); + PostProcessManager.WorkItemsManager wim = PostProcessManager.create(bulkLoadGraph, relationshipCache.getPropertiesToPostProcess(), batchSize, numWorkers); GraphTraversal query = bulkLoadGraph.traversal().V(); while (query.hasNext()) { @@ -228,30 +230,32 @@ public final class AtlasGraphSONReader { } } - public static Builder build() throws AtlasException { + public static Builder build() { return new Builder(); } public final static class Builder { - private int batchSize = 500; - private Map<String, String> relationshipCache; - private Graph graph; - private Graph bulkLoadGraph; - private int numWorkers; - private long suppliedStartIndex; - private String[] propertiesToPostProcess; + private int batchSize = 500; + private ElementProcessors relationshipCache; + private Graph graph; + private Graph bulkLoadGraph; + private int numWorkers; + private long suppliedStartIndex; private Builder() { } - private void setDefaults() throws AtlasException { - 
this.startIndex(ApplicationProperties.get().getLong("atlas.migration.mode.start.index", 0L)) - .numWorkers(ApplicationProperties.get().getInt("atlas.migration.mode.workers", 4)) - .batchSize(ApplicationProperties.get().getInt("atlas.migration.mode.batch.size", 3000)) - .propertiesToPostProcess(getPropertiesToPostProcess("atlas.migration.mode.postprocess.properties")); + private void setDefaults() { + try { + this.startIndex(ApplicationProperties.get().getLong(APPLICATION_PROPERTY_MIGRATION_START_INDEX, 0L)) + .numWorkers(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, 4)) + .batchSize(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, 3000)); + } catch (AtlasException ex) { + LOG.error("setDefaults: failed!", ex); + } } - public AtlasGraphSONReader create() throws AtlasException { + public AtlasGraphSONReader create() { setDefaults(); if(bulkLoadGraph == null) { bulkLoadGraph = graph; @@ -261,10 +265,10 @@ public final class AtlasGraphSONReader { final GraphSONMapper mapper = builder.typeInfo(TypeInfo.NO_TYPES).create(); return new AtlasGraphSONReader(mapper.createMapper(), relationshipCache, graph, bulkLoadGraph, - propertiesToPostProcess, numWorkers, batchSize, suppliedStartIndex); + numWorkers, batchSize, suppliedStartIndex); } - public Builder relationshipCache(Map<String, String> relationshipCache) { + public Builder relationshipCache(ElementProcessors relationshipCache) { this.relationshipCache = relationshipCache; return this; @@ -305,27 +309,5 @@ public final class AtlasGraphSONReader { return this; } - - public Builder propertiesToPostProcess(String[] list) { - this.propertiesToPostProcess = list; - return this; - } - - private static String[] getPropertiesToPostProcess(String applicationPropertyKey) throws AtlasException { - final String HIVE_COLUMNS_PROPERTY = "hive_table.columns"; - final String HIVE_PARTITION_KEYS_PROPERTY = "hive_table.partitionKeys"; - final String PROCESS_INPUT_PROPERTY 
= "Process.inputs"; - final String PROCESS_OUTPUT_PROPERTY = "Process.outputs"; - final String USER_PROFILE_OUTPUT_PROPERTY = "__AtlasUserProfile.savedSearches"; - - String[] defaultProperties = new String[] { HIVE_COLUMNS_PROPERTY, HIVE_PARTITION_KEYS_PROPERTY, - PROCESS_INPUT_PROPERTY, PROCESS_OUTPUT_PROPERTY, - USER_PROFILE_OUTPUT_PROPERTY - }; - - String[] userDefinedList = ApplicationProperties.get().getStringArray(applicationPropertyKey); - - return (userDefinedList == null || userDefinedList.length == 0) ? defaultProperties : userDefinedList; - } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java new file mode 100644 index 0000000..f51080a --- /dev/null +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.graphdb.janus.migration; + +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.lang.StringUtils; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY; +import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; + +public class ElementProcessors { + private static final Logger LOG = LoggerFactory.getLogger(ElementProcessors.class); + + public static final String PRIMITIVE_MAP_CATEGORY = "MAP_PRIMITIVE"; + public static final String NON_PRIMITIVE_MAP_CATEGORY = "MAP"; + public static final String NON_PRIMITIVE_ARRAY_CATEGORY = "ARRAY"; + private static final String[] NON_PRIMITIVE_KEYS = { ElementProcessors.NON_PRIMITIVE_ARRAY_CATEGORY }; + + private final Map<String, RelationshipCacheGenerator.TypeInfo> relationshipLookup; + private final Map<String, Map<String, List<String>>> postProcessMap; + + private final NonPrimitiveListPropertyProcessor nonPrimitiveListPropertyProcessor = new NonPrimitiveListPropertyProcessor(); + private final NonPrimitiveMapPropertyProcessor nonPrimitiveMapPropertyProcessor = new 
NonPrimitiveMapPropertyProcessor(); + private final PrimitiveMapPropertyProcessor primitiveMapPropertyProcessor = new PrimitiveMapPropertyProcessor(); + private final EdgeCollectionPropertyProcessor edgeCollectionPropertyProcessor = new EdgeCollectionPropertyProcessor(); + private final EdgeRelationshipPropertyProcessor edgeRelationshipPropertyProcessor = new EdgeRelationshipPropertyProcessor(); + + public ElementProcessors(AtlasTypeRegistry typeRegistry) { + this(RelationshipCacheGenerator.get(typeRegistry), TypesWithCollectionsFinder.getVertexPropertiesForCollectionAttributes(typeRegistry)); + } + + ElementProcessors(Map<String, RelationshipCacheGenerator.TypeInfo> lookup, Map<String, Map<String, List<String>>> postProcessMap) { + this.relationshipLookup = lookup; + this.postProcessMap = postProcessMap; + } + + public static String[] getNonPrimitiveCategoryKeys() { + return NON_PRIMITIVE_KEYS; + } + + public Map<String,Map<String, List<String>>> getPropertiesToPostProcess() { + return postProcessMap; + } + + public String addIndexKeysForCollections(Vertex out, Object edgeId, String label, Map<String, Object> edgeProperties) { + return edgeCollectionPropertyProcessor.update(out, edgeId, label, edgeProperties); + } + + public void processCollections(String typeNameKey, Map<String,Object> vertexProperties) { + if (!vertexProperties.containsKey(typeNameKey)) { + return; + } + + String typeName = (String) vertexProperties.get(typeNameKey); + + if (!postProcessMap.containsKey(typeName)) { + return; + } + + primitiveMapPropertyProcessor.update(typeName, vertexProperties); + nonPrimitiveMapPropertyProcessor.update(typeName, vertexProperties); + nonPrimitiveListPropertyProcessor.update(typeName, vertexProperties); + } + + public String updateEdge(Vertex in, Vertex out, Object edgeId, String label, Map<String,Object> props) { + return edgeRelationshipPropertyProcessor.update(in, out, edgeId, label, props); + } + + private class NonPrimitiveMapPropertyProcessor { + final 
String category = NON_PRIMITIVE_MAP_CATEGORY; + + public void update(String typeName, Map<String,Object> vertexProperties) { + if (!postProcessMap.containsKey(typeName)) { + return; + } + + if (!postProcessMap.get(typeName).containsKey(category)) { + return; + } + + List<String> propertyTypeList = postProcessMap.get(typeName).get(category); + + for (String property : propertyTypeList) { + if (!vertexProperties.containsKey(property)) { + continue; + } + + List<Object> list = (List<Object>) vertexProperties.get(property); + + if (list == null) { + continue; + } + + for (Object listEntry : list) { + String key = (String) listEntry; + String valueKey = getMapKey(property, key); + + if (vertexProperties.containsKey(valueKey)) { + vertexProperties.remove(valueKey); + } + } + + vertexProperties.remove(property); + } + } + + private String getMapKey(String property, String key) { + return String.format("%s.%s", property, key); + } + } + + private class PrimitiveMapPropertyProcessor { + final String category = PRIMITIVE_MAP_CATEGORY; + + public void update(String typeName, Map<String, Object> vertexProperties) { + if (!postProcessMap.get(typeName).containsKey(category)) { + return; + } + + List<String> propertyTypeList = postProcessMap.get(typeName).get(category); + + for (String property : propertyTypeList) { + if (!vertexProperties.containsKey(property)) { + continue; + } + + List<Object> list = (List<Object>) vertexProperties.get(property); + + if (list == null) { + continue; + } + + Map<String, Object> map = getAggregatedMap(vertexProperties, property, list); + + vertexProperties.put(property, map); + } + } + + private Map<String, Object> getAggregatedMap(Map<String, Object> vertexProperties, String property, List<Object> list) { + Map<String, Object> map = new HashMap<>(); + + for (Object listEntry : list) { + String key = (String) listEntry; + String valueKey = getMapKey(property, key); + + if (vertexProperties.containsKey(valueKey)) { + Object value = 
getValueFromProperties(valueKey, vertexProperties); + + vertexProperties.remove(valueKey); + + map.put(key, value); + } + } + + return map; + } + + private String getMapKey(String property, String key) { + return String.format("%s.%s", property, key); + } + + private Object getValueFromProperties(String key, Map<String, Object> vertexProperties) { + if (!vertexProperties.containsKey(key)) { + return null; + } + + return vertexProperties.get(key); + } + } + + private class NonPrimitiveListPropertyProcessor { + private final String category = NON_PRIMITIVE_ARRAY_CATEGORY; + + private void update(String typeName, Map<String,Object> props) { + if(!postProcessMap.get(typeName).containsKey(category)) { + return; + } + + List<String> propertyTypeList = postProcessMap.get(typeName).get(category); + for (String property : propertyTypeList) { + if(!props.containsKey(property)) { + continue; + } + + Map<String, String> listMap = getUpdatedEdgeList(props.get(property)); + + if(listMap == null) { + continue; + } + + props.put(property, listMap); + } + } + + private Map<String, String> getUpdatedEdgeList(Object o) { + Map<String, String> listMap = new HashMap<>(); + + if(!(o instanceof List)) { + return null; + } + + List list = (List) o; + + for (int i = 0; i < list.size(); i++) { + listMap.put((String) list.get(i), Integer.toString(i)); + } + + return listMap; + } + } + + private class EdgeRelationshipPropertyProcessor { + public String update(Vertex in, Vertex out, Object edgeId, String label, Map<String, Object> props) { + if(addRelationshipTypeForClassification(in, out, label, props)) { + label = Constants.CLASSIFICATION_LABEL; + } else { + addRelationshipTypeName(label, props); + + label = addIndexKeysForCollections(out, edgeId, label, props); + } + + addMandatoryRelationshipProperties(label, props); + + return label; + } + + private String getRelationshipTypeName(String label) { + return relationshipLookup.containsKey(label) ? 
relationshipLookup.get(label).getTypeName() : ""; + } + + private PropagateTags getDefaultPropagateValue(String label) { + return relationshipLookup.containsKey(label) ? + relationshipLookup.get(label).getPropagateTags() : + AtlasRelationshipDef.PropagateTags.NONE; + } + + private boolean addRelationshipTypeForClassification(Vertex in, Vertex out, String label, Map<String, Object> props) { + if (in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) { + String inTypeName = (String) in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value(); + + if (StringUtils.isNotEmpty(inTypeName)) { + if (inTypeName.equals(label)) { + props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, inTypeName); + + addEntityGuidToTrait(in, out); + + return true; + } + } else { + LOG.info("Could not find typeName for trait: {}", label); + } + } + + return false; + } + + private void addEntityGuidToTrait(Vertex in, Vertex out) { + String entityGuid = ""; + + if (out.property(Constants.GUID_PROPERTY_KEY).isPresent()) { + entityGuid = (String) out.property(Constants.GUID_PROPERTY_KEY).value(); + } + + if(StringUtils.isNotEmpty(entityGuid)) { + in.property(CLASSIFICATION_ENTITY_GUID, entityGuid); + in.property(CLASSIFICATION_VERTEX_PROPAGATE_KEY, false); + } + } + + private void addRelationshipTypeName(String edgeLabel, Map<String, Object> props) { + String typeName = getRelationshipTypeName(edgeLabel); + + if (StringUtils.isNotEmpty(typeName)) { + props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Could not find relationship type for: {}", edgeLabel); + } + } + } + + private void addMandatoryRelationshipProperties(String label, Map<String, Object> props) { + props.put(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, UUID.randomUUID().toString()); + + props.put(RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, String.valueOf(getDefaultPropagateValue(label))); + props.put(STATE_PROPERTY_KEY, "ACTIVE"); + } + } + + private class EdgeCollectionPropertyProcessor 
{ + private static final int LABEL_INDEX = 0; + private static final int KEY_INDEX = 1; + + public String update(Vertex out, Object edgeId, String label, Map<String, Object> edgeProperties) { + String[] labelKeyPair = getNonPrimitiveArrayFromLabel(out, (String) edgeId, label); + + if (labelKeyPair != null) { + edgeProperties.put(ATTRIBUTE_INDEX_PROPERTY_KEY, Integer.valueOf(labelKeyPair[KEY_INDEX])); + + return label; + } + + labelKeyPair = getNonPrimitiveMapKeyFromLabel(out, label); + + if (labelKeyPair != null) { + label = labelKeyPair[LABEL_INDEX]; + + edgeProperties.put(ATTRIBUTE_KEY_PROPERTY_KEY, labelKeyPair[KEY_INDEX]); + } + + return label; + } + + private String[] getNonPrimitiveArrayFromLabel(Vertex v, String edgeId, String label) { + if (!v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) { + return null; + } + + String typeName = (String) v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value(); + String propertyName = StringUtils.remove(label, Constants.INTERNAL_PROPERTY_KEY_PREFIX); + + if(!containsNonPrimitiveCollectionProperty(typeName, propertyName, NON_PRIMITIVE_ARRAY_CATEGORY)) { + return null; + } + + Map<String, String> edgeIdIndexList = (Map<String, String>) v.property(propertyName).value(); + + if (edgeIdIndexList.containsKey(edgeId)) { + return getLabelKeyPair(label, edgeIdIndexList.get(edgeId)); + } + + return null; + } + + // legacy edge label is in format: __<type name>.<key> + // label: in new format which is type name + // this method extracts: + // key: what remains of the legacy label string when '__' and type name are removed + private String[] getNonPrimitiveMapKeyFromLabel(Vertex v, String label) { + if (!v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) { + return null; + } + + String typeName = (String) v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value(); + + if(!postProcessMap.containsKey(typeName)) { + return null; + } + + if(!postProcessMap.get(typeName).containsKey(NON_PRIMITIVE_MAP_CATEGORY)) { + return 
null; + } + + String propertyName = StringUtils.remove(label, Constants.INTERNAL_PROPERTY_KEY_PREFIX); + List<String> properties = postProcessMap.get(typeName).get(NON_PRIMITIVE_MAP_CATEGORY); + + for (String p : properties) { + if (propertyName.startsWith(p)) { + return getLabelKeyPair( + String.format("%s%s", Constants.INTERNAL_PROPERTY_KEY_PREFIX, p), + StringUtils.remove(propertyName, p).substring(1).trim()); + } + } + + return null; + } + + private boolean containsNonPrimitiveCollectionProperty(String typeName, String propertyName, String categoryType) { + if (!postProcessMap.containsKey(typeName)) { + return false; + } + + if (!postProcessMap.get(typeName).containsKey(categoryType)) { + return false; + } + + List<String> properties = postProcessMap.get(typeName).get(categoryType); + + for (String p : properties) { + if (p.equals(propertyName)) { + return true; + } + } + + return false; + } + + private String[] getLabelKeyPair(String label, String value) { + return new String[] { label, value }; + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java index ec320b0..f1bbfcf 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java @@ -18,10 +18,9 @@ package org.apache.atlas.repository.graphdb.janus.migration; -import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.repository.Constants; -import org.apache.atlas.type.AtlasBuiltInTypes; -import org.apache.commons.lang.StringUtils; 
+import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigDecimalType; +import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigIntegerType; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgeFeatures; @@ -34,21 +33,17 @@ import org.slf4j.LoggerFactory; import java.util.*; -import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_STATE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; -import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY; - class GraphSONUtility { private static final Logger LOG = LoggerFactory.getLogger(GraphSONUtility.class); - private static final String EMPTY_STRING = ""; + private static final String EMPTY_STRING = ""; + private static final AtlasBigIntegerType bigIntegerType = new AtlasBigIntegerType(); + private static final AtlasBigDecimalType bigDecimalType = new AtlasBigDecimalType(); - private final RelationshipTypeCache relationshipTypeCache; - private static AtlasBuiltInTypes.AtlasBigIntegerType bigIntegerType = new AtlasBuiltInTypes.AtlasBigIntegerType(); - private static AtlasBuiltInTypes.AtlasBigDecimalType bigDecimalType = new AtlasBuiltInTypes.AtlasBigDecimalType(); + private final ElementProcessors elementProcessors; - public GraphSONUtility(final RelationshipTypeCache relationshipTypeCache) { - this.relationshipTypeCache = relationshipTypeCache; + public GraphSONUtility(final ElementProcessors elementProcessors) { + this.elementProcessors = elementProcessors; } public Map<String, Object> vertexFromJson(Graph g, final JsonNode json) { @@ -64,6 +59,7 @@ class GraphSONUtility { Vertex vertex = vertexFeatures.willAllowId(vertexId) ? 
g.addVertex(T.id, vertexId) : g.addVertex(); props.put(Constants.VERTEX_ID_IN_IMPORT_KEY, vertexId); + elementProcessors.processCollections(Constants.ENTITY_TYPE_PROPERTY_KEY, props); for (Map.Entry<String, Object> entry : props.entrySet()) { try { @@ -107,17 +103,11 @@ class GraphSONUtility { props.put(Constants.EDGE_ID_IN_IMPORT_KEY, edgeId.toString()); - if(addRelationshipTypeForClassification(in, out, label, props)) { - label = Constants.CLASSIFICATION_LABEL; - } else { - addRelationshipTypeName(label, props); - } + label = elementProcessors.updateEdge(in, out, edgeId, label, props); EdgeFeatures edgeFeatures = g.features().edge(); final Edge edge = edgeFeatures.willAllowId(edgeId) ? out.addEdge(label, in, T.id, edgeId) : out.addEdge(label, in); - addMandatoryRelationshipProperties(props); - for (Map.Entry<String, Object> entry : props.entrySet()) { try { edge.property(entry.getKey(), entry.getValue()); @@ -153,82 +143,6 @@ class GraphSONUtility { return cache.getMappedVertex(gr, inVId); } - private boolean addRelationshipTypeForClassification(Vertex in, Vertex out, String label, Map<String, Object> props) { - if (in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) { - String inTypeName = (String) in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value(); - - if (inTypeName.equals(label)) { - if (StringUtils.isNotEmpty(inTypeName)) { - props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, inTypeName); - - addEntityGuidToTrait(in, out); - - return true; - } else { - LOG.info("Could not find typeName for trait: {}", label); - } - } - } - - return false; - } - - private void addEntityGuidToTrait(Vertex in, Vertex out) { - String entityGuid = ""; - if (out.property(Constants.GUID_PROPERTY_KEY).isPresent()) { - entityGuid = (String) out.property(Constants.GUID_PROPERTY_KEY).value(); - } - - if(StringUtils.isNotEmpty(entityGuid)) { - in.property(CLASSIFICATION_ENTITY_GUID, entityGuid); - } - } - - private void addRelationshipTypeName(String edgeLabel, Map<String, 
Object> props) { - String typeName = relationshipTypeCache.get(edgeLabel); - - if (StringUtils.isNotEmpty(typeName)) { - props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Could not find relationship type for: {}", edgeLabel); - } - } - } - - private void addMandatoryRelationshipProperties(Map<String, Object> props) { - props.put(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, UUID.randomUUID().toString()); - props.put(RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, "NONE"); - props.put(CLASSIFICATION_EDGE_STATE_PROPERTY_KEY, "ACTIVE"); - } - - public void replaceReferencedEdgeIdForList(Graph g, MappedElementCache cache, Vertex v, String propertyName) { - try { - if (v.property(Constants.TYPENAME_PROPERTY_KEY).isPresent() || !v.property(propertyName).isPresent()) { - return; - } - - List list = (List) v.property(propertyName).value(); - for (int i = 0; i < list.size(); i++) { - String id = list.get(i).toString(); - Object newId = cache.getMappedEdge(g, id); - - if (newId == null) { - continue; - } - - list.set(i, newId.toString()); - } - - v.property(propertyName, list); - } catch (IllegalArgumentException ex) { - if (LOG.isDebugEnabled()) { - LOG.debug("processItem: IllegalArgumentException: v[{}] error!", v.id(), ex); - } - } - } - - @VisibleForTesting static Map<String, Object> readProperties(final JsonNode node) { final Map<String, Object> map = new HashMap<>(); final Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); @@ -303,7 +217,6 @@ class GraphSONUtility { return array; } - @VisibleForTesting static Object getTypedValueFromJsonNode(final JsonNode node) { Object theValue = null; http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java ---------------------------------------------------------------------- diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java index e4b6ee2..e2f418e 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java @@ -101,6 +101,7 @@ public class JsonNodeProcessManager { private void commitRegular() { commit(graph, nodes.size()); + cache.clearAll(); } private void commit(Graph g, int size) { http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java index cca72ad..817e8dc 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java @@ -18,9 +18,7 @@ package org.apache.atlas.repository.graphdb.janus.migration; -import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.utils.LruCache; -import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.slf4j.Logger; @@ -28,18 +26,13 @@ import org.slf4j.LoggerFactory; import java.util.Map; -import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY; import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY; public class MappedElementCache { private 
static final Logger LOG = LoggerFactory.getLogger(MappedElementCache.class); - @VisibleForTesting final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000); - @VisibleForTesting - final Map<String, String> lruEdgeCache = new LruCache<>(500, 100000); - public Vertex getMappedVertex(Graph gr, Object key) { try { Vertex ret = lruVertexCache.get(key); @@ -62,32 +55,6 @@ public class MappedElementCache { } } - public String getMappedEdge(Graph gr, String key) { - try { - String ret = lruEdgeCache.get(key); - - if (ret == null) { - synchronized (lruEdgeCache) { - ret = lruEdgeCache.get(key); - - if (ret == null) { - Edge e = fetchEdge(gr, key); - - ret = e.id().toString(); - - lruEdgeCache.put(key, ret); - } - } - } - - return ret; - } catch (Exception ex) { - LOG.error("getMappedEdge: {}", key, ex); - return null; - } - } - - @VisibleForTesting Vertex fetchVertex(Graph gr, Object key) { try { return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, key).next(); @@ -97,18 +64,7 @@ public class MappedElementCache { } } - @VisibleForTesting - Edge fetchEdge(Graph gr, String key) { - try { - return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, key).next(); - } catch (Exception ex) { - LOG.error("fetchEdge: fetchFromDB failed: {}", key); - return null; - } - } - public void clearAll() { lruVertexCache.clear(); - lruEdgeCache.clear(); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java index d0a65f7..7046f8c 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java +++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java @@ -21,36 +21,42 @@ package org.apache.atlas.repository.graphdb.janus.migration; import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder; import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer; import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager; +import org.apache.atlas.repository.graphdb.janus.migration.postProcess.PostProcessListProperty; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY; + public class PostProcessManager { static class Consumer extends WorkItemConsumer<Object> { private static final Logger LOG = LoggerFactory.getLogger(Consumer.class); - private final Graph bulkLoadGraph; - private final GraphSONUtility utility; - private final String[] properties; - private final MappedElementCache cache; - private final int batchSize; - private long counter; - private long batchCounter; + private final Graph bulkLoadGraph; + private final Map<String, Map<String, List<String>>> typePropertiesMap; + private final int batchSize; + private long counter; + private long batchCounter; + private final PostProcessListProperty processor; + private final String[] nonPrimitiveCategoryKeys; - public Consumer(BlockingQueue<Object> queue, Graph bulkLoadGraph, GraphSONUtility utility, - String[] properties, MappedElementCache cache, int batchSize) { + public Consumer(BlockingQueue<Object> queue, Graph bulkLoadGraph, Map<String, Map<String, List<String>>> typePropertiesMap, int batchSize) 
{ super(queue); - this.bulkLoadGraph = bulkLoadGraph; - this.utility = utility; - this.properties = properties; - this.cache = cache; - this.batchSize = batchSize; - this.counter = 0; - this.batchCounter = 0; + this.bulkLoadGraph = bulkLoadGraph; + this.typePropertiesMap = typePropertiesMap; + this.batchSize = batchSize; + this.counter = 0; + this.batchCounter = 0; + this.processor = new PostProcessListProperty(); + this.nonPrimitiveCategoryKeys = ElementProcessors.getNonPrimitiveCategoryKeys(); } @Override @@ -59,23 +65,44 @@ public class PostProcessManager { counter++; try { - Vertex v = bulkLoadGraph.traversal().V(vertexId).next(); - - for (String p : properties) { - utility.replaceReferencedEdgeIdForList(bulkLoadGraph, cache, v, p); + Vertex vertex = bulkLoadGraph.traversal().V(vertexId).next(); + boolean isTypeVertex = vertex.property(TYPENAME_PROPERTY_KEY).isPresent(); + VertexProperty typeNameProperty = vertex.property(ENTITY_TYPE_PROPERTY_KEY); + + if (!isTypeVertex && typeNameProperty.isPresent()) { + String typeName = (String) typeNameProperty.value(); + if (!typePropertiesMap.containsKey(typeName)) { + return; + } + + Map<String, List<String>> collectionTypeProperties = typePropertiesMap.get(typeName); + for (String key : nonPrimitiveCategoryKeys) { + if (!collectionTypeProperties.containsKey(key)) { + continue; + } + + for(String propertyName : collectionTypeProperties.get(key)) { + processor.process(vertex, typeName, propertyName); + } + } } - if (batchCounter >= batchSize) { - LOG.info("[{}]: batch: {}: commit", counter, batchCounter); - commit(); - batchCounter = 0; - } - } - catch (Exception ex) { + commitBatch(); + } catch (Exception ex) { LOG.error("processItem: v[{}] error!", vertexId, ex); } } + private void commitBatch() { + if (batchCounter >= batchSize) { + LOG.info("[{}]: batch: {}: commit", counter, batchCounter); + + commit(); + + batchCounter = 0; + } + } + @Override protected void doCommit() { bulkLoadGraph.tx().commit(); @@ -83,23 
+110,19 @@ public class PostProcessManager { } private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Object> { - private final Graph bulkLoadGraph; - private final GraphSONUtility utility; - private final int batchSize; - private final MappedElementCache cache; - private final String[] vertexPropertiesToPostProcess; + private final Graph bulkLoadGraph; + private final int batchSize; + private final Map<String, Map<String, List<String>>> vertexPropertiesToPostProcess; - public ConsumerBuilder(Graph bulkLoadGraph, GraphSONUtility utility, String[] propertiesToPostProcess, int batchSize) { + public ConsumerBuilder(Graph bulkLoadGraph, Map<String, Map<String, List<String>>> propertiesToPostProcess, int batchSize) { this.bulkLoadGraph = bulkLoadGraph; - this.utility = utility; this.batchSize = batchSize; - this.cache = new MappedElementCache(); this.vertexPropertiesToPostProcess = propertiesToPostProcess; } @Override public Consumer build(BlockingQueue<Object> queue) { - return new Consumer(queue, bulkLoadGraph, utility, vertexPropertiesToPostProcess, cache, batchSize); + return new Consumer(queue, bulkLoadGraph, vertexPropertiesToPostProcess, batchSize); } } @@ -109,8 +132,9 @@ public class PostProcessManager { } } - public static WorkItemsManager create(Graph bGraph, GraphSONUtility utility, String[] propertiesToPostProcess, int batchSize, int numWorkers) { - ConsumerBuilder cb = new ConsumerBuilder(bGraph, utility, propertiesToPostProcess, batchSize); + public static WorkItemsManager create(Graph bGraph, Map<String, Map<String, List<String>>> propertiesToPostProcess, + int batchSize, int numWorkers) { + ConsumerBuilder cb = new ConsumerBuilder(bGraph, propertiesToPostProcess, batchSize); return new WorkItemsManager(cb, batchSize, numWorkers); } http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java 
---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java new file mode 100644 index 0000000..c4ee179 --- /dev/null +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * Builds a lookup from legacy relationship-end keys to the relationship type
 * (and tag-propagation setting) that models that end.
 *
 * <p>A key has the form {@code <INTERNAL_PROPERTY_KEY_PREFIX><entityType>.<endName>}
 * and is produced only for relationship ends flagged as legacy attributes.
 * NOTE(review): the key presumably matches the legacy edge label written by
 * older Atlas versions - confirm against the migration reader.
 */
public class RelationshipCacheGenerator {

    /**
     * Relationship type name together with the tag-propagation value that
     * applies when the relationship is seen from a particular end.
     */
    public static class TypeInfo extends TypesUtil.Pair<String, PropagateTags> {

        public TypeInfo(String typeName, PropagateTags propagateTags) {
            super(typeName, propagateTags);
        }

        /** @return the relationship type name (left element of the pair) */
        public String getTypeName() {
            return left;
        }

        /** @return the tag-propagation setting (right element of the pair) */
        public PropagateTags getPropagateTags() {
            return right;
        }
    }

    /**
     * Walks every relationship type in the registry and registers both of its
     * ends. End 1 keeps the propagation value of the relationship definition;
     * end 2 gets the mirrored value.
     *
     * @param typeRegistry registry supplying the relationship types
     * @return map from legacy end key to relationship type information
     */
    public static Map<String, TypeInfo> get(AtlasTypeRegistry typeRegistry) {
        Map<String, TypeInfo> cache = new HashMap<>();

        for (AtlasRelationshipType relationshipType : typeRegistry.getAllRelationshipTypes()) {
            AtlasRelationshipDef definition = relationshipType.getRelationshipDef();
            String               typeName   = relationshipType.getTypeName();

            register(cache, getKey(definition.getEndDef1()), typeName, definition.getPropagateTags());
            register(cache, getKey(definition.getEndDef2()), typeName, getEnd2PropagateTag(definition.getPropagateTags()));
        }

        return cache;
    }

    /**
     * Returns the lookup key for a relationship end, or an empty string when
     * the end is not a legacy attribute.
     */
    private static String getKey(AtlasRelationshipEndDef endDef) {
        boolean isLegacy       = endDef.getIsLegacyAttribute();
        String  entityTypeName = endDef.getType();
        String  relEndName     = endDef.getName();

        return isLegacy ? getKey(entityTypeName, relEndName) : "";
    }

    /** Formats {@code <prefix><lhs>.<rhs>} using the internal property-key prefix. */
    private static String getKey(String lhs, String rhs) {
        return String.format("%s%s.%s", Constants.INTERNAL_PROPERTY_KEY_PREFIX, lhs, rhs);
    }

    /**
     * Stores the entry unless the key is empty or already present; the first
     * relationship type registered for a key is the one that is kept.
     */
    private static void register(Map<String, TypeInfo> map, String key, String relationTypeName, PropagateTags propagateTags) {
        boolean usableKey = !StringUtils.isEmpty(key);

        if (usableKey && !map.containsKey(key)) {
            map.put(key, new TypeInfo(relationTypeName, propagateTags));
        }
    }

    /**
     * Mirrors a propagation direction so it reads correctly from end 2:
     * ONE_TO_TWO and TWO_TO_ONE are swapped, any other value is returned as is.
     */
    private static PropagateTags getEnd2PropagateTag(PropagateTags end1PropagateTags) {
        if (end1PropagateTags == PropagateTags.ONE_TO_TWO) {
            return PropagateTags.TWO_TO_ONE;
        }

        if (end1PropagateTags == PropagateTags.TWO_TO_ONE) {
            return PropagateTags.ONE_TO_TWO;
        }

        return end1PropagateTags;
    }
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.graphdb.janus.migration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class RelationshipTypeCache { - private static final Logger LOG = LoggerFactory.getLogger(RelationshipTypeCache.class); - private final Map<String, String> relationshipLookup; - - public RelationshipTypeCache(Map<String, String> lookup) { - relationshipLookup = lookup; - } - - public String get(String label) { - return relationshipLookup.get(label); - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java new file mode 100644 index 0000000..55aa9c9 --- /dev/null +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.atlas.model.TypeCategory.*;

/**
 * Finds, for every entity and struct type in a type registry, the vertex
 * property names of its collection-typed (array / map) attributes.
 *
 * <p>The result is a two-level map: type name, then collection kind, then the
 * list of vertex property names. The collection kind is one of:
 * <ul>
 *   <li>{@code ARRAY.toString()} - arrays whose element category is in {@link #nonPrimitives};
 *       arrays of any other element category are not recorded</li>
 *   <li>{@code MAP.toString()} - maps whose value category is in {@link #nonPrimitives}</li>
 *   <li>{@code MAP.toString() + "_PRIMITIVE"} - all other maps</li>
 * </ul>
 */
public class TypesWithCollectionsFinder {
    private static final Logger LOG = LoggerFactory.getLogger(TypesWithCollectionsFinder.class);

    /** Element/value categories that are treated as non-primitive when classifying a collection. */
    static final EnumSet<TypeCategory> nonPrimitives = EnumSet.of(ENTITY, STRUCT, OBJECT_ID_TYPE);

    /**
     * Collects the collection-attribute vertex properties of all entity types
     * and all struct types in the registry, and logs the result at INFO level.
     *
     * @param typeRegistry registry to scan
     * @return map of type name to (collection kind to vertex property names); types
     *         without any recorded collection attribute are omitted
     */
    public static Map<String, Map<String, List<String>>> getVertexPropertiesForCollectionAttributes(AtlasTypeRegistry typeRegistry) {
        Map<String, Map<String, List<String>>> ret = new HashMap<>();

        addVertexPropertiesForCollectionAttributes(typeRegistry.getAllEntityTypes(), ret);
        addVertexPropertiesForCollectionAttributes(typeRegistry.getAllStructTypes(), ret);

        displayInfo("types with properties: ", ret);

        return ret;
    }

    /** Adds an entry to {@code typeAttrMap} for each type that has at least one recorded collection attribute. */
    private static void addVertexPropertiesForCollectionAttributes(Collection<? extends AtlasStructType> types, Map<String, Map<String, List<String>>> typeAttrMap) {
        for (AtlasStructType type : types) {
            Map<String, List<String>> collectionProperties = getVertexPropertiesForCollectionAttributes(type);

            // null means the per-type scan failed (already logged); empty means nothing to record
            if (collectionProperties != null && !collectionProperties.isEmpty()) {
                typeAttrMap.put(type.getTypeName(), collectionProperties);
            }
        }
    }

    /**
     * Scans all attributes of a single type.
     *
     * @param type type whose attributes are inspected
     * @return collection kind to vertex property names (possibly empty), or
     *         {@code null} if inspecting the type raised an exception
     */
    static Map<String, List<String>> getVertexPropertiesForCollectionAttributes(AtlasStructType type) {
        try {
            Map<String, List<String>> collectionProperties = new HashMap<>();

            for (AtlasAttribute attr : type.getAllAttributes().values()) {
                addIfCollectionAttribute(attr, collectionProperties);
            }

            return collectionProperties;
        } catch (Exception e) {
            // Report the method that actually failed and the type involved; the null
            // guard keeps the handler itself from throwing when 'type' was null.
            LOG.error("getVertexPropertiesForCollectionAttributes: failed for type {}", (type != null) ? type.getTypeName() : null, e);
        }

        return null;
    }

    /** Records the attribute's vertex property name under the matching collection kind, if it is an array or map. */
    private static void addIfCollectionAttribute(AtlasAttribute attr, Map<String, List<String>> collectionProperties) {
        AtlasType    attrType         = attr.getAttributeType();
        TypeCategory attrTypeCategory = attrType.getTypeCategory();

        switch (attrTypeCategory) {
            case ARRAY: {
                TypeCategory arrayElementType = ((AtlasArrayType) attrType).getElementType().getTypeCategory();

                if (nonPrimitives.contains(arrayElementType)) {
                    addVertexProperty(attrTypeCategory.toString(), attr.getVertexPropertyName(), collectionProperties);
                }
            }
            break;

            case MAP: {
                TypeCategory mapValueType = ((AtlasMapType) attrType).getValueType().getTypeCategory();

                if (nonPrimitives.contains(mapValueType)) {
                    addVertexProperty(attrTypeCategory.toString(), attr.getVertexPropertyName(), collectionProperties);
                } else {
                    addVertexProperty(attrTypeCategory.toString() + "_PRIMITIVE", attr.getVertexPropertyName(), collectionProperties);
                }
            }
            break;

            default:
                // not a collection attribute - nothing to record
                break;
        }
    }

    /** Appends {@code propertyName} to the list kept for {@code collectionType}, creating the list on first use. */
    private static void addVertexProperty(String collectionType, String propertyName, Map<String, List<String>> collectionProperties) {
        collectionProperties.computeIfAbsent(collectionType, k -> new ArrayList<>()).add(propertyName);
    }

    /** Logs the message followed by one INFO line per type in the map. */
    static void displayInfo(String message, Map<String, Map<String, List<String>>> map) {
        LOG.info(message);
        for (Map.Entry<String, Map<String, List<String>>> e : map.entrySet()) {
            LOG.info("  type: {} : {}", e.getKey(), e.getValue());
        }
    }
}
+ */ + +package org.apache.atlas.repository.graphdb.janus.migration.postProcess; + +import org.apache.atlas.repository.Constants; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PostProcessListProperty { + private static final Logger LOG = LoggerFactory.getLogger(PostProcessListProperty.class); + + public void process(Vertex v, String typeName, String propertyName) { + try { + if (doesNotHaveProperty(v, typeName) || !hasProperty(v, propertyName)) { + return; + } + + removeProperty(v, propertyName); + } catch (IllegalArgumentException ex) { + LOG.error("process: IllegalArgumentException: v[{}] error!", v.id(), ex); + } + } + + protected void removeProperty(Vertex v, String propertyName) { + v.property(propertyName).remove(); + } + + protected boolean doesNotHaveProperty(Vertex v, String typeName) { + return v.property(Constants.TYPENAME_PROPERTY_KEY).isPresent() || !isInstanceVertexOfType(v, typeName); + } + + private boolean hasProperty(Vertex v, String propertyName) { + try { + return v.property(propertyName).isPresent(); + } catch(Exception ex) { + // ... + } + + return false; + } + + private boolean isInstanceVertexOfType(Vertex v, String typeName) { + if(v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) { + String s = (String) v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value(); + + return s.equals(typeName); + } + + return false; + } +}