This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 024f5d52d0a972616362ac9b15ed7a417384026f
Author: nixonrodrigues <ni...@apache.org>
AuthorDate: Wed Feb 26 14:51:37 2020 +0530

    Revert "ATLAS-3320: Import Service. Support concurrent ingest."
    
    This reverts commit a2ccfb9f3577e911103041d8d4b91c169697f6a1.
---
 .../repository/graphdb/janus/AtlasJanusGraph.java  |   2 +-
 .../java/org/apache/atlas/AtlasConfiguration.java  |   1 -
 .../atlas/model/impexp/AtlasImportRequest.java     |  43 +---
 .../java/org/apache/atlas/pc/WorkItemConsumer.java |  11 +-
 .../java/org/apache/atlas/pc/WorkItemManager.java  |   9 +-
 .../apache/atlas/GraphTransactionInterceptor.java  |   4 -
 .../atlas/repository/impexp/AuditsWriter.java      |   3 +-
 .../atlas/repository/impexp/ImportService.java     |  22 +-
 .../repository/impexp/ZipExportFileNames.java      |   4 -
 .../atlas/repository/impexp/ZipSourceDirect.java   | 269 ---------------------
 .../migration/ZipFileMigrationImporter.java        |  58 +----
 .../repository/patches/UniqueAttributePatch.java   |   4 +-
 .../repository/store/graph/AtlasEntityStore.java   |   8 -
 .../store/graph/v2/AtlasEntityStoreV2.java         |  11 +-
 .../store/graph/v2/AtlasRelationshipStoreV2.java   |   4 -
 .../store/graph/v2/BulkImporterImpl.java           | 228 +++++++++++++----
 .../store/graph/v2/EntityGraphMapper.java          |  41 +---
 .../store/graph/v2/bulkimport/ImportStrategy.java  |  28 ---
 .../store/graph/v2/bulkimport/MigrationImport.java | 122 ----------
 .../store/graph/v2/bulkimport/RegularImport.java   | 261 --------------------
 .../graph/v2/bulkimport/pc/EntityConsumer.java     | 213 ----------------
 .../v2/bulkimport/pc/EntityConsumerBuilder.java    |  50 ----
 .../v2/bulkimport/pc/EntityCreationManager.java    | 130 ----------
 .../graph/v2/bulkimport/pc/StatusReporter.java     | 131 ----------
 .../atlas/repository/impexp/ImportServiceTest.java |  16 --
 .../repository/impexp/MigrationImportTest.java     |  77 ------
 .../repository/impexp/StatusReporterTest.java      |  99 --------
 .../atlas/repository/impexp/ZipDirectTest.java     |  61 -----
 .../impexp/ZipFileResourceTestUtils.java           |   7 +-
 repository/src/test/resources/zip-direct-1.zip     | Bin 22 -> 0 bytes
 repository/src/test/resources/zip-direct-2.zip     | Bin 1720553 -> 0 bytes
 31 files changed, 227 insertions(+), 1690 deletions(-)

diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index 0176ba7..4acb371 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
             }
         }
 
-        janusGraph = (StandardJanusGraph) graphInstance;
+        janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance();
     }
 
     @Override
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index f8d7f8c..1a0d0cc 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -64,7 +64,6 @@ public enum AtlasConfiguration {
     CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
     LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
     IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
-    MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0),
     LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);
 
     private static final Configuration APPLICATION_PROPERTIES;
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 0ad3673..0b3ede9 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,16 +44,10 @@ public class AtlasImportRequest implements Serializable {
     public  static final String TRANSFORMS_KEY             = "transforms";
     public  static final String TRANSFORMERS_KEY           = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
-    public  static final String OPTION_KEY_MIGRATION       = "migration";
-    public  static final String OPTION_KEY_NUM_WORKERS     = "numWorkers";
-    public  static final String OPTION_KEY_BATCH_SIZE      = "batchSize";
-    public  static final String OPTION_KEY_FORMAT          = "format";
-    public  static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
-    public  static final String START_POSITION_KEY         = "startPosition";
+    private static final String START_POSITION_KEY         = "startPosition";
     private static final String START_GUID_KEY             = "startGuid";
     private static final String FILE_NAME_KEY              = "fileName";
     private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
-    private static final String OPTION_KEY_STREAM_SIZE     = "size";
 
     private Map<String, String> options;
 
@@ -114,7 +108,7 @@ public class AtlasImportRequest implements Serializable {
             return null;
         }
 
-        return this.options.get(key);
+        return (String) this.options.get(key);
     }
 
     @JsonIgnore
@@ -127,41 +121,10 @@ public class AtlasImportRequest implements Serializable {
         return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
     }
 
-    @JsonIgnore
-    public int getOptionKeyNumWorkers() {
-        return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1);
-    }
-
-    @JsonIgnore
-    public int getOptionKeyBatchSize() {
-        return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1);
-    }
-
-    private int getOptionsValue(String optionKeyBatchSize, int defaultValue) {
-        String optionsValue = getOptionForKey(optionKeyBatchSize);
-
-        return StringUtils.isEmpty(optionsValue) ?
-                defaultValue :
-                Integer.valueOf(optionsValue);
-    }
-
     @JsonAnySetter
     public void setOption(String key, String value) {
         if (null == options) {
             options = new HashMap<>();
         }
         options.put(key, value);
-    }
-
-    public void setSizeOption(int size) {
-        setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size));
-    }
-
-    public int getSizeOption() {
-        if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) {
-            return 1;
-        }
-
-        return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE));
-    }
-}
+    }}
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index dd76697..9ba4bf4 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -21,7 +21,6 @@ package org.apache.atlas.pc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -38,7 +37,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     private final AtomicBoolean    isDirty           = new AtomicBoolean(false);
     private final AtomicLong       maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
     private CountDownLatch         countdownLatch;
-    private Queue<Object>          results;
+    private BlockingQueue<Object>  results;
 
     public WorkItemConsumer(BlockingQueue<T> queue) {
         this.queue          = queue;
@@ -102,7 +101,11 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     protected abstract void processItem(T item);
 
     protected void addResult(Object value) {
-        results.add(value);
+        try {
+            results.put(value);
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted while adding result: {}", value);
+        }
     }
 
     protected void updateCommitTime(long commitTime) {
@@ -115,7 +118,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
         this.countdownLatch = countdownLatch;
     }
 
-    public <V> void setResults(Queue<Object> queue) {
+    public <V> void setResults(BlockingQueue<Object> queue) {
         this.results = queue;
     }
 }
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
index 351421e..a7ba67c 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.*;
 
 public class WorkItemManager<T, U extends WorkItemConsumer> {
@@ -34,7 +33,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     private final ExecutorService  service;
     private final List<U>          consumers = new ArrayList<>();
     private CountDownLatch         countdownLatch;
-    private Queue<Object>          resultsQueue;
+    private BlockingQueue<Object>  resultsQueue;
 
     public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
         this.numWorkers = numWorkers;
@@ -50,13 +49,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         this(builder, "workItemConsumer", batchSize, numWorkers, false);
     }
 
-    public void setResultsCollection(Queue<Object> resultsQueue) {
+    public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
         this.resultsQueue = resultsQueue;
     }
 
     private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
         if (collectResults) {
-            setResultsCollection(new ConcurrentLinkedQueue<>());
+            setResultsCollection(new LinkedBlockingQueue<>());
         }
 
         for (int i = 0; i < numWorkers; i++) {
@@ -125,7 +124,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         LOG.info("WorkItemManager: Shutdown done!");
     }
 
-    public Queue getResults() {
+    public BlockingQueue getResults() {
         return this.resultsQueue;
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 57e454a..bbe0dc5 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -199,10 +199,6 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         return cache.get(guid);
     }
 
-    public static void clearCache() {
-        guidVertexCache.get().clear();
-    }
-
     boolean logException(Throwable t) {
         if (t instanceof AtlasBaseException) {
             Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 373921d..55990f7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -247,8 +247,7 @@ public class AuditsWriter {
             }
 
             updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
-                    Constants.ATTR_NAME_REPLICATED_FROM,
-                    (result.getExportResult() != null) ? result.getExportResult().getChangeMarker() : 0);
+                    Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
         }
 
         public void add(String userName, String sourceCluster, long startTime,
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index cd1deab..27001e3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -92,7 +92,7 @@ public class ImportService {
             request = new AtlasImportRequest();
         }
 
-        EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+        EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
         return run(source, request, userName, hostName, requestingIP);
     }
 
@@ -248,18 +248,8 @@ public class ImportService {
         return (int) (endTime - startTime);
     }
 
-    private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+    private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
         try {
-            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
-                LOG.info("Migration mode: Detected...", request.getOptions().get("size"));
-                return getZipDirectEntityImportStream(request, inputStream);
-            }
-
-            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
-                    request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) {
-                return getZipDirectEntityImportStream(request, inputStream);
-            }
-
             if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
                 return new ZipSource(inputStream);
             }
@@ -270,15 +260,9 @@ public class ImportService {
         }
     }
 
-    private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException {
-        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption());
-        LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size());
-        return zipSourceDirect;
-    }
-
     @VisibleForTesting
     boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
-        if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
             return false;
         }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
index 8347b91..351b475 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
@@ -31,8 +31,4 @@ public enum ZipExportFileNames {
     public String toString() {
         return this.name;
     }
-
-    public String toEntryFileName() {
-        return this.name + ".json";
-    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
deleted file mode 100644
index 260c4af..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.impexp;
-
-import org.apache.atlas.entitytransform.BaseEntityHandler;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasExportResult;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
-import org.apache.atlas.type.AtlasType;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
-
-public class ZipSourceDirect implements EntityImportStream {
-    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class);
-
-    private final ZipInputStream zipInputStream;
-    private int currentPosition;
-
-    private ImportTransforms importTransform;
-    private List<BaseEntityHandler> entityHandlers;
-    private AtlasTypesDef typesDef;
-    private ZipEntry zipEntryNext;
-    private int streamSize = 1;
-
-    public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException {
-        this.zipInputStream = new ZipInputStream(inputStream);
-        this.streamSize = streamSize;
-        prepareStreamForFetch();
-    }
-
-    @Override
-    public ImportTransforms getImportTransform() { return this.importTransform; }
-
-    @Override
-    public void setImportTransform(ImportTransforms importTransform) {
-        this.importTransform = importTransform;
-    }
-
-    @Override
-    public List<BaseEntityHandler> getEntityHandlers() {
-        return entityHandlers;
-    }
-
-    @Override
-    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
-        this.entityHandlers = entityHandlers;
-    }
-
-    @Override
-    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
-        return this.typesDef;
-    }
-
-    @Override
-    public
-    AtlasExportResult getExportResult() throws AtlasBaseException {
-        return new AtlasExportResult();
-    }
-
-    @Override
-    public List<String> getCreationOrder() {
-        return new ArrayList<>();
-    }
-
-    @Override
-    public int getPosition() {
-        return currentPosition;
-    }
-
-    @Override
-    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException {
-        if (StringUtils.isEmpty(json)) {
-            return null;
-        }
-
-        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
-
-        if (importTransform != null) {
-            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
-        }
-
-        if (entityHandlers != null) {
-            applyTransformers(entityWithExtInfo);
-        }
-
-        return entityWithExtInfo;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return (this.zipEntryNext != null
-                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toEntryFileName())
-                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toEntryFileName()));
-    }
-
-    @Override
-    public AtlasEntity next() {
-        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
-
-        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-    }
-
-    @Override
-    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
-        try {
-            if (hasNext()) {
-                String json = moveNext();
-                return getEntityWithExtInfo(json);
-            }
-        } catch (AtlasBaseException e) {
-            LOG.error("getNextEntityWithExtInfo", e);
-        }
-        return null;
-    }
-
-    @Override
-    public void reset() {
-        currentPosition = 0;
-    }
-
-    @Override
-    public AtlasEntity getByGuid(String guid) {
-        try {
-            return getEntity(guid);
-        } catch (AtlasBaseException e) {
-            LOG.error("getByGuid: {} failed!", guid, e);
-            return null;
-        }
-    }
-
-    @Override
-    public void onImportComplete(String guid) {
-    }
-
-    @Override
-    public void setPosition(int index) {
-        try {
-            for (int i = 0; i < index; i++) {
-                moveNextEntry();
-            }
-        }
-        catch (IOException e) {
-            LOG.error("Error setting position: {}. Position may be beyond the 
stream size.", index);
-        }
-    }
-
-    @Override
-    public void setPositionUsingEntityGuid(String guid) {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
-        if (entityWithExtInfo == null) {
-            return;
-        }
-
-        transform(entityWithExtInfo.getEntity());
-
-        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
-            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
-                transform(e);
-            }
-        }
-    }
-
-    private void transform(AtlasEntity e) {
-        for (BaseEntityHandler handler : entityHandlers) {
-            handler.transform(e);
-        }
-    }
-
-    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
-        try {
-            return AtlasType.fromJson(jsonData, clazz);
-
-        } catch (Exception e) {
-            throw new AtlasBaseException("Error converting file to JSON.", e);
-        }
-    }
-
-    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
-        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
-        return (extInfo != null) ? extInfo.getEntity() : null;
-    }
-
-    public int size() {
-        return this.streamSize;
-    }
-
-    private String moveNext() {
-        try {
-            moveNextEntry();
-            return getJsonPayloadFromZipEntryStream(this.zipInputStream);
-        } catch (IOException e) {
-            LOG.error("moveNext failed!", e);
-        }
-
-        return null;
-    }
-
-    private void moveNextEntry() throws IOException {
-        this.zipEntryNext = this.zipInputStream.getNextEntry();
-        this.currentPosition++;
-    }
-
-    private void prepareStreamForFetch() throws AtlasBaseException, IOException {
-        moveNextEntry();
-        if (this.zipEntryNext == null) {
-            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
-        }
-
-        if (this.zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
-            String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);
-            this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
-        }
-    }
-
-    private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
-        try {
-            final int BUFFER_LENGTH = 4096;
-            byte[] buf = new byte[BUFFER_LENGTH];
-
-            int n = 0;
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            while ((n = zipInputStream.read(buf, 0, BUFFER_LENGTH)) > -1) {
-                bos.write(buf, 0, n);
-            }
-
-            return bos.toString();
-        } catch (IOException ex) {
-            LOG.error("Error fetching string from entry!", ex);
-        }
-
-        return null;
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index 69d78cd..ca0bc41 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -24,7 +24,6 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.repository.impexp.ImportService;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,20 +32,11 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
-import java.util.zip.ZipFile;
-
-import static org.apache.atlas.AtlasConfiguration.MIGRATION_IMPORT_START_POSITION;
 
 public class ZipFileMigrationImporter implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
 
-    private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
-    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE       = "atlas.migration.mode.batch.size";
-    private static final String DEFAULT_NUMBER_OF_WORKDERS = "4";
-    private static final String DEFAULT_BATCH_SIZE = "100";
-    private static final String ZIP_FILE_COMMENT = "streamSize";
-
-    private final static String ENV_USER_NAME = "user.name";
+    private static String ENV_USER_NAME = "user.name";
 
     private final ImportService importService;
     private final String fileToImport;
@@ -62,8 +52,7 @@ public class ZipFileMigrationImporter implements Runnable {
             FileWatcher fileWatcher = new FileWatcher(fileToImport);
             fileWatcher.start();
 
-            int streamSize = getStreamSizeFromComment(fileToImport);
-            performImport(new FileInputStream(new File(fileToImport)), streamSize);
+            performImport(new FileInputStream(new File(fileToImport)));
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -71,44 +60,19 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }
 
-    private int getStreamSizeFromComment(String fileToImport) {
-        int ret = 1;
-        try {
-            ZipFile zipFile = new ZipFile(fileToImport);
-            String streamSizeComment = zipFile.getComment();
-            ret = processZipFileStreamSizeComment(streamSizeComment);
-            zipFile.close();
-        } catch (IOException e) {
-            LOG.error("Error opening ZIP file: {}", fileToImport, e);
-        }
-
-        return ret;
-    }
-
-    private int processZipFileStreamSizeComment(String streamSizeComment) {
-        if (!StringUtils.isNotEmpty(streamSizeComment) || !StringUtils.startsWith(streamSizeComment, ZIP_FILE_COMMENT)) {
-            return 1;
-        }
-
-        String s = StringUtils.substringAfter(streamSizeComment, ":");
-        LOG.debug("ZipFileMigrationImporter: streamSize: {}", 
streamSizeComment);
-
-        return Integer.valueOf(s);
-    }
-
-    private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
+    private void performImport(InputStream fs) throws AtlasBaseException {
         try {
             LOG.info("Migration Import: {}: Starting...", fileToImport);
 
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
-            importService.run(fs, getImportRequest(streamSize),
+            importService.run(fs, getImportRequest(),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
 
         } catch (Exception ex) {
-            LOG.error("Migration Import: Error loading zip for migration!", 
ex);
+            LOG.error("Error loading zip for migration", ex);
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);
@@ -119,16 +83,8 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }
 
-    private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
-        AtlasImportRequest request = new AtlasImportRequest();
-
-        request.setSizeOption(streamSize);
-        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
-        request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS));
-        request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
-        request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
-
-        return request;
+    private AtlasImportRequest getImportRequest() throws AtlasException {
+        return new AtlasImportRequest();
     }
 
     private String getPropertyValue(String property, String defaultValue) throws AtlasException {
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index bee6378..2b58119 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -81,9 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             AtlasGraph        graph        = getGraph();
 
             for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
-                LOG.info("finding entities of type: {}", entityType.getTypeName());
+                LOG.info("finding entities of type {}", entityType.getTypeName());
+
                 Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
-                LOG.info("found entities of type: {}", entityType.getTypeName());
                 int              count    = 0;
 
                 for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 805531c..39ea3f8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -150,14 +150,6 @@ public interface AtlasEntityStore {
     EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
 
     /**
-     * Create or update  entities with parameters necessary for import process without commit. Caller will have to do take care of commit.
-     * @param entityStream AtlasEntityStream
-     * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
-     * @throws AtlasBaseException
-     */
-    EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
-
-    /**
      * Update a single entity
      * @param objectId     ID of the entity
      * @param updatedEntityInfo updated entity information
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 6f6ee17..30f5e5a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -332,11 +332,6 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     }
 
     @Override
-    public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException {
-        return createOrUpdate(entityStream, false, true, true);
-    }
-
-    @Override
     @GraphTransaction
     public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
@@ -1215,10 +1210,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
 
             ret.setGuidAssignments(context.getGuidAssignments());
 
-            if (!RequestContext.get().isImportInProgress()) {
-                // Notify the change listeners
-                entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
-            }
+            // Notify the change listeners
+            entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== createOrUpdate()");
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index 857b709..fdf117a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -929,10 +929,6 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
     }
 
     private void sendNotifications(AtlasRelationship ret, OperationType relationshipUpdate) throws AtlasBaseException {
-        if (entityChangeNotifier == null) {
-            return;
-        }
-
         entityChangeNotifier.notifyPropagatedEntities();
         if (notificationsEnabled){
             entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 4526002..54c32c5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -18,30 +18,33 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
-import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
-import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
-import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.Constants;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -52,24 +55,131 @@ public class BulkImporterImpl implements BulkImporter {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
 
     private final AtlasEntityStore entityStore;
+    private final EntityGraphRetriever entityGraphRetriever;
     private AtlasTypeRegistry typeRegistry;
+    private final int MAX_ATTEMPTS = 2;
+    private boolean directoryBasedImportConfigured;
 
     @Inject
     public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
+        this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
     public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
-        ImportStrategy importStrategy =
-                (importResult.getRequest().getOptions() != null &&
-                        importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
-                ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry)
-                : new RegularImport(this.entityStore, this.typeRegistry);
-
-        LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
-        return importStrategy.run(entityStream, importResult);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> bulkImport()");
+        }
+
+        if (entityStream == null || !entityStream.hasNext()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+        }
+
+        EntityMutationResponse ret = new EntityMutationResponse();
+        ret.setGuidAssignments(new HashMap<>());
+
+        Set<String>  processedGuids = new HashSet<>();
+        float        currentPercent = 0f;
+        List<String> residualList   = new ArrayList<>();
+
+        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
+
+        while (entityImportStreamWithResidualList.hasNext()) {
+            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
+            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+
+            if (entity == null) {
+                continue;
+            }
+
+            for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+                try {
+                    AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
+                    EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
+
+                    if (resp.getGuidAssignments() != null) {
+                        ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+                    }
+
+                    currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
+                            entityStream.getPosition(),
+                            entityImportStreamWithResidualList.getStreamSize(),
+                            currentPercent);
+
+                    entityStream.onImportComplete(entity.getGuid());
+                    break;
+                } catch (AtlasBaseException e) {
+                    if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
+                        throw e;
+                    }
+                    break;
+                } catch (AtlasSchemaViolationException e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Entity: {}", entity.getGuid(), e);
+                    }
+
+                    if (attempt == 0) {
+                        updateVertexGuid(entity);
+                    } else {
+                        LOG.error("Guid update failed: {}", 
entityWithExtInfo.getEntity().getGuid());
+                        throw e;
+                    }
+                } catch (Throwable e) {
+                    AtlasBaseException abe = new AtlasBaseException(e);
+                    if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
+                        throw abe;
+                    }
+
+                    LOG.warn("Exception: {}", entity.getGuid(), e);
+                    break;
+                } finally {
+                    RequestContext.get().clearCache();
+                }
+            }
+        }
+
+        importResult.getProcessedEntities().addAll(processedGuids);
+        LOG.info("bulkImport(): done. Total number of entities (including 
referred entities) imported: {}", processedGuids.size());
+
+        return ret;
+    }
+
+    @GraphTransaction
+    public void updateVertexGuid(AtlasEntity entity) {
+        String entityGuid = entity.getGuid();
+        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
+
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+        String vertexGuid = null;
+        try {
+            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
+        } catch (AtlasBaseException e) {
+            LOG.warn("Entity: {}: Does not exist!", objectId);
+            return;
+        }
+
+        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
+            return;
+        }
+
+        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
+        if (v == null) {
+            return;
+        }
+
+        addHistoricalGuid(v, vertexGuid);
+        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
+
+        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
+    }
+
+    private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
+        String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
+
+        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
     }
 
     @VisibleForTesting
@@ -83,16 +193,38 @@ public class BulkImporterImpl implements BulkImporter {
         return json;
     }
 
+    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
+        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
+            return false;
+        }
+
+        lineageList.add(guid);
+
+        return true;
+    }
+
+    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
+                                      EntityMutationResponse             resp,
+                                      AtlasImportResult                  importResult,
+                                      Set<String>                        processedGuids,
+                                      int currentIndex, int streamSize, float currentPercent) {
+        if (!directoryBasedImportConfigured) {
+            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+        }
+
+        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
+
+        return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+    }
+
     @VisibleForTesting
-    public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
+    static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
         final double tolerance   = 0.000001;
         final int    MAX_PERCENT = 100;
 
         int     maxSize        = (currentIndex <= streamSize) ? streamSize : currentIndex;
-        if (maxSize <= 0) {
-            return currentPercent;
-        }
-
         float   percent        = (float) ((currentIndex * MAX_PERCENT) / maxSize);
         boolean updateLog      = Double.compare(percent, currentPercent) > tolerance;
         float   updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
@@ -104,7 +236,7 @@ public class BulkImporterImpl implements BulkImporter {
         return updatedPercent;
     }
 
-    public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
         if (list == null) {
             return;
         }
@@ -119,37 +251,41 @@ public class BulkImporterImpl implements BulkImporter {
         }
     }
 
-    public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
-        String entityGuid = entity.getGuid();
-        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
+    private static class EntityImportStreamWithResidualList {
+        private final EntityImportStream stream;
+        private final List<String>       residualList;
+        private       boolean            navigateResidualList;
+        private       int                currentResidualListIndex;
 
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-        String vertexGuid = null;
-        try {
-            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
-        } catch (AtlasBaseException e) {
-            LOG.warn("Entity: {}: Does not exist!", objectId);
-            return;
-        }
 
-        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
-            return;
+        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
+            this.stream                   = stream;
+            this.residualList             = residualList;
+            this.navigateResidualList     = false;
+            this.currentResidualListIndex = 0;
         }
 
-        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
-        if (v == null) {
-            return;
+        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+            if (navigateResidualList == false) {
+                return stream.getNextEntityWithExtInfo();
+            } else {
+                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
+                return stream.getNextEntityWithExtInfo();
+            }
         }
 
-        addHistoricalGuid(v, vertexGuid);
-        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
-
-        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
-    }
-
-    public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
-        String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
+        public boolean hasNext() {
+            if (!navigateResidualList) {
+                boolean streamHasNext = stream.hasNext();
+                navigateResidualList = (streamHasNext == false);
+                return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
+            } else {
+                return (currentResidualListIndex < residualList.size());
+            }
+        }
 
-        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
+        public int getStreamSize() {
+            return stream.size() + residualList.size();
+        }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index e76b341..2f3aad0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -361,9 +361,7 @@ public class EntityGraphMapper {
 
         updateLabels(vertex, labels);
 
-        if (entityChangeNotifier != null) {
-            entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
-        }
+        entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
     }
 
     public void addLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
@@ -380,10 +378,7 @@ public class EntityGraphMapper {
             if (!updatedLabels.equals(existingLabels)) {
                 updateLabels(vertex, updatedLabels);
                 updatedLabels.removeAll(existingLabels);
-
-                if (entityChangeNotifier != null) {
-                    entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
-                }
+                entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
             }
         }
     }
@@ -400,10 +395,7 @@ public class EntityGraphMapper {
                 if (!updatedLabels.equals(existingLabels)) {
                     updateLabels(vertex, updatedLabels);
                     existingLabels.removeAll(updatedLabels);
-
-                    if (entityChangeNotifier != null) {
-                        entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
-                    }
+                    entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
                 }
             }
         }
@@ -1956,9 +1948,7 @@ public class EntityGraphMapper {
                 Set<AtlasVertex>  vertices           = addedClassifications.get(classification);
                 List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);
 
-                if (entityChangeNotifier != null) {
-                    entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
-                }
+                entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
             }
 
             RequestContext.get().endMetricRecord(metric);
@@ -2066,10 +2056,7 @@ public class EntityGraphMapper {
             AtlasEntity entity = updateClassificationText(entry.getKey());
 
             List<AtlasClassification> deletedClassificationNames = entry.getValue();
-
-            if (entityChangeNotifier != null) {
-                entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
-            }
+            entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
         }
     }
 
@@ -2296,19 +2283,17 @@ public class EntityGraphMapper {
             notificationVertices.addAll(entitiesToPropagateTo);
         }
 
-        if (entityChangeNotifier != null) {
-            for (AtlasVertex vertex : notificationVertices) {
-                String entityGuid = GraphHelper.getGuid(vertex);
-                AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
+        for (AtlasVertex vertex : notificationVertices) {
+            String      entityGuid = GraphHelper.getGuid(vertex);
+            AtlasEntity entity     = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
 
-                if (isActive(entity)) {
-                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-                    entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
-                }
+            if (isActive(entity)) {
+                vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+                entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
             }
         }
 
-        if (entityChangeNotifier != null && MapUtils.isNotEmpty(removedPropagations)) {
+        if (MapUtils.isNotEmpty(removedPropagations)) {
             for (AtlasClassification classification : removedPropagations.keySet()) {
                 List<AtlasVertex> propagatedVertices = removedPropagations.get(classification);
                 List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices);
@@ -2541,7 +2526,7 @@ public class EntityGraphMapper {
     private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException {
         List<AtlasEntity> propagatedEntities = new ArrayList<>();
 
-        if (fullTextMapperV2 != null && CollectionUtils.isNotEmpty(propagatedVertices)) {
+        if(CollectionUtils.isNotEmpty(propagatedVertices)) {
             for(AtlasVertex vertex : propagatedVertices) {
                 AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
deleted file mode 100644
index 6b70eab..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.store.graph.v2.bulkimport;
-
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportResult;
-import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
-
-public abstract class ImportStrategy {
-    public abstract EntityMutationResponse run(EntityImportStream 
entityStream, AtlasImportResult importResult) throws AtlasBaseException;
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
deleted file mode 100644
index 8c66656..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.store.graph.v2.bulkimport;
-
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportResult;
-import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.converters.AtlasFormatConverters;
-import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
-import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
-import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
-import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
-import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
-import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
-import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
-import 
org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
-import 
org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MigrationImport extends ImportStrategy {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MigrationImport.class);
-
-    private final AtlasTypeRegistry typeRegistry;
-    private AtlasGraph atlasGraph;
-    private EntityGraphRetriever entityGraphRetriever;
-    private EntityGraphMapper entityGraphMapper;
-    private AtlasEntityStore entityStore;
-
-    public MigrationImport(AtlasGraphProvider atlasGraphProvider, 
AtlasTypeRegistry typeRegistry) {
-        this.typeRegistry = typeRegistry;
-        setupEntityStore(atlasGraphProvider, typeRegistry);
-        LOG.info("MigrationImport: Using bulkLoading...");
-    }
-
-    public EntityMutationResponse run(EntityImportStream entityStream, 
AtlasImportResult importResult) throws AtlasBaseException {
-        if (entityStream == null || !entityStream.hasNext()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"no entities to create/update.");
-        }
-
-        if (importResult.getRequest() == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"importResult should contain request");
-        }
-
-        int index = 0;
-        int streamSize = entityStream.size();
-        EntityMutationResponse ret = new EntityMutationResponse();
-        EntityCreationManager creationManager = 
createEntityCreationManager(atlasGraph, importResult, streamSize);
-
-        try {
-            LOG.info("Migration Import: Size: {}: Starting...", streamSize);
-            index = creationManager.read(entityStream);
-            creationManager.drain();
-            creationManager.extractResults();
-        } catch (Exception ex) {
-            LOG.error("Migration Import: Error: Current position: {}", index, 
ex);
-        } finally {
-            shutdownEntityCreationManager(creationManager);
-        }
-
-        LOG.info("Migration Import: Size: {}: Done!", streamSize);
-        return ret;
-    }
-
-    private EntityCreationManager createEntityCreationManager(AtlasGraph 
threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
-        int batchSize = importResult.getRequest().getOptionKeyBatchSize();
-        int numWorkers = 
getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
-
-        EntityConsumerBuilder consumerBuilder =
-                new EntityConsumerBuilder(threadedAtlasGraph, entityStore, 
entityGraphRetriever, typeRegistry, batchSize);
-
-        return new EntityCreationManager(consumerBuilder, batchSize, 
numWorkers, importResult, streamSize);
-    }
-
-    private static int getNumWorkers(int numWorkersFromOptions) {
-        int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
-        LOG.info("Migration Import: Setting numWorkers: {}", ret);
-        return ret;
-    }
-
-    private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, 
AtlasTypeRegistry typeRegistry) {
-        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
-        this.atlasGraph = atlasGraphProvider.getBulkLoading();
-        DeleteHandlerDelegate deleteDelegate = new 
DeleteHandlerDelegate(typeRegistry);
-
-        AtlasRelationshipStore relationshipStore = new 
AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, null);
-        AtlasFormatConverters formatConverters = new 
AtlasFormatConverters(typeRegistry);
-        AtlasInstanceConverter instanceConverter = new 
AtlasInstanceConverter(typeRegistry, formatConverters);
-        this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, 
typeRegistry, atlasGraph, relationshipStore, null, instanceConverter, null);
-        this.entityStore = new AtlasEntityStoreV2(deleteDelegate, 
typeRegistry, null, entityGraphMapper);
-    }
-
-    private void shutdownEntityCreationManager(EntityCreationManager 
creationManager) {
-        try {
-            creationManager.shutdown();
-        } catch (InterruptedException e) {
-            LOG.error("Migration Import: Shutdown: Interrupted!", e);
-        }
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
deleted file mode 100644
index 4cc8ed4..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.store.graph.v2.bulkimport;
-
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.annotation.GraphTransaction;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportResult;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
-import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
-import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
-import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
-import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
-import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static 
org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
-import static 
org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress;
-
-public class RegularImport extends ImportStrategy {
-    private static final Logger LOG = 
LoggerFactory.getLogger(RegularImport.class);
-    private static final int MAX_ATTEMPTS = 3;
-    private final AtlasEntityStore entityStore;
-    private final AtlasTypeRegistry typeRegistry;
-    private final EntityGraphRetriever entityGraphRetriever;
-    private boolean directoryBasedImportConfigured;
-
-    public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry 
typeRegistry) {
-        this.entityStore = entityStore;
-        this.typeRegistry = typeRegistry;
-        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
-        this.directoryBasedImportConfigured = 
StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
-    }
-
-    @Override
-    public EntityMutationResponse run(EntityImportStream entityStream, 
AtlasImportResult importResult) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> bulkImport()");
-        }
-
-        if (entityStream == null || !entityStream.hasNext()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"no entities to create/update.");
-        }
-
-        EntityMutationResponse ret = new EntityMutationResponse();
-        ret.setGuidAssignments(new HashMap<>());
-
-        Set<String> processedGuids = new HashSet<>();
-        float        currentPercent = 0f;
-        List<String> residualList   = new ArrayList<>();
-
-        EntityImportStreamWithResidualList entityImportStreamWithResidualList 
= new EntityImportStreamWithResidualList(entityStream, residualList);
-
-        while (entityImportStreamWithResidualList.hasNext()) {
-            AtlasEntityWithExtInfo entityWithExtInfo = 
entityImportStreamWithResidualList.getNextEntityWithExtInfo();
-            AtlasEntity            entity            = entityWithExtInfo != 
null ? entityWithExtInfo.getEntity() : null;
-
-            if (entity == null) {
-                continue;
-            }
-
-            for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
-                try {
-                    AtlasEntityStreamForImport oneEntityStream = new 
AtlasEntityStreamForImport(entityWithExtInfo, null);
-                    EntityMutationResponse resp = 
entityStore.createOrUpdateForImport(oneEntityStream);
-
-                    if (resp.getGuidAssignments() != null) {
-                        
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
-                    }
-
-                    currentPercent = updateImportMetrics(entityWithExtInfo, 
resp, importResult, processedGuids,
-                            entityStream.getPosition(),
-                            entityImportStreamWithResidualList.getStreamSize(),
-                            currentPercent);
-
-                    entityStream.onImportComplete(entity.getGuid());
-                    break;
-                } catch (AtlasBaseException e) {
-                    if (!updateResidualList(e, residualList, 
entityWithExtInfo.getEntity().getGuid())) {
-                        throw e;
-                    }
-                    break;
-                } catch (AtlasSchemaViolationException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Entity: {}", entity.getGuid(), e);
-                    }
-
-                    if (attempt == 0) {
-                        updateVertexGuid(entity);
-                    } else {
-                        LOG.error("Guid update failed: {}", 
entityWithExtInfo.getEntity().getGuid());
-                        throw e;
-                    }
-                } catch (Throwable e) {
-                    AtlasBaseException abe = new AtlasBaseException(e);
-                    if (!updateResidualList(abe, residualList, 
entityWithExtInfo.getEntity().getGuid())) {
-                        throw abe;
-                    }
-
-                    LOG.warn("Exception: {}", entity.getGuid(), e);
-                    break;
-                } finally {
-                    RequestContext.get().clearCache();
-                }
-            }
-        }
-
-        importResult.getProcessedEntities().addAll(processedGuids);
-        LOG.info("bulkImport(): done. Total number of entities (including 
referred entities) imported: {}", processedGuids.size());
-
-        return ret;
-    }
-
-    @GraphTransaction
-    public void updateVertexGuid(AtlasEntity entity) {
-        String entityGuid = entity.getGuid();
-        AtlasObjectId objectId = 
entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
-
-        AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
-        String vertexGuid = null;
-        try {
-            vertexGuid = 
AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, 
objectId.getUniqueAttributes());
-        } catch (AtlasBaseException e) {
-            LOG.warn("Entity: {}: Does not exist!", objectId);
-            return;
-        }
-
-        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
-            return;
-        }
-
-        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
-        if (v == null) {
-            return;
-        }
-
-        addHistoricalGuid(v, vertexGuid);
-        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, 
entityGuid);
-
-        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, 
vertexGuid, entity.getGuid());
-    }
-
-    private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
-        String existingJson = AtlasGraphUtilsV2.getProperty(v, 
HISTORICAL_GUID_PROPERTY_KEY, String.class);
-
-        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, 
getJsonArray(existingJson, vertexGuid));
-    }
-
-    @VisibleForTesting
-    static String getJsonArray(String json, String vertexGuid) {
-        String quotedGuid = String.format("\"%s\"", vertexGuid);
-        if (StringUtils.isEmpty(json)) {
-            json = String.format("[%s]", quotedGuid);
-        } else {
-            json = json.replace("]", 
"").concat(",").concat(quotedGuid).concat("]");
-        }
-        return json;
-    }
-
-    private boolean updateResidualList(AtlasBaseException e, List<String> 
lineageList, String guid) {
-        if 
(!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode()))
 {
-            return false;
-        }
-
-        lineageList.add(guid);
-
-        return true;
-    }
-
-    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo 
currentEntity,
-                                      EntityMutationResponse             resp,
-                                      AtlasImportResult                  
importResult,
-                                      Set<String>                        
processedGuids,
-                                      int currentIndex, int streamSize, float 
currentPercent) {
-        if (!directoryBasedImportConfigured) {
-            BulkImporterImpl.updateImportMetrics("entity:%s:created", 
resp.getCreatedEntities(), processedGuids, importResult);
-            BulkImporterImpl.updateImportMetrics("entity:%s:updated", 
resp.getUpdatedEntities(), processedGuids, importResult);
-            BulkImporterImpl.updateImportMetrics("entity:%s:deleted", 
resp.getDeletedEntities(), processedGuids, importResult);
-        }
-
-        String lastEntityImported = 
String.format("entity:last-imported:%s:[%s]:(%s)", 
currentEntity.getEntity().getTypeName(), currentIndex, 
currentEntity.getEntity().getGuid());
-
-        return updateImportProgress(LOG, currentIndex, streamSize, 
currentPercent, lastEntityImported);
-    }
-
-    private static class EntityImportStreamWithResidualList {
-        private final EntityImportStream stream;
-        private final List<String>       residualList;
-        private       boolean            navigateResidualList;
-        private       int                currentResidualListIndex;
-
-
-        public EntityImportStreamWithResidualList(EntityImportStream stream, 
List<String> residualList) {
-            this.stream                   = stream;
-            this.residualList             = residualList;
-            this.navigateResidualList     = false;
-            this.currentResidualListIndex = 0;
-        }
-
-        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
-            if (navigateResidualList == false) {
-                return stream.getNextEntityWithExtInfo();
-            } else {
-                
stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
-                return stream.getNextEntityWithExtInfo();
-            }
-        }
-
-        public boolean hasNext() {
-            if (!navigateResidualList) {
-                boolean streamHasNext = stream.hasNext();
-                navigateResidualList = (streamHasNext == false);
-                return streamHasNext ? streamHasNext : 
(currentResidualListIndex < residualList.size());
-            } else {
-                return (currentResidualListIndex < residualList.size());
-            }
-        }
-
-        public int getStreamSize() {
-            return stream.size() + residualList.size();
-        }
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
deleted file mode 100644
index bb74205..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
-
-import org.apache.atlas.GraphTransactionInterceptor;
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
-import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.pc.WorkItemConsumer;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
-import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
-import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(EntityConsumer.class);
-    private static final int MAX_COMMIT_RETRY_COUNT = 3;
-
-    private final int batchSize;
-    private AtomicLong counter = new AtomicLong(1);
-    private AtomicLong currentBatch = new AtomicLong(1);
-
-    private final AtlasGraph atlasGraph;
-    private final AtlasEntityStore entityStoreV2;
-    private final AtlasTypeRegistry typeRegistry;
-    private final EntityGraphRetriever entityGraphRetriever;
-
-    private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new 
ArrayList<>();
-    private List<EntityMutationResponse> localResults = new ArrayList<>();
-
-    public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
-                          EntityGraphRetriever entityGraphRetriever, 
AtlasTypeRegistry typeRegistry,
-                          BlockingQueue queue, int batchSize) {
-        super(queue);
-
-        this.atlasGraph = atlasGraph;
-        this.entityStoreV2 = entityStore;
-        this.entityGraphRetriever = entityGraphRetriever;
-        this.typeRegistry = typeRegistry;
-        this.batchSize = batchSize;
-    }
-
-    @Override
-    protected void processItem(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo) {
-        int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
-                ? 1
-                : entityWithExtInfo.getReferredEntities().size()) + 1;
-
-        long currentCount = counter.addAndGet(delta);
-        currentBatch.addAndGet(delta);
-        entityBuffer.add(entityWithExtInfo);
-
-        try {
-            processEntity(entityWithExtInfo, currentCount);
-            attemptCommit();
-        } catch (Exception e) {
-            LOG.info("Data loss: Please re-submit!", e);
-        }
-    }
-
-    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo, long currentCount) {
-        try {
-            RequestContext.get().setImportInProgress(true);
-            AtlasEntityStreamForImport oneEntityStream = new 
AtlasEntityStreamForImport(entityWithExtInfo, null);
-
-            LOG.debug("Processing: {}", currentCount);
-            EntityMutationResponse result = 
entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
-            localResults.add(result);
-        } catch (AtlasBaseException e) {
-            addResult(entityWithExtInfo.getEntity().getGuid());
-            LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), 
e);
-        } catch (AtlasSchemaViolationException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Entity: {}", 
entityWithExtInfo.getEntity().getGuid(), e);
-            }
-
-            BulkImporterImpl.updateVertexGuid(typeRegistry, 
entityGraphRetriever, entityWithExtInfo.getEntity());
-        }
-    }
-
-    private void attemptCommit() {
-        if (currentBatch.get() < batchSize) {
-            return;
-        }
-
-        doCommit();
-    }
-
-    @Override
-    protected void doCommit() {
-        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; 
retryCount++) {
-            if (commitWithRetry(retryCount)) {
-                return;
-            }
-        }
-
-        LOG.error("Retries exceeded! Potential data loss! Please correct data 
and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
-        clear();
-    }
-
-    @Override
-    protected void commitDirty() {
-        super.commitDirty();
-        LOG.info("Total: Commit: {}", counter.get());
-        counter.set(0);
-    }
-
-    private boolean commitWithRetry(int retryCount) {
-        try {
-            atlasGraph.commit();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", 
entityBuffer.size(), currentBatch.get(), counter.get());
-            }
-
-            dispatchResults();
-            return true;
-        } catch (Exception ex) {
-            rollbackPauseRetry(retryCount, ex);
-            return false;
-        }
-    }
-
-    private void rollbackPauseRetry(int retryCount, Exception ex) {
-        atlasGraph.rollback();
-        clearCache();
-
-        LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", 
entityBuffer.size(), counter.get(), retryCount);
-        pause(retryCount);
-        if (ex.getClass().getName().endsWith("JanusGraphException") && 
retryCount >= MAX_COMMIT_RETRY_COUNT) {
-            LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: 
{}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
-        } else {
-            LOG.info("Will pause and retry: Buffer: {}: Counter: {}: Retry 
count: {}", entityBuffer.size(), counter.get(), retryCount);
-        }
-        retryProcessEntity(retryCount);
-    }
-
-    private void retryProcessEntity(int retryCount) {
-        LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", 
entityBuffer.size(), retryCount);
-        for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
-            processEntity(e, counter.get());
-        }
-        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", 
entityBuffer.size(), retryCount);
-    }
-
-    private void dispatchResults() {
-        localResults.stream().forEach(x -> {
-            addResultsFromResponse(x.getCreatedEntities());
-            addResultsFromResponse(x.getUpdatedEntities());
-            addResultsFromResponse(x.getDeletedEntities());
-        });
-
-        clear();
-    }
-
-    private void pause(int retryCount) {
-        try {
-            Thread.sleep(1000 * retryCount);
-        } catch (InterruptedException e) {
-            LOG.error("pause: Interrupted!", e);
-        }
-    }
-
-    private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
-        if (CollectionUtils.isEmpty(entities)) {
-            return;
-        }
-
-        for (AtlasEntityHeader eh : entities) {
-            addResult(eh.getGuid());
-        }
-    }
-
-    private void clear() {
-        localResults.clear();
-        entityBuffer.clear();
-        clearCache();
-        currentBatch.set(0);
-    }
-
-    private void clearCache() {
-        GraphTransactionInterceptor.clearCache();
-        RequestContext.get().clearCache();
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
deleted file mode 100644
index 69d33b2..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
-
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.pc.WorkItemBuilder;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
-import org.apache.atlas.type.AtlasTypeRegistry;
-
-import java.util.concurrent.BlockingQueue;
-
-public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, 
AtlasEntity.AtlasEntityWithExtInfo> {
-    private AtlasGraph atlasGraph;
-    private AtlasEntityStore entityStore;
-    private final EntityGraphRetriever entityGraphRetriever;
-    private final AtlasTypeRegistry typeRegistry;
-    private int batchSize;
-
-    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore 
entityStore,
-                                 EntityGraphRetriever entityGraphRetriever, 
AtlasTypeRegistry typeRegistry, int batchSize) {
-        this.atlasGraph = atlasGraph;
-        this.entityStore = entityStore;
-        this.entityGraphRetriever = entityGraphRetriever;
-        this.typeRegistry = typeRegistry;
-        this.batchSize = batchSize;
-    }
-
-    @Override
-    public EntityConsumer 
build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
-        return new EntityConsumer(atlasGraph, entityStore, 
entityGraphRetriever, typeRegistry, queue, this.batchSize);
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
deleted file mode 100644
index 0051941..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
-
-import org.apache.atlas.model.impexp.AtlasImportResult;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.pc.WorkItemBuilder;
-import org.apache.atlas.pc.WorkItemManager;
-import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
-import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EntityCreationManager<AtlasEntityWithExtInfo> extends 
WorkItemManager {
-    private static final Logger LOG = 
LoggerFactory.getLogger(EntityCreationManager.class);
-    private static final String WORKER_PREFIX = "migration-import";
-
-    private final StatusReporter<String, String> statusReporter;
-    private final AtlasImportResult importResult;
-    private final int streamSize;
-    private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
-    private String currentTypeName;
-    private float currentPercent;
-
-    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int 
numWorkers, AtlasImportResult importResult, int streamSize) {
-        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
-        this.importResult = importResult;
-        this.streamSize = streamSize;
-
-        this.statusReporter = new 
StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
-    }
-
-    public int read(EntityImportStream entityStream) {
-        int currentIndex = 0;
-        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
-        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) 
!= null) {
-            AtlasEntity entity = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null;
-            if (entity == null) {
-                continue;
-            }
-
-            try {
-                produce(currentIndex++, entity.getTypeName(), 
entityWithExtInfo);
-            } catch (Throwable e) {
-                LOG.warn("Exception: {}", entity.getGuid(), e);
-                break;
-            }
-        }
-        return currentIndex;
-    }
-
-    private void produce(int currentIndex, String typeName, 
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
-        String previousTypeName = getCurrentTypeName();
-
-        if (StringUtils.isNotEmpty(typeName)
-                && StringUtils.isNotEmpty(previousTypeName)
-                && !StringUtils.equals(previousTypeName, typeName)) {
-            LOG.info("Waiting: '{}' to complete...", previousTypeName);
-            super.drain();
-            LOG.info("Switching entity type processing: From: '{}' To: 
'{}'...", previousTypeName, typeName);
-        }
-
-        setCurrentTypeName(typeName);
-        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), 
String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), 
currentIndex));
-        super.checkProduce(entityWithExtInfo);
-        extractResults();
-    }
-
-    public void extractResults() {
-        Object result;
-        while (((result = getResults().poll())) != null) {
-            statusReporter.processed((String) result);
-        }
-
-        logStatus();
-    }
-
-    private void logStatus() {
-        String ack = statusReporter.ack();
-        if (StringUtils.isEmpty(ack)) {
-            return;
-        }
-
-        String[] split = ack.split(":");
-        if (split.length == 0 || split.length < 2) {
-            return;
-        }
-
-        importResult.incrementMeticsCounter(split[0]);
-        this.currentPercent = updateImportMetrics(split[0], 
Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
-    }
-
-    private static float updateImportMetrics(String typeNameGuid, int 
currentIndex, int streamSize, float currentPercent) {
-        String lastEntityImported = 
String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
-        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, 
streamSize, currentPercent, lastEntityImported);
-    }
-
-    private String getCurrentTypeName() {
-        return this.currentTypeName;
-    }
-
-    private void setCurrentTypeName(String typeName) {
-        this.currentTypeName = typeName;
-    }
-
-    private float getCurrentPercent() {
-        return this.currentPercent;
-    }
-
-    private int getStreamSize() {
-        return this.streamSize;
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java
deleted file mode 100644
index 1cd9860..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
-
-import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-public class StatusReporter<T, U> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(StatusReporter.class);
-
-    private Map<T,U> producedItems = new LinkedHashMap<>();
-    private Set<T> processedSet = new HashSet<>();
-    private TypesUtil.Pair<T, Long> watchedItem;
-    private final long timeOut;
-
-    public StatusReporter(long timeOut) {
-        this.timeOut = timeOut;
-    }
-
-    public void produced(T item, U index) {
-        this.producedItems.put(item, index);
-    }
-
-    public void processed(T item) {
-        this.processedSet.add(item);
-    }
-
-    public void processed(T[] index) {
-        this.processedSet.addAll(Arrays.asList(index));
-    }
-
-    public U ack() {
-        U ack = null;
-        U ret;
-        Map.Entry<T, U> firstElement;
-        do {
-            firstElement = getFirstElement(this.producedItems);
-            ret = completionIndex(firstElement);
-            if (ret != null) {
-                ack = ret;
-            }
-        } while(ret != null);
-
-        return addToWatchIfNeeded(ack, firstElement);
-    }
-
-    private U addToWatchIfNeeded(U ack, Map.Entry<T, U> firstElement) {
-        if (ack == null && firstElement != null) {
-            ack = addToWatch(firstElement.getKey());
-        } else {
-            resetWatchItem();
-        }
-        return ack;
-    }
-
-    private void resetWatchItem() {
-        this.watchedItem = null;
-    }
-
-    private U addToWatch(T key) {
-        createNewWatchItem(key);
-        if (!hasTimedOut(this.watchedItem)) {
-            return null;
-        }
-
-        T producedItemKey = this.watchedItem.left;
-        resetWatchItem();
-        LOG.warn("Item: {}: Was produced but not successfully processed!", 
producedItemKey);
-        return this.producedItems.get(producedItemKey);
-
-    }
-
-    private void createNewWatchItem(T key) {
-        if (this.watchedItem != null) {
-            return;
-        }
-
-        this.watchedItem = new TypesUtil.Pair<T, Long>(key, 
System.currentTimeMillis());
-    }
-
-    private boolean hasTimedOut(TypesUtil.Pair<T, Long> watchedItem) {
-        if (watchedItem == null) {
-            return  false;
-        }
-
-        return (System.currentTimeMillis() - watchedItem.right) >= timeOut;
-    }
-
-    private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
-        if (map.isEmpty()) {
-            return null;
-        }
-
-        return map.entrySet().iterator().next();
-    }
-
-    private U completionIndex(Map.Entry<T, U> lookFor) {
-        U ack = null;
-        if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
-            return ack;
-        }
-
-        ack = lookFor.getValue();
-        producedItems.remove(lookFor.getKey());
-        processedSet.remove(lookFor);
-        return ack;
-    }
-}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 759be64..c14850f 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -136,11 +136,6 @@ public class ImportServiceTest extends ExportImportTestBase {
         return getZipSource("dup_col_deleted.zip");
     }
 
-    @DataProvider(name = "zipDirect1")
-    public static Object[][] getZipDirect(ITestContext context) throws IOException, AtlasBaseException {
-        return getZipSource("dup_col_deleted.zip");
-    }
-
     @Test(dataProvider = "sales")
     public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
@@ -535,17 +530,6 @@ public class ImportServiceTest extends ExportImportTestBase {
         }
     }
 
-    @Test(dataProvider = "zipDirect1")
-    public void zipSourceDirect(InputStream inputStream) throws IOException, AtlasBaseException {
-        loadBaseModel();
-        loadFsModel();
-        loadHiveModel();
-
-        runImportWithNoParameters(importService, inputStream);
-
-    }
-
-
     private AtlasImportRequest getImportRequest(String replicatedFrom){
         AtlasImportRequest importRequest = getDefaultImportRequest();
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java
deleted file mode 100644
index 2a22d88..0000000
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.impexp;
-
-
-import com.google.inject.Inject;
-import org.apache.atlas.TestModules;
-import org.apache.atlas.discovery.EntityDiscoveryService;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportRequest;
-import org.apache.atlas.model.impexp.AtlasImportResult;
-import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
-import org.apache.atlas.store.AtlasTypeDefStore;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import static org.testng.Assert.assertNotNull;
-
-@Guice(modules = TestModules.TestOnlyModule.class)
-public class MigrationImportTest  extends ExportImportTestBase {
-
-    private final ImportService importService;
-
-    @Inject
-    AtlasTypeRegistry typeRegistry;
-
-    @Inject
-    private AtlasTypeDefStore typeDefStore;
-
-    @Inject
-    private EntityDiscoveryService discoveryService;
-
-    @Inject
-    AtlasEntityStore entityStore;
-
-    @Inject
-    AtlasGraph atlasGraph;
-
-    @Inject
-    public MigrationImportTest(ImportService importService) {
-        this.importService = importService;
-    }
-
-    @Test
-    public void simpleImport() throws IOException, AtlasBaseException {
-        InputStream inputStream = 
ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
-
-        AtlasImportRequest importRequest = new AtlasImportRequest();
-        importRequest.setOption("migration", "true");
-
-        AtlasImportResult result = importService.run(inputStream, 
importRequest, null, null, null);
-        assertNotNull(result);
-    }
-}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
deleted file mode 100644
index 5e15023..0000000
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.impexp;
-
-import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.StatusReporter;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
-public class StatusReporterTest {
-    @Test
-    public void noneProducedNoneReported() {
-        StatusReporter<Integer, Integer> statusReporter = new 
StatusReporter<>(100);
-        assertNull(statusReporter.ack());
-    }
-
-    @Test
-    public void producedButNotAcknowledged() {
-        StatusReporter<Integer, Integer> statusReporter = 
createStatusReportWithItems();
-        assertNull(statusReporter.ack());
-    }
-
-    @Test
-    public void producedAcknowledged() {
-        StatusReporter<Integer, Integer> statusReporter = 
createStatusReportWithItems();
-        statusReporter.processed(1);
-
-        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
-    }
-
-    @Test
-    public void producedAcknowledgeMaxAvailableInSequence() {
-        StatusReporter<Integer, Integer> statusReporter = 
createStatusReportWithItems();
-
-        statusReporter.processed(new Integer[]{1, 3, 5});
-
-        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
-    }
-
-    @Test
-    public void producedAcknowledgeMaxAvailableInSequence2() {
-        StatusReporter<Integer, Integer> statusReporter = 
createStatusReportWithItems();
-        statusReporter.processed(new Integer[]{1, 2, 3, 6, 5});
-
-        assertEquals(java.util.Optional.of(300).get(), statusReporter.ack());
-    }
-
-    @Test
-    public void producedSetDisjointWithAckSet() {
-        StatusReporter<Integer, Integer> statusReporter = new 
StatusReporter(100);
-        statusReporter.produced(11, 1000);
-        statusReporter.produced(12, 2000);
-        statusReporter.produced(13, 3000);
-
-        statusReporter.processed(new Integer[]{1, 11, 12, 13});
-
-        assertEquals(java.util.Optional.of(3000).get(), statusReporter.ack());
-    }
-
-    @Test
-    public void missingAck() throws InterruptedException {
-        StatusReporter<Integer, Integer> statusReporter = 
createStatusReportWithItems(2, 3, 4);
-
-        assertNull(statusReporter.ack());
-        Thread.sleep(1002);
-        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
-    }
-
-    private StatusReporter<Integer, Integer> 
createStatusReportWithItems(Integer... processed) {
-        StatusReporter<Integer, Integer> statusReporter = new 
StatusReporter(1000);
-        statusReporter.produced(1, 100);
-        statusReporter.produced(2, 200);
-        statusReporter.produced(3, 300);
-        statusReporter.produced(4, 400);
-        statusReporter.produced(5, 500);
-        statusReporter.produced(6, 600);
-
-        statusReporter.processed(processed);
-
-        return statusReporter;
-    }
-}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
deleted file mode 100644
index d191d8c..0000000
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.impexp;
-
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasExportResult;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-public class ZipDirectTest {
-    @Test(expectedExceptions = AtlasBaseException.class)
-    public void loadFileEmpty() throws IOException, AtlasBaseException {
-        InputStream inputStream = 
ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");
-        new ZipSourceDirect(inputStream, 1);
-    }
-
-    @Test
-    public void loadFile() throws IOException, AtlasBaseException {
-        final int EXPECTED_ENTITY_COUNT = 3434;
-
-        InputStream inputStream = 
ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
-        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, 
EXPECTED_ENTITY_COUNT);
-
-        assertNotNull(zipSourceDirect);
-        assertNotNull(zipSourceDirect.getTypesDef());
-        assertTrue(zipSourceDirect.getTypesDef().getEntityDefs().size() > 0);
-        assertNotNull(zipSourceDirect.getExportResult());
-
-        int count = 0;
-        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
-        while((entityWithExtInfo = zipSourceDirect.getNextEntityWithExtInfo()) 
!= null) {
-            assertNotNull(entityWithExtInfo);
-            count++;
-        }
-
-        assertEquals(count, EXPECTED_ENTITY_COUNT);
-    }
-}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 27a6668..0ffc3d5 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -317,9 +317,7 @@ public class ZipFileResourceTestUtils {
     }
 
     public static AtlasImportRequest getDefaultImportRequest() {
-        AtlasImportRequest atlasImportRequest = new AtlasImportRequest();
-        atlasImportRequest.setOption("migration", "true");
-        return atlasImportRequest;
+        return new AtlasImportRequest();
     }
 
 
@@ -338,8 +336,7 @@ public class ZipFileResourceTestUtils {
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportRequest request = getDefaultImportRequest();
-        AtlasImportResult result = runImportWithParameters(importService, request, inputStream);
+        AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
diff --git a/repository/src/test/resources/zip-direct-1.zip b/repository/src/test/resources/zip-direct-1.zip
deleted file mode 100644
index 15cb0ec..0000000
Binary files a/repository/src/test/resources/zip-direct-1.zip and /dev/null differ
diff --git a/repository/src/test/resources/zip-direct-2.zip b/repository/src/test/resources/zip-direct-2.zip
deleted file mode 100644
index e7b8617..0000000
Binary files a/repository/src/test/resources/zip-direct-2.zip and /dev/null differ
