ATLAS-2637: migration-import updates for changes in collection attribute storage

Signed-off-by: Madhan Neethiraj <mad...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/47ec9f7a
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/47ec9f7a
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/47ec9f7a

Branch: refs/heads/master
Commit: 47ec9f7a8cc02346ac167849374bf088f55be74a
Parents: a0269b9
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Tue May 22 18:24:57 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Tue May 22 23:04:32 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasConstants.java   |    2 +-
 .../atlas/repository/graphdb/AtlasGraph.java    |    3 +-
 .../graphdb/janus/AtlasJanusGraph.java          |  262 +-
 .../graphdb/janus/AtlasJanusGraphDatabase.java  |   10 +-
 .../janus/migration/AtlasGraphSONReader.java    |  100 +-
 .../janus/migration/ElementProcessors.java      |  423 ++
 .../janus/migration/GraphSONUtility.java        |  107 +-
 .../janus/migration/JsonNodeProcessManager.java |    1 +
 .../janus/migration/MappedElementCache.java     |   44 -
 .../janus/migration/PostProcessManager.java     |  100 +-
 .../migration/RelationshipCacheGenerator.java   |   98 +
 .../janus/migration/RelationshipTypeCache.java  |   37 -
 .../migration/TypesWithCollectionsFinder.java   |  123 +
 .../postProcess/PostProcessListProperty.java    |   68 +
 .../graphdb/janus/migration/BaseUtils.java      |  107 +-
 .../GraphSONUtilityPostProcessTest.java         |   95 -
 .../janus/migration/GraphSONUtilityTest.java    |  266 +-
 .../janus/migration/JsonNodeParsersTest.java    |   16 +-
 .../janus/migration/MappedElementCacheTest.java |   26 +-
 .../migration/PostProcessListPropertyTest.java  |  142 +
 .../janus/src/test/resources/col-2-legacy.json  |   73 +
 .../janus/src/test/resources/col-3-legacy.json  |   86 +
 .../src/test/resources/edge-legacy-col.json     |   27 +
 .../src/test/resources/edge-legacy-col2.json    |   27 +
 .../src/test/resources/edge-legacy-col3.json    |   27 +
 .../src/test/resources/edge-legacy-col4.json    |   27 +
 .../src/test/resources/edge-legacy-process.json |   27 +
 .../src/test/resources/edge-legacy-tag.json     |   27 +
 .../src/test/resources/lineage-v-98312.json     |   79 +
 .../src/test/resources/table-v-147504.json      |   83 +-
 .../janus/src/test/resources/tag-163856752.json |   35 +
 .../apache/atlas/store/AtlasTypeDefStore.java   |    6 -
 .../test/java/org/apache/atlas/TestUtilsV2.java |    1 -
 .../migration/DataMigrationService.java         |   48 +-
 .../migration/RelationshipCacheGenerator.java   |   73 -
 .../graph/v1/AtlasTypeDefGraphStoreV1.java      |    7 -
 .../migration/ComplexAttributesTest.java        |   61 +
 .../migration/HiveParititionTest.java           |   13 +-
 .../repository/migration/HiveStocksTest.java    |    3 +-
 .../migration/MigrationBaseAsserts.java         |   28 +-
 .../RelationshipCacheGeneratorTest.java         |   96 +
 .../migration/RelationshipMappingTest.java      |   87 -
 .../TypesWithCollectionsFinderTest.java         |   84 +
 .../store/graph/v1/AtlasEntityTestBase.java     |    5 +-
 .../complex-attr_db/atlas-migration-data.json   | 5569 ++++++++++++++++++
 .../atlas-migration-typesdef.json               | 2303 ++++++++
 .../parts_db/atlas-migration-data.json          | 3990 ++++++++++++-
 47 files changed, 14107 insertions(+), 815 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/common/src/main/java/org/apache/atlas/AtlasConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java 
b/common/src/main/java/org/apache/atlas/AtlasConstants.java
index 2b92e6e..2b9f411 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConstants.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java
@@ -35,6 +35,6 @@ public final class AtlasConstants {
     public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
     public static final String DEFAULT_TYPE_VERSION = "1.0";
 
-    public static final String ATLAS_MIGRATION_MODE_FILENAME = 
"atlas.migration.mode.filename";
+    public static final String ATLAS_MIGRATION_MODE_FILENAME = 
"atlas.migration.data.filename";
     public static final String ATLAS_SERVICES_ENABLED        = 
"atlas.services.enabled";
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
----------------------------------------------------------------------
diff --git 
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java 
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
index 607baf6..e5316d8 100644
--- 
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
+++ 
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
@@ -30,6 +30,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.groovy.GroovyExpression;
 import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 
 /**
  * Represents a graph.
@@ -320,7 +321,7 @@ public interface AtlasGraph<V, E> {
      */
     boolean isMultiProperty(String name);
 
-    void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream 
fs) throws AtlasBaseException;
+    void importLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) 
throws AtlasBaseException;
 
     MigrationStatus getMigrationStatus();
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index c0b9c17..b4d6b33 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -34,10 +34,10 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.graphdb.GremlinVersion;
-import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
 import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery;
 import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
 import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
@@ -84,26 +84,20 @@ import static 
org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_P
  */
 public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, 
AtlasJanusEdge> {
 
-    private final ConvertGremlinValueFunction 
GREMLIN_VALUE_CONVERSION_FUNCTION = new ConvertGremlinValueFunction();
-
     private static Configuration APPLICATION_PROPERTIES = null;
 
-    private final class ConvertGremlinValueFunction implements 
Function<Object, Object> {
-        @Override
-        public Object apply(Object input) {
-            return convertGremlinValue(input);
-        }
-    }
-
-    private final Set<String> multiProperties;
+    private final ConvertGremlinValueFunction 
GREMLIN_VALUE_CONVERSION_FUNCTION = new ConvertGremlinValueFunction();
+    private final Set<String>                 multiProperties                  
 = new HashSet<>();
 
     public AtlasJanusGraph() {
         //determine multi-properties once at startup
         JanusGraphManagement mgmt = null;
+
         try {
             mgmt = AtlasJanusGraphDatabase.getGraphInstance().openManagement();
+
             Iterable<PropertyKey> keys = 
mgmt.getRelationTypes(PropertyKey.class);
-            multiProperties = new HashSet<>();
+
             for (PropertyKey key : keys) {
                 if (key.cardinality() != Cardinality.SINGLE) {
                     multiProperties.add(key.name());
@@ -118,13 +112,13 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
 
     @Override
     public AtlasEdge<AtlasJanusVertex, AtlasJanusEdge> 
addEdge(AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> outVertex,
-                                                       
AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> inVertex,
-                                                       String edgeLabel) {
-
+                                                               
AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> inVertex,
+                                                               String 
edgeLabel) {
         try {
-            Vertex oV = outVertex.getV().getWrappedElement();
-            Vertex iV = inVertex.getV().getWrappedElement();
-            Edge edge = oV.addEdge(edgeLabel, iV);
+            Vertex oV   = outVertex.getV().getWrappedElement();
+            Vertex iV   = inVertex.getV().getWrappedElement();
+            Edge   edge = oV.addEdge(edgeLabel, iV);
+
             return GraphDbObjectFactory.createEdge(this, edge);
         } catch (SchemaViolationException e) {
             throw new AtlasSchemaViolationException(e);
@@ -139,42 +133,43 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
     @Override
     public AtlasEdge<AtlasJanusVertex, AtlasJanusEdge> getEdge(String edgeId) {
         Iterator<Edge> it = getGraph().edges(edgeId);
-        Edge e = getSingleElement(it, edgeId);
+        Edge           e  = getSingleElement(it, edgeId);
+
         return GraphDbObjectFactory.createEdge(this, e);
     }
 
     @Override
     public void removeEdge(AtlasEdge<AtlasJanusVertex, AtlasJanusEdge> edge) {
-
         Edge wrapped = edge.getE().getWrappedElement();
-        wrapped.remove();
 
+        wrapped.remove();
     }
 
     @Override
     public void removeVertex(AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> 
vertex) {
         Vertex wrapped = vertex.getV().getWrappedElement();
+
         wrapped.remove();
     }
 
     @Override
     public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> getEdges() {
-
         Iterator<Edge> edges = getGraph().edges();
-        return wrapEdges(edges);
 
+        return wrapEdges(edges);
     }
 
     @Override
     public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
getVertices() {
-
         Iterator<Vertex> vertices = getGraph().vertices();
+
         return wrapVertices(vertices);
     }
 
     @Override
     public AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> addVertex() {
         Vertex result = getGraph().addVertex();
+
         return GraphDbObjectFactory.createVertex(this, result);
     }
 
@@ -221,73 +216,32 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
         return getIndexKeys(Vertex.class);
     }
 
-    private Set<String> getIndexKeys(Class<? extends Element> 
janusGraphElementClass) {
-
-        JanusGraphManagement mgmt = getGraph().openManagement();
-        Iterable<JanusGraphIndex> indices = 
mgmt.getGraphIndexes(janusGraphElementClass);
-        Set<String> result = new HashSet<String>();
-        for (JanusGraphIndex index : indices) {
-            result.add(index.name());
-        }
-        mgmt.commit();
-        return result;
-
-    }
-
     @Override
     public AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> getVertex(String 
vertexId) {
-        Iterator<Vertex> it = getGraph().vertices(vertexId);
-        Vertex vertex = getSingleElement(it, vertexId);
-        return GraphDbObjectFactory.createVertex(this, vertex);
-    }
+        Iterator<Vertex> it     = getGraph().vertices(vertexId);
+        Vertex           vertex = getSingleElement(it, vertexId);
 
-    public static <T> T getSingleElement(Iterator<T> it, String id) {
-        if (!it.hasNext()) {
-            return null;
-        }
-        T element = it.next();
-        if (it.hasNext()) {
-            throw new RuntimeException("Multiple items were found with the id 
" + id);
-        }
-        return element;
+        return GraphDbObjectFactory.createVertex(this, vertex);
     }
 
     @Override
     public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
getVertices(String key, Object value) {
         AtlasGraphQuery<AtlasJanusVertex, AtlasJanusEdge> query = query();
-        query.has(key, value);
-        return query.vertices();
-    }
 
-    private Object convertGremlinValue(Object rawValue) {
+        query.has(key, value);
 
-        if (rawValue instanceof Vertex) {
-            return GraphDbObjectFactory.createVertex(this, (Vertex) rawValue);
-        } else if (rawValue instanceof Edge) {
-            return GraphDbObjectFactory.createEdge(this, (Edge) rawValue);
-        } else if (rawValue instanceof Map) {
-            Map<String,Object> rowValue = (Map<String,Object>)rawValue;
-            return Maps.transformValues(rowValue, 
GREMLIN_VALUE_CONVERSION_FUNCTION);
-        } else if (rawValue instanceof ImmutablePath) {
-            ImmutablePath path = (ImmutablePath) rawValue;
-            return convertGremlinValue(path.objects());
-        }
-        else if (rawValue instanceof List) {
-            return Lists.transform((List)rawValue, 
GREMLIN_VALUE_CONVERSION_FUNCTION);
-        } else if (rawValue instanceof Collection) {
-            throw new UnsupportedOperationException("Unhandled collection 
type: " + rawValue.getClass());
-        }
-        return rawValue;
+        return query.vertices();
     }
 
     @Override
     public GremlinVersion getSupportedGremlinVersion() {
-
         return GremlinVersion.THREE;
     }
+
     @Override
     public void clear() {
         JanusGraph graph = getGraph();
+
         if (graph.isOpen()) {
             // only a shut down graph can be cleared
             graph.close();
@@ -295,7 +249,8 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
 
         try {
             JanusGraphFactory.drop(graph);
-        } catch (BackendException ignoreEx) {}
+        } catch (BackendException ignoreEx) {
+        }
     }
 
     private JanusGraph getGraph() {
@@ -304,25 +259,27 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
 
     @Override
     public void exportToGson(OutputStream os) throws IOException {
-
-        GraphSONMapper mapper = 
getGraph().io(IoCore.graphson()).mapper().create();
+        GraphSONMapper         mapper  = 
getGraph().io(IoCore.graphson()).mapper().create();
         GraphSONWriter.Builder builder = GraphSONWriter.build();
+
         builder.mapper(mapper);
+
         GraphSONWriter writer = builder.create();
+
         writer.writeGraph(os, getGraph());
     }
 
     @Override
     public GremlinGroovyScriptEngine getGremlinScriptEngine() {
-        Set<String> extraImports = new HashSet<String>();
-        extraImports.add(java.util.function.Function.class.getName());
-
+        Set<String> extraImports       = new HashSet<String>();
         Set<String> extraStaticImports = new HashSet<String>();
+
+        extraImports.add(java.util.function.Function.class.getName());
         extraStaticImports.add(P.class.getName() + ".*");
         extraStaticImports.add(__.class.getName() + ".*");
-        CompilerCustomizerProvider provider = new 
DefaultImportCustomizerProvider(extraImports, extraStaticImports);
 
-        GremlinGroovyScriptEngine scriptEngine = new 
GremlinGroovyScriptEngine(provider);
+        CompilerCustomizerProvider provider     = new 
DefaultImportCustomizerProvider(extraImports, extraStaticImports);
+        GremlinGroovyScriptEngine  scriptEngine = new 
GremlinGroovyScriptEngine(provider);
 
         return scriptEngine;
     }
@@ -341,38 +298,20 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
     @Override
     public Object executeGremlinScript(String query, boolean isPath) throws 
AtlasBaseException {
         Object result = executeGremlinScript(query);
-        return convertGremlinValue(result);
-    }
-
-    private Object executeGremlinScript(String gremlinQuery) throws 
AtlasBaseException {
-        GremlinGroovyScriptEngine scriptEngine = getGremlinScriptEngine();
-
-        try {
-            Bindings bindings = scriptEngine.createBindings();
-
-            bindings.put("graph", getGraph());
-            bindings.put("g", getGraph().traversal());
-
-            Object result = scriptEngine.eval(gremlinQuery, bindings);
 
-            return result;
-        } catch (ScriptException e) {
-            throw new 
AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e, 
gremlinQuery);
-        } finally {
-            releaseGremlinScriptEngine(scriptEngine);
-        }
+        return convertGremlinValue(result);
     }
 
     @Override
-    public Object executeGremlinScript(ScriptEngine scriptEngine,
-            Map<? extends  String, ? extends  Object> userBindings, String 
query, boolean isPath)
-            throws ScriptException {
+    public Object executeGremlinScript(ScriptEngine scriptEngine, Map<? 
extends String, ? extends Object> userBindings,
+                                       String query, boolean isPath) throws 
ScriptException {
         Bindings bindings = scriptEngine.createBindings();
 
         bindings.putAll(userBindings);
         bindings.put("g", getGraph().traversal());
 
         Object result = scriptEngine.eval(query, bindings);
+
         return convertGremlinValue(result);
     }
 
@@ -402,39 +341,42 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
         return expr;
     }
 
-    public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
wrapVertices(Iterator<? extends Vertex> it) {
-        Iterable<? extends Vertex> iterable = new 
IteratorToIterableAdapter<>(it);
-        return wrapVertices(iterable);
+    @Override
+    public boolean isMultiProperty(String propertyName) {
+        return multiProperties.contains(propertyName);
     }
 
-    public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
wrapVertices(Iterable<? extends Vertex> it) {
+    @Override
+    public void importLegacyGraphSON(AtlasTypeRegistry typeRegistry, 
InputStream fs) throws AtlasBaseException {
+        AtlasJanusGraphDatabase.loadLegacyGraphSON(typeRegistry, fs);
+    }
 
-        return StreamSupport.stream(it.spliterator(), false).map(input -> 
GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, 
input)).collect(Collectors.toList());
+    @Override
+    public MigrationStatus getMigrationStatus() {
+        return AtlasJanusGraphDatabase.getMigrationStatus();
+    }
 
+    public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
wrapVertices(Iterable<? extends Vertex> it) {
+        return StreamSupport.stream(it.spliterator(), false).map(input -> 
GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, 
input)).collect(Collectors.toList());
     }
 
     public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> 
wrapEdges(Iterator<? extends Edge> it) {
         Iterable<? extends Edge> iterable = new 
IteratorToIterableAdapter<>(it);
+
         return wrapEdges(iterable);
     }
 
     public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> 
wrapEdges(Iterable<? extends Edge> it) {
-
         return StreamSupport.stream(it.spliterator(), false).map(input -> 
GraphDbObjectFactory.createEdge(AtlasJanusGraph.this, 
input)).collect(Collectors.toList());
-
-    }
-
-    @Override
-    public boolean isMultiProperty(String propertyName) {
-        return multiProperties.contains(propertyName);
     }
 
     public void addMultiProperties(Set<String> names) {
         multiProperties.addAll(names);
     }
 
-    public String getIndexQueryPrefix() {
-        String ret;
+
+    private String getIndexQueryPrefix() {
+        final String ret;
 
         initApplicationProperties();
 
@@ -447,6 +389,82 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
         return ret;
     }
 
+    private Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
wrapVertices(Iterator<? extends Vertex> it) {
+        Iterable<? extends Vertex> iterable = new 
IteratorToIterableAdapter<>(it);
+
+        return wrapVertices(iterable);
+    }
+
+    private static <T> T getSingleElement(Iterator<T> it, String id) {
+        if (!it.hasNext()) {
+            return null;
+        }
+
+        T element = it.next();
+
+        if (it.hasNext()) {
+            throw new RuntimeException("Multiple items were found with the id 
" + id);
+        }
+
+        return element;
+    }
+
+    private Object convertGremlinValue(Object rawValue) {
+        if (rawValue instanceof Vertex) {
+            return GraphDbObjectFactory.createVertex(this, (Vertex) rawValue);
+        } else if (rawValue instanceof Edge) {
+            return GraphDbObjectFactory.createEdge(this, (Edge) rawValue);
+        } else if (rawValue instanceof Map) {
+            Map<String,Object> rowValue = (Map<String,Object>)rawValue;
+
+            return Maps.transformValues(rowValue, 
GREMLIN_VALUE_CONVERSION_FUNCTION);
+        } else if (rawValue instanceof ImmutablePath) {
+            ImmutablePath path = (ImmutablePath) rawValue;
+
+            return convertGremlinValue(path.objects());
+        } else if (rawValue instanceof List) {
+            return Lists.transform((List)rawValue, 
GREMLIN_VALUE_CONVERSION_FUNCTION);
+        } else if (rawValue instanceof Collection) {
+            throw new UnsupportedOperationException("Unhandled collection 
type: " + rawValue.getClass());
+        }
+
+        return rawValue;
+    }
+
+    private Set<String> getIndexKeys(Class<? extends Element> 
janusGraphElementClass) {
+        JanusGraphManagement      mgmt    = getGraph().openManagement();
+        Iterable<JanusGraphIndex> indices = 
mgmt.getGraphIndexes(janusGraphElementClass);
+        Set<String>               result  = new HashSet<String>();
+
+        for (JanusGraphIndex index : indices) {
+            result.add(index.name());
+        }
+
+        mgmt.commit();
+
+        return result;
+
+    }
+
+    private Object executeGremlinScript(String gremlinQuery) throws 
AtlasBaseException {
+        GremlinGroovyScriptEngine scriptEngine = getGremlinScriptEngine();
+
+        try {
+            Bindings bindings = scriptEngine.createBindings();
+
+            bindings.put("graph", getGraph());
+            bindings.put("g", getGraph().traversal());
+
+            Object result = scriptEngine.eval(gremlinQuery, bindings);
+
+            return result;
+        } catch (ScriptException e) {
+            throw new 
AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e, 
gremlinQuery);
+        } finally {
+            releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
     private void initApplicationProperties() {
         if (APPLICATION_PROPERTIES == null) {
             try {
@@ -457,13 +475,11 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
         }
     }
 
-    @Override
-    public void loadLegacyGraphSON(Map<String, String> relationshipCache, 
InputStream fs) throws AtlasBaseException {
-        AtlasJanusGraphDatabase.loadLegacyGraphSON(relationshipCache, fs);
-    }
 
-    @Override
-    public MigrationStatus getMigrationStatus() {
-        return AtlasJanusGraphDatabase.getMigrationStatus();
+    private final class ConvertGremlinValueFunction implements 
Function<Object, Object> {
+        @Override
+        public Object apply(Object input) {
+            return convertGremlinValue(input);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 16aecd5..c9d6067 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -26,10 +26,12 @@ import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.GraphDatabase;
 import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader;
 import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
+import org.apache.atlas.repository.graphdb.janus.migration.ElementProcessors;
 import 
org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
 import 
org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
 import 
org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
 import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.commons.configuration.Configuration;
@@ -47,7 +49,6 @@ import java.io.InputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Map;
 
 /**
  * Default implementation for Graph Provider that doles out JanusGraph.
@@ -231,7 +232,7 @@ public class AtlasJanusGraphDatabase implements 
GraphDatabase<AtlasJanusVertex,
         return ret;
     }
 
-    public static void loadLegacyGraphSON(Map<String, String> 
relationshipCache, InputStream fs) throws AtlasBaseException {
+    public static void loadLegacyGraphSON(AtlasTypeRegistry typeRegistry, 
InputStream fs) throws AtlasBaseException {
         AtlasPerfTracer perf = null;
 
         try {
@@ -242,9 +243,10 @@ public class AtlasJanusGraphDatabase implements 
GraphDatabase<AtlasJanusVertex,
             }
 
             AtlasGraphSONReader legacyGraphSONReader = 
AtlasGraphSONReader.build().
-                    relationshipCache(relationshipCache).
+                    relationshipCache(new ElementProcessors(typeRegistry)).
                     schemaDB(getGraphInstance()).
-                    bulkLoadingDB(getBulkLoadingGraphInstance()).create();
+                    bulkLoadingDB(getBulkLoadingGraphInstance()).
+                    create();
 
             legacyGraphSONReader.readGraph(fs);
         } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
index ae119b0..2d5bd8a 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
@@ -37,44 +37,46 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 public final class AtlasGraphSONReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(AtlasGraphSONReader.class);
 
-    private final ObjectMapper          mapper;
-    private final RelationshipTypeCache relationshipCache;
-    private final Graph                 graph;
-    private final Graph                 bulkLoadGraph;
-    private final int                   numWorkers;
-    private final int                   batchSize;
-    private final long                  suppliedStartIndex;
-    private final String[]              propertiesToPostProcess;
-    private       GraphSONUtility       graphSONUtility;
-    private       ReaderStatusManager   readerStatusManager;
-    private       AtomicLong            counter;
-
-    private AtlasGraphSONReader(ObjectMapper mapper, Map<String, String> 
relationshipLookup, Graph graph,
-                                Graph bulkLoadGraph, String[] 
propertiesToPostProcess, int numWorkers, int batchSize, long 
suppliedStartIndex) {
+    private static String APPLICATION_PROPERTY_MIGRATION_START_INDEX      = 
"atlas.migration.mode.start.index";
+    private static String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = 
"atlas.migration.mode.workers";
+    private static String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE       = 
"atlas.migration.mode.batch.size";
+
+    private final ObjectMapper        mapper;
+    private final ElementProcessors   relationshipCache;
+    private final Graph               graph;
+    private final Graph               bulkLoadGraph;
+    private final int                 numWorkers;
+    private final int                 batchSize;
+    private final long                suppliedStartIndex;
+    private final GraphSONUtility     graphSONUtility;
+    private       ReaderStatusManager readerStatusManager;
+    private       AtomicLong          counter;
+
+    private AtlasGraphSONReader(ObjectMapper mapper, ElementProcessors 
relationshipLookup, Graph graph,
+                                Graph bulkLoadGraph, int numWorkers, int 
batchSize, long suppliedStartIndex) {
         this.mapper                 = mapper;
-        this.relationshipCache      = new 
RelationshipTypeCache(relationshipLookup);
+        this.relationshipCache      = relationshipLookup;
         this.graph                  = graph;
         this.bulkLoadGraph          = bulkLoadGraph;
         this.numWorkers             = numWorkers;
         this.batchSize              = batchSize;
         this.suppliedStartIndex     = suppliedStartIndex;
-        this.propertiesToPostProcess = propertiesToPostProcess;
+        this.graphSONUtility        = new GraphSONUtility(relationshipCache);
     }
 
     public void readGraph(final InputStream inputStream) throws IOException {
-        counter         = new AtomicLong(0);
-        graphSONUtility = new GraphSONUtility(relationshipCache);
+        counter = new AtomicLong(0);
 
         final long        startIndex = initStatusManager();
         final JsonFactory factory    = mapper.getFactory();
 
         LOG.info("AtlasGraphSONReader.readGraph: numWorkers: {}: batchSize: 
{}: startIndex: {}", numWorkers, batchSize, startIndex);
+
         try (JsonParser parser = factory.createParser(inputStream)) {
             if (parser.nextToken() != JsonToken.START_OBJECT) {
                 throw new IOException("Expected data to start with an Object");
@@ -88,6 +90,7 @@ public final class AtlasGraphSONReader {
                 switch (fieldName) {
                     case GraphSONTokensTP2.MODE:
                         parser.nextToken();
+
                         final String mode = parser.getText();
 
                         if (!mode.equals("EXTENDED")) {
@@ -180,8 +183,7 @@ public final class AtlasGraphSONReader {
         LOG.info("postProcess: Starting... : counter at: {}", counter.get());
 
         try {
-            PostProcessManager.WorkItemsManager wim   = 
PostProcessManager.create(bulkLoadGraph, graphSONUtility,
-                                                                    
propertiesToPostProcess, batchSize, numWorkers);
+            PostProcessManager.WorkItemsManager wim   = 
PostProcessManager.create(bulkLoadGraph, 
relationshipCache.getPropertiesToPostProcess(), batchSize, numWorkers);
             GraphTraversal                      query = 
bulkLoadGraph.traversal().V();
 
             while (query.hasNext()) {
@@ -228,30 +230,32 @@ public final class AtlasGraphSONReader {
         }
     }
 
-    public static Builder build() throws AtlasException {
+    public static Builder build() {
         return new Builder();
     }
 
     public final static class Builder {
-        private int                 batchSize = 500;
-        private Map<String, String> relationshipCache;
-        private Graph               graph;
-        private Graph               bulkLoadGraph;
-        private int                 numWorkers;
-        private long                suppliedStartIndex;
-        private String[]            propertiesToPostProcess;
+        private int               batchSize = 500;
+        private ElementProcessors relationshipCache;
+        private Graph             graph;
+        private Graph             bulkLoadGraph;
+        private int               numWorkers;
+        private long              suppliedStartIndex;
 
         private Builder() {
         }
 
-        private void setDefaults() throws AtlasException {
-            
this.startIndex(ApplicationProperties.get().getLong("atlas.migration.mode.start.index",
 0L))
-                
.numWorkers(ApplicationProperties.get().getInt("atlas.migration.mode.workers", 
4))
-                
.batchSize(ApplicationProperties.get().getInt("atlas.migration.mode.batch.size",
 3000))
-                
.propertiesToPostProcess(getPropertiesToPostProcess("atlas.migration.mode.postprocess.properties"));
+        private void setDefaults() {
+            try {
+                
this.startIndex(ApplicationProperties.get().getLong(APPLICATION_PROPERTY_MIGRATION_START_INDEX,
 0L))
+                        
.numWorkers(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS,
 4))
+                        
.batchSize(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE,
 3000));
+            } catch (AtlasException ex) {
+                LOG.error("setDefaults: failed!", ex);
+            }
         }
 
-        public AtlasGraphSONReader create() throws AtlasException {
+        public AtlasGraphSONReader create() {
             setDefaults();
             if(bulkLoadGraph == null) {
                 bulkLoadGraph = graph;
@@ -261,10 +265,10 @@ public final class AtlasGraphSONReader {
             final GraphSONMapper         mapper  = 
builder.typeInfo(TypeInfo.NO_TYPES).create();
 
             return new AtlasGraphSONReader(mapper.createMapper(), 
relationshipCache, graph, bulkLoadGraph,
-                                           propertiesToPostProcess, 
numWorkers, batchSize, suppliedStartIndex);
+                                                                    
numWorkers, batchSize, suppliedStartIndex);
         }
 
-        public Builder relationshipCache(Map<String, String> 
relationshipCache) {
+        public Builder relationshipCache(ElementProcessors relationshipCache) {
             this.relationshipCache = relationshipCache;
 
             return this;
@@ -305,27 +309,5 @@ public final class AtlasGraphSONReader {
 
             return this;
         }
-
-        public Builder propertiesToPostProcess(String[] list) {
-            this.propertiesToPostProcess = list;
-            return this;
-        }
-
-        private static String[] getPropertiesToPostProcess(String 
applicationPropertyKey) throws AtlasException {
-            final String HIVE_COLUMNS_PROPERTY        = "hive_table.columns";
-            final String HIVE_PARTITION_KEYS_PROPERTY = 
"hive_table.partitionKeys";
-            final String PROCESS_INPUT_PROPERTY       = "Process.inputs";
-            final String PROCESS_OUTPUT_PROPERTY      = "Process.outputs";
-            final String USER_PROFILE_OUTPUT_PROPERTY = 
"__AtlasUserProfile.savedSearches";
-
-            String[] defaultProperties = new String[] { HIVE_COLUMNS_PROPERTY, 
HIVE_PARTITION_KEYS_PROPERTY,
-                                                        
PROCESS_INPUT_PROPERTY, PROCESS_OUTPUT_PROPERTY,
-                                                        
USER_PROFILE_OUTPUT_PROPERTY
-                                                      };
-
-            String[] userDefinedList = 
ApplicationProperties.get().getStringArray(applicationPropertyKey);
-
-            return (userDefinedList == null || userDefinedList.length == 0) ? 
defaultProperties : userDefinedList;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java
new file mode 100644
index 0000000..f51080a
--- /dev/null
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ElementProcessors.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static 
org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
+import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
+import static 
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+
+public class ElementProcessors {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElementProcessors.class);
+
+    public  static final String   PRIMITIVE_MAP_CATEGORY       = 
"MAP_PRIMITIVE";
+    public  static final String   NON_PRIMITIVE_MAP_CATEGORY   = "MAP";
+    public  static final String   NON_PRIMITIVE_ARRAY_CATEGORY = "ARRAY";
+    private static final String[] NON_PRIMITIVE_KEYS           = { 
ElementProcessors.NON_PRIMITIVE_ARRAY_CATEGORY };
+
+    private final Map<String, RelationshipCacheGenerator.TypeInfo> 
relationshipLookup;
+    private final Map<String, Map<String, List<String>>>        postProcessMap;
+
+    private final NonPrimitiveListPropertyProcessor 
nonPrimitiveListPropertyProcessor = new NonPrimitiveListPropertyProcessor();
+    private final NonPrimitiveMapPropertyProcessor  
nonPrimitiveMapPropertyProcessor  = new NonPrimitiveMapPropertyProcessor();
+    private final PrimitiveMapPropertyProcessor     
primitiveMapPropertyProcessor     = new PrimitiveMapPropertyProcessor();
+    private final EdgeCollectionPropertyProcessor   
edgeCollectionPropertyProcessor   = new EdgeCollectionPropertyProcessor();
+    private final EdgeRelationshipPropertyProcessor 
edgeRelationshipPropertyProcessor = new EdgeRelationshipPropertyProcessor();
+
+    public ElementProcessors(AtlasTypeRegistry typeRegistry) {
+        this(RelationshipCacheGenerator.get(typeRegistry), 
TypesWithCollectionsFinder.getVertexPropertiesForCollectionAttributes(typeRegistry));
+    }
+
+    ElementProcessors(Map<String, RelationshipCacheGenerator.TypeInfo> lookup, 
Map<String, Map<String, List<String>>> postProcessMap) {
+        this.relationshipLookup = lookup;
+        this.postProcessMap     = postProcessMap;
+    }
+
+    public static String[] getNonPrimitiveCategoryKeys() {
+        return NON_PRIMITIVE_KEYS;
+    }
+
+    public Map<String,Map<String, List<String>>> getPropertiesToPostProcess() {
+        return postProcessMap;
+    }
+
+    public String addIndexKeysForCollections(Vertex out, Object edgeId, String 
label, Map<String, Object> edgeProperties) {
+        return edgeCollectionPropertyProcessor.update(out, edgeId, label, 
edgeProperties);
+    }
+
+    public void processCollections(String typeNameKey, Map<String,Object> 
vertexProperties) {
+        if (!vertexProperties.containsKey(typeNameKey)) {
+            return;
+        }
+
+        String typeName = (String) vertexProperties.get(typeNameKey);
+
+        if (!postProcessMap.containsKey(typeName)) {
+            return;
+        }
+
+        primitiveMapPropertyProcessor.update(typeName, vertexProperties);
+        nonPrimitiveMapPropertyProcessor.update(typeName, vertexProperties);
+        nonPrimitiveListPropertyProcessor.update(typeName, vertexProperties);
+    }
+
+    public String updateEdge(Vertex in, Vertex out, Object edgeId, String 
label, Map<String,Object> props) {
+        return edgeRelationshipPropertyProcessor.update(in, out, edgeId, 
label, props);
+    }
+
+    private class NonPrimitiveMapPropertyProcessor {
+        final String category = NON_PRIMITIVE_MAP_CATEGORY;
+
+        public void update(String typeName, Map<String,Object> 
vertexProperties) {
+            if (!postProcessMap.containsKey(typeName)) {
+                return;
+            }
+
+            if (!postProcessMap.get(typeName).containsKey(category)) {
+                return;
+            }
+
+            List<String> propertyTypeList = 
postProcessMap.get(typeName).get(category);
+
+            for (String property : propertyTypeList) {
+                if (!vertexProperties.containsKey(property)) {
+                    continue;
+                }
+
+                List<Object> list = (List<Object>) 
vertexProperties.get(property);
+
+                if (list == null) {
+                    continue;
+                }
+
+                for (Object listEntry : list) {
+                    String key      = (String) listEntry;
+                    String valueKey = getMapKey(property, key);
+
+                    if (vertexProperties.containsKey(valueKey)) {
+                        vertexProperties.remove(valueKey);
+                    }
+                }
+
+                vertexProperties.remove(property);
+            }
+        }
+
+        private String getMapKey(String property, String key) {
+            return String.format("%s.%s", property, key);
+        }
+    }
+
+    private class PrimitiveMapPropertyProcessor {
+        final String category = PRIMITIVE_MAP_CATEGORY;
+
+        public void update(String typeName, Map<String, Object> 
vertexProperties) {
+            if (!postProcessMap.get(typeName).containsKey(category)) {
+                return;
+            }
+
+            List<String> propertyTypeList = 
postProcessMap.get(typeName).get(category);
+
+            for (String property : propertyTypeList) {
+                if (!vertexProperties.containsKey(property)) {
+                    continue;
+                }
+
+                List<Object> list = (List<Object>) 
vertexProperties.get(property);
+
+                if (list == null) {
+                    continue;
+                }
+
+                Map<String, Object> map = getAggregatedMap(vertexProperties, 
property, list);
+
+                vertexProperties.put(property, map);
+            }
+        }
+
+        private Map<String, Object> getAggregatedMap(Map<String, Object> 
vertexProperties, String property, List<Object> list) {
+            Map<String, Object> map = new HashMap<>();
+
+            for (Object listEntry : list) {
+                String key      = (String) listEntry;
+                String valueKey = getMapKey(property, key);
+
+                if (vertexProperties.containsKey(valueKey)) {
+                    Object value = getValueFromProperties(valueKey, 
vertexProperties);
+
+                    vertexProperties.remove(valueKey);
+
+                    map.put(key, value);
+                }
+            }
+
+            return map;
+        }
+
+        private String getMapKey(String property, String key) {
+            return String.format("%s.%s", property, key);
+        }
+
+        private Object getValueFromProperties(String key, Map<String, Object> 
vertexProperties) {
+            if (!vertexProperties.containsKey(key)) {
+                return null;
+            }
+
+            return vertexProperties.get(key);
+        }
+    }
+
+    private class NonPrimitiveListPropertyProcessor {
+        private final String category = NON_PRIMITIVE_ARRAY_CATEGORY;
+
+        private void update(String typeName, Map<String,Object> props) {
+            if(!postProcessMap.get(typeName).containsKey(category)) {
+                return;
+            }
+
+            List<String> propertyTypeList = 
postProcessMap.get(typeName).get(category);
+            for (String property : propertyTypeList) {
+                if(!props.containsKey(property)) {
+                    continue;
+                }
+
+                Map<String, String> listMap = 
getUpdatedEdgeList(props.get(property));
+
+                if(listMap == null) {
+                    continue;
+                }
+
+                props.put(property, listMap);
+            }
+        }
+
+        private Map<String, String> getUpdatedEdgeList(Object o) {
+            Map<String, String> listMap = new HashMap<>();
+
+            if(!(o instanceof List)) {
+                return null;
+            }
+
+            List list = (List) o;
+
+            for (int i = 0; i < list.size(); i++) {
+                listMap.put((String) list.get(i), Integer.toString(i));
+            }
+
+            return listMap;
+        }
+    }
+
+    private class EdgeRelationshipPropertyProcessor {
+        public String update(Vertex in, Vertex out, Object edgeId, String 
label, Map<String, Object> props) {
+            if(addRelationshipTypeForClassification(in, out, label, props)) {
+                label = Constants.CLASSIFICATION_LABEL;
+            } else {
+                addRelationshipTypeName(label, props);
+
+                label = addIndexKeysForCollections(out, edgeId, label, props);
+            }
+
+            addMandatoryRelationshipProperties(label, props);
+
+            return label;
+        }
+
+        private String getRelationshipTypeName(String label) {
+            return relationshipLookup.containsKey(label) ? 
relationshipLookup.get(label).getTypeName() : "";
+        }
+
+        private PropagateTags getDefaultPropagateValue(String label) {
+            return relationshipLookup.containsKey(label) ?
+                    relationshipLookup.get(label).getPropagateTags() :
+                    AtlasRelationshipDef.PropagateTags.NONE;
+        }
+
+        private boolean addRelationshipTypeForClassification(Vertex in, Vertex 
out, String label, Map<String, Object> props) {
+            if (in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
+                String inTypeName  = (String) 
in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
+
+                if (StringUtils.isNotEmpty(inTypeName)) {
+                    if (inTypeName.equals(label)) {
+                        props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, 
inTypeName);
+
+                        addEntityGuidToTrait(in, out);
+
+                        return true;
+                    }
+                } else {
+                    LOG.info("Could not find typeName for trait: {}", label);
+                }
+            }
+
+            return false;
+        }
+
+        private void addEntityGuidToTrait(Vertex in, Vertex out) {
+            String entityGuid = "";
+
+            if (out.property(Constants.GUID_PROPERTY_KEY).isPresent()) {
+                entityGuid = (String) 
out.property(Constants.GUID_PROPERTY_KEY).value();
+            }
+
+            if(StringUtils.isNotEmpty(entityGuid)) {
+                in.property(CLASSIFICATION_ENTITY_GUID, entityGuid);
+                in.property(CLASSIFICATION_VERTEX_PROPAGATE_KEY, false);
+            }
+        }
+
+        private void addRelationshipTypeName(String edgeLabel, Map<String, 
Object> props) {
+            String typeName = getRelationshipTypeName(edgeLabel);
+
+            if (StringUtils.isNotEmpty(typeName)) {
+                props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Could not find relationship type for: {}", 
edgeLabel);
+                }
+            }
+        }
+
+        private void addMandatoryRelationshipProperties(String label, 
Map<String, Object> props) {
+            props.put(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, 
UUID.randomUUID().toString());
+
+            props.put(RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, 
String.valueOf(getDefaultPropagateValue(label)));
+            props.put(STATE_PROPERTY_KEY, "ACTIVE");
+        }
+    }
+
+    private class EdgeCollectionPropertyProcessor {
+        private static final int LABEL_INDEX = 0;
+        private static final int KEY_INDEX   = 1;
+
+        public String update(Vertex out, Object edgeId, String label, 
Map<String, Object> edgeProperties) {
+            String[] labelKeyPair = getNonPrimitiveArrayFromLabel(out, 
(String) edgeId, label);
+
+            if (labelKeyPair != null) {
+                edgeProperties.put(ATTRIBUTE_INDEX_PROPERTY_KEY, 
Integer.valueOf(labelKeyPair[KEY_INDEX]));
+
+                return label;
+            }
+
+            labelKeyPair = getNonPrimitiveMapKeyFromLabel(out, label);
+
+            if (labelKeyPair != null) {
+                label = labelKeyPair[LABEL_INDEX];
+
+                edgeProperties.put(ATTRIBUTE_KEY_PROPERTY_KEY, 
labelKeyPair[KEY_INDEX]);
+            }
+
+            return label;
+        }
+
+        private String[] getNonPrimitiveArrayFromLabel(Vertex v, String 
edgeId, String label) {
+            if (!v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
+                return null;
+            }
+
+            String typeName     = (String) 
v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
+            String propertyName = StringUtils.remove(label, 
Constants.INTERNAL_PROPERTY_KEY_PREFIX);
+
+            if(!containsNonPrimitiveCollectionProperty(typeName, propertyName, 
NON_PRIMITIVE_ARRAY_CATEGORY)) {
+                return null;
+            }
+
+            Map<String, String> edgeIdIndexList = (Map<String, String>) 
v.property(propertyName).value();
+
+            if (edgeIdIndexList.containsKey(edgeId)) {
+                return getLabelKeyPair(label, edgeIdIndexList.get(edgeId));
+            }
+
+            return null;
+        }
+
+        // legacy edge label is in format: __<type name>.<key>
+        //      label: in new format which is type name
+        // this method extracts:
+        //      key: what remains of the legacy label string when '__' and 
type name are removed
+        private String[] getNonPrimitiveMapKeyFromLabel(Vertex v, String 
label) {
+            if (!v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
+                return null;
+            }
+
+            String typeName = (String) 
v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
+
+            if(!postProcessMap.containsKey(typeName)) {
+                return null;
+            }
+
+            
if(!postProcessMap.get(typeName).containsKey(NON_PRIMITIVE_MAP_CATEGORY)) {
+                return null;
+            }
+
+            String       propertyName = StringUtils.remove(label, 
Constants.INTERNAL_PROPERTY_KEY_PREFIX);
+            List<String> properties   = 
postProcessMap.get(typeName).get(NON_PRIMITIVE_MAP_CATEGORY);
+
+            for (String p : properties) {
+                if (propertyName.startsWith(p)) {
+                    return getLabelKeyPair(
+                            String.format("%s%s", 
Constants.INTERNAL_PROPERTY_KEY_PREFIX, p),
+                            StringUtils.remove(propertyName, 
p).substring(1).trim());
+                }
+            }
+
+            return null;
+        }
+
+        private boolean containsNonPrimitiveCollectionProperty(String 
typeName, String propertyName, String categoryType) {
+            if (!postProcessMap.containsKey(typeName)) {
+                return false;
+            }
+
+            if (!postProcessMap.get(typeName).containsKey(categoryType)) {
+                return false;
+            }
+
+            List<String> properties = 
postProcessMap.get(typeName).get(categoryType);
+
+            for (String p : properties) {
+                if (p.equals(propertyName)) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        private String[] getLabelKeyPair(String label, String value) {
+            return new String[] { label, value };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
index ec320b0..f1bbfcf 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
@@ -18,10 +18,9 @@
 
 package org.apache.atlas.repository.graphdb.janus.migration;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.repository.Constants;
-import org.apache.atlas.type.AtlasBuiltInTypes;
-import org.apache.commons.lang.StringUtils;
+import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigDecimalType;
+import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigIntegerType;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgeFeatures;
@@ -34,21 +33,17 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
-import static 
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
-
 class GraphSONUtility {
     private static final Logger LOG = 
LoggerFactory.getLogger(GraphSONUtility.class);
 
-    private static final String EMPTY_STRING = "";
+    private static final String              EMPTY_STRING   = "";
+    private static final AtlasBigIntegerType bigIntegerType = new 
AtlasBigIntegerType();
+    private static final AtlasBigDecimalType bigDecimalType = new 
AtlasBigDecimalType();
 
-    private final RelationshipTypeCache relationshipTypeCache;
-    private static AtlasBuiltInTypes.AtlasBigIntegerType bigIntegerType = new 
AtlasBuiltInTypes.AtlasBigIntegerType();
-    private static AtlasBuiltInTypes.AtlasBigDecimalType bigDecimalType = new 
AtlasBuiltInTypes.AtlasBigDecimalType();
+    private final ElementProcessors elementProcessors;
 
-    public GraphSONUtility(final RelationshipTypeCache relationshipTypeCache) {
-        this.relationshipTypeCache = relationshipTypeCache;
+    public GraphSONUtility(final ElementProcessors elementProcessors) {
+        this.elementProcessors = elementProcessors;
     }
 
     public Map<String, Object> vertexFromJson(Graph g, final JsonNode json) {
@@ -64,6 +59,7 @@ class GraphSONUtility {
         Vertex              vertex         = 
vertexFeatures.willAllowId(vertexId) ? g.addVertex(T.id, vertexId) : 
g.addVertex();
 
         props.put(Constants.VERTEX_ID_IN_IMPORT_KEY, vertexId);
+        
elementProcessors.processCollections(Constants.ENTITY_TYPE_PROPERTY_KEY, props);
 
         for (Map.Entry<String, Object> entry : props.entrySet()) {
             try {
@@ -107,17 +103,11 @@ class GraphSONUtility {
 
             props.put(Constants.EDGE_ID_IN_IMPORT_KEY, edgeId.toString());
 
-            if(addRelationshipTypeForClassification(in, out, label, props)) {
-                label = Constants.CLASSIFICATION_LABEL;
-            } else {
-                addRelationshipTypeName(label, props);
-            }
+            label = elementProcessors.updateEdge(in, out, edgeId, label, 
props);
 
             EdgeFeatures  edgeFeatures = g.features().edge();
             final Edge    edge         = edgeFeatures.willAllowId(edgeId) ? 
out.addEdge(label, in, T.id, edgeId) : out.addEdge(label, in);
 
-            addMandatoryRelationshipProperties(props);
-
             for (Map.Entry<String, Object> entry : props.entrySet()) {
                 try {
                     edge.property(entry.getKey(), entry.getValue());
@@ -153,82 +143,6 @@ class GraphSONUtility {
         return cache.getMappedVertex(gr, inVId);
     }
 
-    private boolean addRelationshipTypeForClassification(Vertex in, Vertex 
out, String label, Map<String, Object> props) {
-        if (in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
-            String inTypeName  = (String) 
in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
-
-            if (inTypeName.equals(label)) {
-                if (StringUtils.isNotEmpty(inTypeName)) {
-                    props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, inTypeName);
-
-                    addEntityGuidToTrait(in, out);
-
-                    return true;
-                } else {
-                    LOG.info("Could not find typeName for trait: {}", label);
-                }
-            }
-        }
-
-        return false;
-    }
-
-    private void addEntityGuidToTrait(Vertex in, Vertex out) {
-        String entityGuid = "";
-        if (out.property(Constants.GUID_PROPERTY_KEY).isPresent()) {
-            entityGuid = (String) 
out.property(Constants.GUID_PROPERTY_KEY).value();
-        }
-
-        if(StringUtils.isNotEmpty(entityGuid)) {
-            in.property(CLASSIFICATION_ENTITY_GUID, entityGuid);
-        }
-    }
-
-    private void addRelationshipTypeName(String edgeLabel, Map<String, Object> 
props) {
-        String typeName = relationshipTypeCache.get(edgeLabel);
-
-        if (StringUtils.isNotEmpty(typeName)) {
-            props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Could not find relationship type for: {}", 
edgeLabel);
-            }
-        }
-    }
-
-    private void addMandatoryRelationshipProperties(Map<String, Object> props) 
{
-        props.put(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, 
UUID.randomUUID().toString());
-        props.put(RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, "NONE");
-        props.put(CLASSIFICATION_EDGE_STATE_PROPERTY_KEY, "ACTIVE");
-    }
-
-    public void replaceReferencedEdgeIdForList(Graph g, MappedElementCache 
cache, Vertex v, String propertyName) {
-        try {
-            if (v.property(Constants.TYPENAME_PROPERTY_KEY).isPresent() || 
!v.property(propertyName).isPresent()) {
-                return;
-            }
-
-            List list = (List) v.property(propertyName).value();
-            for (int i = 0; i < list.size(); i++) {
-                String id    = list.get(i).toString();
-                Object newId = cache.getMappedEdge(g, id);
-
-                if (newId == null) {
-                    continue;
-                }
-
-                list.set(i, newId.toString());
-            }
-
-            v.property(propertyName, list);
-        } catch (IllegalArgumentException ex) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("processItem: IllegalArgumentException: v[{}] 
error!", v.id(), ex);
-            }
-        }
-    }
-
-    @VisibleForTesting
     static Map<String, Object> readProperties(final JsonNode node) {
         final Map<String, Object>                   map      = new HashMap<>();
         final Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
@@ -303,7 +217,6 @@ class GraphSONUtility {
         return array;
     }
 
-    @VisibleForTesting
     static Object getTypedValueFromJsonNode(final JsonNode node) {
         Object theValue = null;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
index e4b6ee2..e2f418e 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
@@ -101,6 +101,7 @@ public class JsonNodeProcessManager {
 
         private void commitRegular() {
             commit(graph, nodes.size());
+            cache.clearAll();
         }
 
         private void commit(Graph g, int size) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
index cca72ad..817e8dc 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
@@ -18,9 +18,7 @@
 
 package org.apache.atlas.repository.graphdb.janus.migration;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.utils.LruCache;
-import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.slf4j.Logger;
@@ -28,18 +26,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
 import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
 
 public class MappedElementCache {
     private static final Logger LOG = 
LoggerFactory.getLogger(MappedElementCache.class);
 
-    @VisibleForTesting
     final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000);
 
-    @VisibleForTesting
-    final Map<String, String> lruEdgeCache   = new LruCache<>(500, 100000);
-
     public Vertex getMappedVertex(Graph gr, Object key) {
         try {
             Vertex ret = lruVertexCache.get(key);
@@ -62,32 +55,6 @@ public class MappedElementCache {
         }
     }
 
-    public String getMappedEdge(Graph gr, String key) {
-        try {
-            String ret = lruEdgeCache.get(key);
-
-            if (ret == null) {
-                synchronized (lruEdgeCache) {
-                    ret = lruEdgeCache.get(key);
-
-                    if (ret == null) {
-                        Edge e = fetchEdge(gr, key);
-
-                        ret = e.id().toString();
-
-                        lruEdgeCache.put(key, ret);
-                    }
-                }
-            }
-
-            return ret;
-        } catch (Exception ex) {
-            LOG.error("getMappedEdge: {}", key, ex);
-            return null;
-        }
-    }
-
-    @VisibleForTesting
     Vertex fetchVertex(Graph gr, Object key) {
         try {
             return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, key).next();
@@ -97,18 +64,7 @@ public class MappedElementCache {
         }
     }
 
-    @VisibleForTesting
-    Edge fetchEdge(Graph gr, String key) {
-        try {
-            return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, key).next();
-        } catch (Exception ex) {
-            LOG.error("fetchEdge: fetchFromDB failed: {}", key);
-            return null;
-        }
-    }
-
     public void clearAll() {
         lruVertexCache.clear();
-        lruEdgeCache.clear();
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
index d0a65f7..7046f8c 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
@@ -21,36 +21,42 @@ package org.apache.atlas.repository.graphdb.janus.migration;
 import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
 import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
 import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager;
+import 
org.apache.atlas.repository.graphdb.janus.migration.postProcess.PostProcessListProperty;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
+
 public class PostProcessManager {
     static class Consumer extends WorkItemConsumer<Object> {
         private static final Logger LOG = 
LoggerFactory.getLogger(Consumer.class);
 
-        private final Graph              bulkLoadGraph;
-        private final GraphSONUtility    utility;
-        private final String[]           properties;
-        private final MappedElementCache cache;
-        private final int                batchSize;
-        private       long               counter;
-        private       long               batchCounter;
+        private final Graph                                  bulkLoadGraph;
+        private final Map<String, Map<String, List<String>>> typePropertiesMap;
+        private final int                                    batchSize;
+        private       long                                   counter;
+        private       long                                   batchCounter;
+        private final PostProcessListProperty                processor;
+        private final String[]                               
nonPrimitiveCategoryKeys;
 
-        public Consumer(BlockingQueue<Object> queue, Graph bulkLoadGraph, 
GraphSONUtility utility,
-                        String[] properties, MappedElementCache cache, int 
batchSize) {
+        public Consumer(BlockingQueue<Object> queue, Graph bulkLoadGraph, 
Map<String, Map<String, List<String>>> typePropertiesMap, int batchSize) {
             super(queue);
 
-            this.bulkLoadGraph = bulkLoadGraph;
-            this.utility         = utility;
-            this.properties      = properties;
-            this.cache           = cache;
-            this.batchSize       = batchSize;
-            this.counter         = 0;
-            this.batchCounter    = 0;
+            this.bulkLoadGraph            = bulkLoadGraph;
+            this.typePropertiesMap        = typePropertiesMap;
+            this.batchSize                = batchSize;
+            this.counter                  = 0;
+            this.batchCounter             = 0;
+            this.processor                = new PostProcessListProperty();
+            this.nonPrimitiveCategoryKeys = 
ElementProcessors.getNonPrimitiveCategoryKeys();
         }
 
         @Override
@@ -59,23 +65,44 @@ public class PostProcessManager {
             counter++;
 
             try {
-                Vertex v = bulkLoadGraph.traversal().V(vertexId).next();
-
-                for (String p : properties) {
-                    utility.replaceReferencedEdgeIdForList(bulkLoadGraph, 
cache, v, p);
+                Vertex         vertex           = 
bulkLoadGraph.traversal().V(vertexId).next();
+                boolean        isTypeVertex     = 
vertex.property(TYPENAME_PROPERTY_KEY).isPresent();
+                VertexProperty typeNameProperty = 
vertex.property(ENTITY_TYPE_PROPERTY_KEY);
+
+                if (!isTypeVertex && typeNameProperty.isPresent()) {
+                    String typeName = (String) typeNameProperty.value();
+                    if (!typePropertiesMap.containsKey(typeName)) {
+                        return;
+                    }
+
+                    Map<String, List<String>> collectionTypeProperties = 
typePropertiesMap.get(typeName);
+                    for (String key : nonPrimitiveCategoryKeys) {
+                        if (!collectionTypeProperties.containsKey(key)) {
+                            continue;
+                        }
+
+                        for(String propertyName : 
collectionTypeProperties.get(key)) {
+                            processor.process(vertex, typeName, propertyName);
+                        }
+                    }
                 }
 
-                if (batchCounter >= batchSize) {
-                    LOG.info("[{}]: batch: {}: commit", counter, batchCounter);
-                    commit();
-                    batchCounter = 0;
-                }
-            }
-            catch (Exception ex) {
+                commitBatch();
+            } catch (Exception ex) {
                 LOG.error("processItem: v[{}] error!", vertexId, ex);
             }
         }
 
+        private void commitBatch() {
+            if (batchCounter >= batchSize) {
+                LOG.info("[{}]: batch: {}: commit", counter, batchCounter);
+
+                commit();
+
+                batchCounter = 0;
+            }
+        }
+
         @Override
         protected void doCommit() {
             bulkLoadGraph.tx().commit();
@@ -83,23 +110,19 @@ public class PostProcessManager {
     }
 
     private static class ConsumerBuilder implements WorkItemBuilder<Consumer, 
Object> {
-        private final Graph              bulkLoadGraph;
-        private final GraphSONUtility    utility;
-        private final int                batchSize;
-        private final MappedElementCache cache;
-        private final String[]           vertexPropertiesToPostProcess;
+        private final Graph                                  bulkLoadGraph;
+        private final int                                    batchSize;
+        private final Map<String, Map<String, List<String>>> 
vertexPropertiesToPostProcess;
 
-        public ConsumerBuilder(Graph bulkLoadGraph, GraphSONUtility utility, 
String[] propertiesToPostProcess, int batchSize) {
+        public ConsumerBuilder(Graph bulkLoadGraph, Map<String, Map<String, 
List<String>>> propertiesToPostProcess, int batchSize) {
             this.bulkLoadGraph                 = bulkLoadGraph;
-            this.utility                       = utility;
             this.batchSize                     = batchSize;
-            this.cache                         = new MappedElementCache();
             this.vertexPropertiesToPostProcess = propertiesToPostProcess;
         }
 
         @Override
         public Consumer build(BlockingQueue<Object> queue) {
-            return new Consumer(queue, bulkLoadGraph, utility, 
vertexPropertiesToPostProcess, cache, batchSize);
+            return new Consumer(queue, bulkLoadGraph, 
vertexPropertiesToPostProcess, batchSize);
         }
     }
 
@@ -109,8 +132,9 @@ public class PostProcessManager {
         }
     }
 
-    public static WorkItemsManager create(Graph bGraph, GraphSONUtility 
utility, String[] propertiesToPostProcess, int batchSize, int numWorkers) {
-        ConsumerBuilder cb = new ConsumerBuilder(bGraph, utility, 
propertiesToPostProcess, batchSize);
+    public static WorkItemsManager create(Graph bGraph, Map<String, 
Map<String, List<String>>> propertiesToPostProcess,
+                                          int batchSize, int numWorkers) {
+        ConsumerBuilder cb = new ConsumerBuilder(bGraph, 
propertiesToPostProcess, batchSize);
 
         return new WorkItemsManager(cb, batchSize, numWorkers);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java
new file mode 100644
index 0000000..c4ee179
--- /dev/null
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipCacheGenerator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RelationshipCacheGenerator {
+
+    public static class TypeInfo extends TypesUtil.Pair<String, PropagateTags> 
{
+
+        public TypeInfo(String typeName, PropagateTags propagateTags) {
+            super(typeName, propagateTags);
+        }
+
+        public String getTypeName() {
+            return left;
+        }
+
+        public PropagateTags getPropagateTags() {
+            return right;
+        }
+    }
+
+
+    public static Map<String, TypeInfo> get(AtlasTypeRegistry typeRegistry) {
+        Map<String, TypeInfo> ret = new HashMap<>();
+
+        for (AtlasRelationshipType relType : 
typeRegistry.getAllRelationshipTypes()) {
+            AtlasRelationshipDef relDef      = relType.getRelationshipDef();
+            String               relTypeName = relType.getTypeName();
+
+            add(ret, getKey(relDef.getEndDef1()), relTypeName, 
relDef.getPropagateTags());
+            add(ret, getKey(relDef.getEndDef2()), relTypeName, 
getEnd2PropagateTag(relDef.getPropagateTags()));
+        }
+
+        return ret;
+    }
+
+    private static String getKey(AtlasRelationshipEndDef endDef) {
+        return getKey(endDef.getIsLegacyAttribute(), endDef.getType(), 
endDef.getName());
+    }
+
+    private static String getKey(String lhs, String rhs) {
+        return String.format("%s%s.%s", 
Constants.INTERNAL_PROPERTY_KEY_PREFIX, lhs, rhs);
+    }
+
+    private static String getKey(boolean isLegacy, String entityTypeName, 
String relEndName) {
+        if (!isLegacy) {
+            return "";
+        }
+
+        return getKey(entityTypeName, relEndName);
+    }
+
+    private static void add(Map<String, TypeInfo> map, String key, String 
relationTypeName, PropagateTags propagateTags) {
+        if (StringUtils.isEmpty(key) || map.containsKey(key)) {
+            return;
+        }
+
+        map.put(key, new TypeInfo(relationTypeName, propagateTags));
+    }
+
+    private static PropagateTags getEnd2PropagateTag(PropagateTags 
end1PropagateTags) {
+        if (end1PropagateTags == PropagateTags.ONE_TO_TWO) {
+            return PropagateTags.TWO_TO_ONE;
+        } else if (end1PropagateTags == PropagateTags.TWO_TO_ONE) {
+            return PropagateTags.ONE_TO_TWO;
+        } else {
+            return end1PropagateTags;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
deleted file mode 100644
index e4e8264..0000000
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.graphdb.janus.migration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class RelationshipTypeCache {
-    private static final Logger LOG = 
LoggerFactory.getLogger(RelationshipTypeCache.class);
-    private final Map<String, String> relationshipLookup;
-
-    public RelationshipTypeCache(Map<String, String> lookup) {
-        relationshipLookup = lookup;
-    }
-
-    public String get(String label) {
-        return relationshipLookup.get(label);
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java
new file mode 100644
index 0000000..55aa9c9
--- /dev/null
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/TypesWithCollectionsFinder.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasMapType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.model.TypeCategory.*;
+
+public class TypesWithCollectionsFinder {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TypesWithCollectionsFinder.class);
+
+    static final EnumSet<TypeCategory> nonPrimitives = EnumSet.of(ENTITY, 
STRUCT, OBJECT_ID_TYPE);
+
+    public static Map<String, Map<String, List<String>>> 
getVertexPropertiesForCollectionAttributes(AtlasTypeRegistry typeRegistry) {
+        Map<String, Map<String, List<String>>> ret = new HashMap<>();
+
+        
addVertexPropertiesForCollectionAttributes(typeRegistry.getAllEntityTypes(), 
ret);
+        
addVertexPropertiesForCollectionAttributes(typeRegistry.getAllStructTypes(), 
ret);
+
+        displayInfo("types with properties: ", ret);
+
+        return ret;
+    }
+
+    private static void 
addVertexPropertiesForCollectionAttributes(Collection<? extends 
AtlasStructType> types, Map<String, Map<String, List<String>>> typeAttrMap) {
+        for (AtlasStructType type : types) {
+            Map<String, List<String>> collectionProperties = 
getVertexPropertiesForCollectionAttributes(type);
+
+            if(collectionProperties != null && collectionProperties.size() > 
0) {
+                typeAttrMap.put(type.getTypeName(), collectionProperties);
+            }
+        }
+    }
+
+    static Map<String, List<String>> 
getVertexPropertiesForCollectionAttributes(AtlasStructType type) {
+        try {
+            Map<String, List<String>> collectionProperties = new HashMap<>();
+
+            for (AtlasAttribute attr : type.getAllAttributes().values()) {
+                addIfCollectionAttribute(attr, collectionProperties);
+            }
+
+            return collectionProperties;
+        } catch (Exception e) {
+            LOG.error("addVertexPropertiesForCollectionAttributes", e);
+        }
+
+        return null;
+    }
+
+    private static void addIfCollectionAttribute(AtlasAttribute attr, 
Map<String, List<String>> collectionProperties) {
+        AtlasType    attrType         = attr.getAttributeType();
+        TypeCategory attrTypeCategory = attrType.getTypeCategory();
+
+        switch (attrTypeCategory) {
+            case ARRAY: {
+                TypeCategory arrayElementType = ((AtlasArrayType) 
attrType).getElementType().getTypeCategory();
+
+                if (nonPrimitives.contains(arrayElementType)) {
+                    addVertexProperty(attrTypeCategory.toString(), 
attr.getVertexPropertyName(), collectionProperties);
+                }
+            }
+            break;
+
+            case MAP: {
+                TypeCategory mapValueType = ((AtlasMapType) 
attrType).getValueType().getTypeCategory();
+
+                if (nonPrimitives.contains(mapValueType)) {
+                    addVertexProperty(attrTypeCategory.toString(), 
attr.getVertexPropertyName(), collectionProperties);
+                } else {
+                    addVertexProperty(attrTypeCategory.toString() + 
"_PRIMITIVE", attr.getVertexPropertyName(), collectionProperties);
+                }
+            }
+            break;
+        }
+    }
+
+    private static void addVertexProperty(String collectionType, String 
propertyName, Map<String, List<String>> collectionProperties) {
+        if(!collectionProperties.containsKey(collectionType)) {
+            collectionProperties.put(collectionType, new ArrayList<>());
+        }
+
+        collectionProperties.get(collectionType).add(propertyName);
+    }
+
+    static void displayInfo(String message, Map<String, Map<String, 
List<String>>> map) {
+        LOG.info(message);
+        for (Map.Entry<String, Map<String, List<String>>> e : map.entrySet()) {
+            LOG.info("  type: {} : {}", e.getKey(), e.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/47ec9f7a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/postProcess/PostProcessListProperty.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/postProcess/PostProcessListProperty.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/postProcess/PostProcessListProperty.java
new file mode 100644
index 0000000..8270ae9
--- /dev/null
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/postProcess/PostProcessListProperty.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration.postProcess;
+
+import org.apache.atlas.repository.Constants;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PostProcessListProperty {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostProcessListProperty.class);
+
+    public void process(Vertex v, String typeName, String propertyName) {
+        try {
+            if (doesNotHaveProperty(v, typeName) || !hasProperty(v, 
propertyName)) {
+                return;
+            }
+
+            removeProperty(v, propertyName);
+        } catch (IllegalArgumentException ex) {
+            LOG.error("process: IllegalArgumentException: v[{}] error!", 
v.id(), ex);
+        }
+    }
+
+    protected void removeProperty(Vertex v, String propertyName) {
+        v.property(propertyName).remove();
+    }
+
+    protected boolean doesNotHaveProperty(Vertex v, String typeName) {
+        return v.property(Constants.TYPENAME_PROPERTY_KEY).isPresent() || 
!isInstanceVertexOfType(v, typeName);
+    }
+
+    private boolean hasProperty(Vertex v, String propertyName) {
+        try {
+            return v.property(propertyName).isPresent();
+        } catch(Exception ex) {
+            // ...
+        }
+
+        return false;
+    }
+
+    private boolean isInstanceVertexOfType(Vertex v, String typeName) {
+        if(v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
+            String s = (String) 
v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
+
+            return s.equals(typeName);
+        }
+
+        return false;
+    }
+}

Reply via email to