This is an automated email from the ASF dual-hosted git repository. nixon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 024f5d52d0a972616362ac9b15ed7a417384026f Author: nixonrodrigues <ni...@apache.org> AuthorDate: Wed Feb 26 14:51:37 2020 +0530 Revert "ATLAS-3320: Import Service. Support concurrent ingest." This reverts commit a2ccfb9f3577e911103041d8d4b91c169697f6a1. --- .../repository/graphdb/janus/AtlasJanusGraph.java | 2 +- .../java/org/apache/atlas/AtlasConfiguration.java | 1 - .../atlas/model/impexp/AtlasImportRequest.java | 43 +--- .../java/org/apache/atlas/pc/WorkItemConsumer.java | 11 +- .../java/org/apache/atlas/pc/WorkItemManager.java | 9 +- .../apache/atlas/GraphTransactionInterceptor.java | 4 - .../atlas/repository/impexp/AuditsWriter.java | 3 +- .../atlas/repository/impexp/ImportService.java | 22 +- .../repository/impexp/ZipExportFileNames.java | 4 - .../atlas/repository/impexp/ZipSourceDirect.java | 269 --------------------- .../migration/ZipFileMigrationImporter.java | 58 +---- .../repository/patches/UniqueAttributePatch.java | 4 +- .../repository/store/graph/AtlasEntityStore.java | 8 - .../store/graph/v2/AtlasEntityStoreV2.java | 11 +- .../store/graph/v2/AtlasRelationshipStoreV2.java | 4 - .../store/graph/v2/BulkImporterImpl.java | 228 +++++++++++++---- .../store/graph/v2/EntityGraphMapper.java | 41 +--- .../store/graph/v2/bulkimport/ImportStrategy.java | 28 --- .../store/graph/v2/bulkimport/MigrationImport.java | 122 ---------- .../store/graph/v2/bulkimport/RegularImport.java | 261 -------------------- .../graph/v2/bulkimport/pc/EntityConsumer.java | 213 ---------------- .../v2/bulkimport/pc/EntityConsumerBuilder.java | 50 ---- .../v2/bulkimport/pc/EntityCreationManager.java | 130 ---------- .../graph/v2/bulkimport/pc/StatusReporter.java | 131 ---------- .../atlas/repository/impexp/ImportServiceTest.java | 16 -- .../repository/impexp/MigrationImportTest.java | 77 ------ .../repository/impexp/StatusReporterTest.java | 99 -------- .../atlas/repository/impexp/ZipDirectTest.java | 61 ----- .../impexp/ZipFileResourceTestUtils.java | 7 +- repository/src/test/resources/zip-direct-1.zip | Bin 22 -> 0 bytes repository/src/test/resources/zip-direct-2.zip | Bin 1720553 -> 0 bytes 31 files changed, 227 insertions(+), 1690 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index 0176ba7..4acb371 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE } } - janusGraph = (StandardJanusGraph) graphInstance; + janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance(); } @Override diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index f8d7f8c..1a0d0cc 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -64,7 +64,6 @@ public enum AtlasConfiguration { CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500), LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50), IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""), - MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0), LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false); private static final Configuration APPLICATION_PROPERTIES; diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 0ad3673..0b3ede9 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -44,16 +44,10 @@ public class AtlasImportRequest implements Serializable { public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMERS_KEY = "transformers"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; - public static final String OPTION_KEY_MIGRATION = "migration"; - public static final String OPTION_KEY_NUM_WORKERS = "numWorkers"; - public static final String OPTION_KEY_BATCH_SIZE = "batchSize"; - public static final String OPTION_KEY_FORMAT = "format"; - public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect"; - public static final String START_POSITION_KEY = "startPosition"; + private static final String START_POSITION_KEY = "startPosition"; private static final String START_GUID_KEY = "startGuid"; private static final String FILE_NAME_KEY = "fileName"; private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition"; - private static final String OPTION_KEY_STREAM_SIZE = "size"; private Map<String, String> options; @@ -114,7 +108,7 @@ public class AtlasImportRequest implements Serializable { return null; } - return this.options.get(key); + return (String) this.options.get(key); } @JsonIgnore @@ -127,41 +121,10 @@ public class AtlasImportRequest implements Serializable { return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY; } - @JsonIgnore - public int getOptionKeyNumWorkers() { - return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1); - } - - @JsonIgnore - public int getOptionKeyBatchSize() { - return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1); - } - - private int getOptionsValue(String optionKeyBatchSize, int defaultValue) { - String optionsValue = getOptionForKey(optionKeyBatchSize); - - return StringUtils.isEmpty(optionsValue) ? - defaultValue : - Integer.valueOf(optionsValue); - } - @JsonAnySetter public void setOption(String key, String value) { if (null == options) { options = new HashMap<>(); } options.put(key, value); - } - - public void setSizeOption(int size) { - setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size)); - } - - public int getSizeOption() { - if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) { - return 1; - } - - return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE)); - } -} + }} diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java index dd76697..9ba4bf4 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java @@ -21,7 +21,6 @@ package org.apache.atlas.pc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -38,7 +37,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { private final AtomicBoolean isDirty = new AtomicBoolean(false); private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS); private CountDownLatch countdownLatch; - private Queue<Object> results; + private BlockingQueue<Object> results; public WorkItemConsumer(BlockingQueue<T> queue) { this.queue = queue; @@ -102,7 +101,11 @@ public abstract class WorkItemConsumer<T> implements Runnable { protected abstract void processItem(T item); protected void addResult(Object value) { - results.add(value); + try { + results.put(value); + } catch (InterruptedException e) { + LOG.error("Interrupted while adding result: {}", value); + } } protected void updateCommitTime(long commitTime) { @@ -115,7 +118,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { this.countdownLatch = countdownLatch; } - public <V> void setResults(Queue<Object> queue) { + public <V> void setResults(BlockingQueue<Object> queue) { this.results = queue; } } diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java index 351421e..a7ba67c 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java @@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.Queue; import java.util.concurrent.*; public class WorkItemManager<T, U extends WorkItemConsumer> { @@ -34,7 +33,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { private final ExecutorService service; private final List<U> consumers = new ArrayList<>(); private CountDownLatch countdownLatch; - private Queue<Object> resultsQueue; + private BlockingQueue<Object> resultsQueue; public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) { this.numWorkers = numWorkers; @@ -50,13 +49,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { this(builder, "workItemConsumer", batchSize, numWorkers, false); } - public void setResultsCollection(Queue<Object> resultsQueue) { + public void setResultsCollection(BlockingQueue<Object> resultsQueue) { this.resultsQueue = resultsQueue; } private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) { if (collectResults) { - setResultsCollection(new ConcurrentLinkedQueue<>()); + setResultsCollection(new LinkedBlockingQueue<>()); } for (int i = 0; i < numWorkers; i++) { @@ -125,7 +124,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { LOG.info("WorkItemManager: Shutdown done!"); } - public Queue getResults() { + public BlockingQueue getResults() { return this.resultsQueue; } diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index 57e454a..bbe0dc5 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -199,10 +199,6 @@ public class GraphTransactionInterceptor implements MethodInterceptor { return cache.get(guid); } - public static void clearCache() { - guidVertexCache.get().clear(); - } - boolean logException(Throwable t) { if (t instanceof AtlasBaseException) { Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode(); diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 373921d..55990f7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -247,8 +247,7 @@ public class AuditsWriter { } updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids, - Constants.ATTR_NAME_REPLICATED_FROM, - (result.getExportResult() != null) ? result.getExportResult().getChangeMarker() : 0); + Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker()); } public void add(String userName, String sourceCluster, long startTime, diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index cd1deab..27001e3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -92,7 +92,7 @@ public class ImportService { request = new AtlasImportRequest(); } - EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); + EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); return run(source, request, userName, hostName, requestingIP); } @@ -248,18 +248,8 @@ public class ImportService { return (int) (endTime - startTime); } - private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { + private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { try { - if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) { - LOG.info("Migration mode: Detected...", request.getOptions().get("size")); - return getZipDirectEntityImportStream(request, inputStream); - } - - if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && - request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) { - return getZipDirectEntityImportStream(request, inputStream); - } - if (StringUtils.isEmpty(configuredTemporaryDirectory)) { return new ZipSource(inputStream); } @@ -270,15 +260,9 @@ public class ImportService { } } - private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException { - ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption()); - LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size()); - return zipSourceDirect; - } - @VisibleForTesting boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) { - if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) { + if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) { return false; } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java index 8347b91..351b475 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java @@ -31,8 +31,4 @@ public enum ZipExportFileNames { public String toString() { return this.name; } - - public String toEntryFileName() { - return this.name + ".json"; - } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java deleted file mode 100644 index 260c4af..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.impexp; - -import org.apache.atlas.entitytransform.BaseEntityHandler; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasExportResult; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.atlas.repository.store.graph.v2.EntityImportStream; -import org.apache.atlas.type.AtlasType; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - -import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP; - -public class ZipSourceDirect implements EntityImportStream { - private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class); - - private final ZipInputStream zipInputStream; - private int currentPosition; - - private ImportTransforms importTransform; - private List<BaseEntityHandler> entityHandlers; - private AtlasTypesDef typesDef; - private ZipEntry zipEntryNext; - private int streamSize = 1; - - public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException { - this.zipInputStream = new ZipInputStream(inputStream); - this.streamSize = streamSize; - prepareStreamForFetch(); - } - - @Override - public ImportTransforms getImportTransform() { return this.importTransform; } - - @Override - public void setImportTransform(ImportTransforms importTransform) { - this.importTransform = importTransform; - } - - @Override - public List<BaseEntityHandler> getEntityHandlers() { - return entityHandlers; - } - - @Override - public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) { - this.entityHandlers = entityHandlers; - } - - @Override - public AtlasTypesDef getTypesDef() throws AtlasBaseException { - return this.typesDef; - } - - @Override - public - AtlasExportResult getExportResult() throws AtlasBaseException { - return new AtlasExportResult(); - } - - @Override - public List<String> getCreationOrder() { - return new ArrayList<>(); - } - - @Override - public int getPosition() { - return currentPosition; - } - - @Override - public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException { - if (StringUtils.isEmpty(json)) { - return null; - } - - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json); - - if (importTransform != null) { - entityWithExtInfo = importTransform.apply(entityWithExtInfo); - } - - if (entityHandlers != null) { - applyTransformers(entityWithExtInfo); - } - - return entityWithExtInfo; - } - - @Override - public boolean hasNext() { - return (this.zipEntryNext != null - && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toEntryFileName()) - && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toEntryFileName())); - } - - @Override - public AtlasEntity next() { - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo(); - - return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - } - - @Override - public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { - try { - if (hasNext()) { - String json = moveNext(); - return getEntityWithExtInfo(json); - } - } catch (AtlasBaseException e) { - LOG.error("getNextEntityWithExtInfo", e); - } - return null; - } - - @Override - public void reset() { - currentPosition = 0; - } - - @Override - public AtlasEntity getByGuid(String guid) { - try { - return getEntity(guid); - } catch (AtlasBaseException e) { - LOG.error("getByGuid: {} failed!", guid, e); - return null; - } - } - - @Override - public void onImportComplete(String guid) { - } - - @Override - public void setPosition(int index) { - try { - for (int i = 0; i < index; i++) { - moveNextEntry(); - } - } - catch (IOException e) { - LOG.error("Error setting position: {}. Position may be beyond the stream size.", index); - } - } - - @Override - public void setPositionUsingEntityGuid(String guid) { - } - - @Override - public void close() { - } - - private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { - if (entityWithExtInfo == null) { - return; - } - - transform(entityWithExtInfo.getEntity()); - - if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { - for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { - transform(e); - } - } - } - - private void transform(AtlasEntity e) { - for (BaseEntityHandler handler : entityHandlers) { - handler.transform(e); - } - } - - private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { - try { - return AtlasType.fromJson(jsonData, clazz); - - } catch (Exception e) { - throw new AtlasBaseException("Error converting file to JSON.", e); - } - } - - private AtlasEntity getEntity(String guid) throws AtlasBaseException { - AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid); - return (extInfo != null) ? extInfo.getEntity() : null; - } - - public int size() { - return this.streamSize; - } - - private String moveNext() { - try { - moveNextEntry(); - return getJsonPayloadFromZipEntryStream(this.zipInputStream); - } catch (IOException e) { - LOG.error("moveNext failed!", e); - } - - return null; - } - - private void moveNextEntry() throws IOException { - this.zipEntryNext = this.zipInputStream.getNextEntry(); - this.currentPosition++; - } - - private void prepareStreamForFetch() throws AtlasBaseException, IOException { - moveNextEntry(); - if (this.zipEntryNext == null) { - throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP."); - } - - if (this.zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) { - String json = getJsonPayloadFromZipEntryStream(this.zipInputStream); - this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class); - } - } - - private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) { - try { - final int BUFFER_LENGTH = 4096; - byte[] buf = new byte[BUFFER_LENGTH]; - - int n = 0; - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - while ((n = zipInputStream.read(buf, 0, BUFFER_LENGTH)) > -1) { - bos.write(buf, 0, n); - } - - return bos.toString(); - } catch (IOException ex) { - LOG.error("Error fetching string from entry!", ex); - } - - return null; - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index 69d78cd..ca0bc41 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -24,7 +24,6 @@ import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.repository.impexp.ImportService; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,20 +32,11 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; -import java.util.zip.ZipFile; - -import static org.apache.atlas.AtlasConfiguration.MIGRATION_IMPORT_START_POSITION; public class ZipFileMigrationImporter implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class); - private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers"; - private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size"; - private static final String DEFAULT_NUMBER_OF_WORKDERS = "4"; - private static final String DEFAULT_BATCH_SIZE = "100"; - private static final String ZIP_FILE_COMMENT = "streamSize"; - - private final static String ENV_USER_NAME = "user.name"; + private static String ENV_USER_NAME = "user.name"; private final ImportService importService; private final String fileToImport; @@ -62,8 +52,7 @@ public class ZipFileMigrationImporter implements Runnable { FileWatcher fileWatcher = new FileWatcher(fileToImport); fileWatcher.start(); - int streamSize = getStreamSizeFromComment(fileToImport); - performImport(new FileInputStream(new File(fileToImport)), streamSize); + performImport(new FileInputStream(new File(fileToImport))); } catch (IOException e) { LOG.error("Migration Import: IO Error!", e); } catch (AtlasBaseException e) { @@ -71,44 +60,19 @@ public class ZipFileMigrationImporter implements Runnable { } } - private int getStreamSizeFromComment(String fileToImport) { - int ret = 1; - try { - ZipFile zipFile = new ZipFile(fileToImport); - String streamSizeComment = zipFile.getComment(); - ret = processZipFileStreamSizeComment(streamSizeComment); - zipFile.close(); - } catch (IOException e) { - LOG.error("Error opening ZIP file: {}", fileToImport, e); - } - - return ret; - } - - private int processZipFileStreamSizeComment(String streamSizeComment) { - if (!StringUtils.isNotEmpty(streamSizeComment) || !StringUtils.startsWith(streamSizeComment, ZIP_FILE_COMMENT)) { - return 1; - } - - String s = StringUtils.substringAfter(streamSizeComment, ":"); - LOG.debug("ZipFileMigrationImporter: streamSize: {}", streamSizeComment); - - return Integer.valueOf(s); - } - - private void performImport(InputStream fs, int streamSize) throws AtlasBaseException { + private void performImport(InputStream fs) throws AtlasBaseException { try { LOG.info("Migration Import: {}: Starting...", fileToImport); RequestContext.get().setUser(getUserNameFromEnvironment(), null); - importService.run(fs, getImportRequest(streamSize), + importService.run(fs, getImportRequest(), getUserNameFromEnvironment(), InetAddress.getLocalHost().getHostName(), InetAddress.getLocalHost().getHostAddress()); } catch (Exception ex) { - LOG.error("Migration Import: Error loading zip for migration!", ex); + LOG.error("Error loading zip for migration", ex); throw new AtlasBaseException(ex); } finally { LOG.info("Migration Import: {}: Done!", fileToImport); @@ -119,16 +83,8 @@ public class ZipFileMigrationImporter implements Runnable { return System.getProperty(ENV_USER_NAME); } - private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException { - AtlasImportRequest request = new AtlasImportRequest(); - - request.setSizeOption(streamSize); - request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true"); - request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS)); - request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE)); - request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())); - - return request; + private AtlasImportRequest getImportRequest() throws AtlasException { + return new AtlasImportRequest(); } private String getPropertyValue(String property, String defaultValue) throws AtlasException { diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java index bee6378..2b58119 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java @@ -81,9 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler { AtlasGraph graph = getGraph(); for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) { - LOG.info("finding entities of type: {}", entityType.getTypeName()); + LOG.info("finding entities of type {}", entityType.getTypeName()); + Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds(); - LOG.info("found entities of type: {}", entityType.getTypeName()); int count = 0; for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 805531c..39ea3f8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -150,14 +150,6 @@ public interface AtlasEntityStore { EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException; /** - * Create or update entities with parameters necessary for import process without commit. Caller will have to do take care of commit. - * @param entityStream AtlasEntityStream - * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed - * @throws AtlasBaseException - */ - EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException; - - /** * Update a single entity * @param objectId ID of the entity * @param updatedEntityInfo updated entity information diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 6f6ee17..30f5e5a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -332,11 +332,6 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { } @Override - public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException { - return createOrUpdate(entityStream, false, true, true); - } - - @Override @GraphTransaction public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException { if (LOG.isDebugEnabled()) { @@ -1215,10 +1210,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ret.setGuidAssignments(context.getGuidAssignments()); - if (!RequestContext.get().isImportInProgress()) { - // Notify the change listeners - entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); - } + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); if (LOG.isDebugEnabled()) { LOG.debug("<== createOrUpdate()"); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java index 857b709..fdf117a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java @@ -929,10 +929,6 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { } private void sendNotifications(AtlasRelationship ret, OperationType relationshipUpdate) throws AtlasBaseException { - if (entityChangeNotifier == null) { - return; - } - entityChangeNotifier.notifyPropagatedEntities(); if (notificationsEnabled){ entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java index 4526002..54c32c5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java @@ -18,30 +18,33 @@ package org.apache.atlas.repository.store.graph.v2; import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.graph.AtlasGraphProvider; -import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.BulkImporter; -import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy; -import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport; -import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.Constants; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -52,24 +55,131 @@ public class BulkImporterImpl implements BulkImporter { private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); private final AtlasEntityStore entityStore; + private final EntityGraphRetriever entityGraphRetriever; private AtlasTypeRegistry typeRegistry; + private final int MAX_ATTEMPTS = 2; + private boolean directoryBasedImportConfigured; @Inject public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.entityStore = entityStore; + this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.typeRegistry = typeRegistry; + this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); } @Override public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { - ImportStrategy importStrategy = - (importResult.getRequest().getOptions() != null && - importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) - ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry) - : new RegularImport(this.entityStore, this.typeRegistry); - - LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName()); - return importStrategy.run(entityStream, importResult); + if (LOG.isDebugEnabled()) { + LOG.debug("==> bulkImport()"); + } + + if (entityStream == null || !entityStream.hasNext()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); + } + + EntityMutationResponse ret = new EntityMutationResponse(); + ret.setGuidAssignments(new HashMap<>()); + + Set<String> processedGuids = new HashSet<>(); + float currentPercent = 0f; + List<String> residualList = new ArrayList<>(); + + EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList); + + while (entityImportStreamWithResidualList.hasNext()) { + AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo(); + AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; + + if (entity == null) { + continue; + } + + for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + try { + AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); + EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream); + + if (resp.getGuidAssignments() != null) { + ret.getGuidAssignments().putAll(resp.getGuidAssignments()); + } + + currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, + entityStream.getPosition(), + entityImportStreamWithResidualList.getStreamSize(), + currentPercent); + + entityStream.onImportComplete(entity.getGuid()); + break; + } catch (AtlasBaseException e) { + if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) { + throw e; + } + break; + } catch (AtlasSchemaViolationException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Entity: {}", entity.getGuid(), e); + } + + if (attempt == 0) { + updateVertexGuid(entity); + } else { + LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid()); + throw e; + } + } catch (Throwable e) { + AtlasBaseException abe = new AtlasBaseException(e); + if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) { + throw abe; + } + + LOG.warn("Exception: {}", entity.getGuid(), e); + break; + } finally { + RequestContext.get().clearCache(); + } + } + } + + importResult.getProcessedEntities().addAll(processedGuids); + LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size()); + + return ret; + } + + @GraphTransaction + public void updateVertexGuid(AtlasEntity entity) { + String entityGuid = entity.getGuid(); + AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity); + + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + String vertexGuid = null; + try { + vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes()); + } catch (AtlasBaseException e) { + LOG.warn("Entity: {}: Does not exist!", objectId); + return; + } + + if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) { + return; + } + + AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid); + if (v == null) { + return; + } + + addHistoricalGuid(v, vertexGuid); + AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); + + LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid()); + } + + private void addHistoricalGuid(AtlasVertex v, String vertexGuid) { + String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class); + + AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid)); } @VisibleForTesting @@ -83,16 +193,38 @@ public class BulkImporterImpl implements BulkImporter { return json; } + private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) { + if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) { + return false; + } + + lineageList.add(guid); + + return true; + } + + private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity, + EntityMutationResponse resp, + AtlasImportResult importResult, + Set<String> processedGuids, + int currentIndex, int streamSize, float currentPercent) { + if (!directoryBasedImportConfigured) { + updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); + } + + String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); + + return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); + } + @VisibleForTesting - public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { + static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { final double tolerance = 0.000001; final int MAX_PERCENT = 100; int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex; - if (maxSize <= 0) { - return currentPercent; - } - float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize); boolean updateLog = Double.compare(percent, currentPercent) > tolerance; float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent); @@ -104,7 +236,7 @@ public class BulkImporterImpl implements BulkImporter { return updatedPercent; } - public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { + private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { if (list == null) { return; } @@ -119,37 +251,41 @@ public class BulkImporterImpl implements BulkImporter { } } - public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) { - String entityGuid = entity.getGuid(); - AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity); + private static class EntityImportStreamWithResidualList { + private final EntityImportStream stream; + private final List<String> residualList; + private boolean navigateResidualList; + private int currentResidualListIndex; - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - String vertexGuid = null; - try { - vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes()); - } catch (AtlasBaseException e) { - LOG.warn("Entity: {}: Does not exist!", objectId); - return; - } - if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) { - return; + public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) { + this.stream = stream; + this.residualList = residualList; + this.navigateResidualList = false; + this.currentResidualListIndex = 0; } - AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid); - if (v == null) { - return; + public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { + if (navigateResidualList == false) { + return stream.getNextEntityWithExtInfo(); + } else { + stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++)); + return stream.getNextEntityWithExtInfo(); + } } - addHistoricalGuid(v, vertexGuid); - AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); - - LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid()); - } - - public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) { - String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class); + public boolean hasNext() { + if (!navigateResidualList) { + boolean streamHasNext = stream.hasNext(); + navigateResidualList = (streamHasNext == false); + return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size()); + } else { + return (currentResidualListIndex < residualList.size()); + } + } - AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid)); + public int getStreamSize() { + return stream.size() + residualList.size(); + } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index e76b341..2f3aad0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -361,9 +361,7 @@ public class EntityGraphMapper { updateLabels(vertex, labels); - if (entityChangeNotifier != null) { - entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels); - } + entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels); } public void addLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException { @@ -380,10 +378,7 @@ public class EntityGraphMapper { if (!updatedLabels.equals(existingLabels)) { updateLabels(vertex, updatedLabels); updatedLabels.removeAll(existingLabels); - - if (entityChangeNotifier != null) { - entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null); - } + entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null); } } } @@ -400,10 +395,7 @@ public class EntityGraphMapper { if (!updatedLabels.equals(existingLabels)) { updateLabels(vertex, updatedLabels); existingLabels.removeAll(updatedLabels); - - if (entityChangeNotifier != null) { - entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels); - } + entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels); } } } @@ -1956,9 +1948,7 @@ public class EntityGraphMapper { Set<AtlasVertex> vertices = addedClassifications.get(classification); List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices); - if (entityChangeNotifier != null) { - entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification)); - } + entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification)); } RequestContext.get().endMetricRecord(metric); @@ -2066,10 +2056,7 @@ public class EntityGraphMapper { AtlasEntity entity = updateClassificationText(entry.getKey()); List<AtlasClassification> deletedClassificationNames = entry.getValue(); - - if (entityChangeNotifier != null) { - entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames); - } + entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames); } } @@ -2296,19 +2283,17 @@ public class EntityGraphMapper { notificationVertices.addAll(entitiesToPropagateTo); } - if (entityChangeNotifier != null) { - for (AtlasVertex vertex : notificationVertices) { - String entityGuid = GraphHelper.getGuid(vertex); - AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); + for (AtlasVertex vertex : notificationVertices) { + String entityGuid = GraphHelper.getGuid(vertex); + AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); - if (isActive(entity)) { - vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity)); - entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications); - } + if (isActive(entity)) { + vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity)); + entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications); } } - if (entityChangeNotifier != null && MapUtils.isNotEmpty(removedPropagations)) { + if (MapUtils.isNotEmpty(removedPropagations)) { for (AtlasClassification classification : removedPropagations.keySet()) { List<AtlasVertex> propagatedVertices = removedPropagations.get(classification); List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices); @@ -2541,7 +2526,7 @@ public class EntityGraphMapper { private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException { List<AtlasEntity> propagatedEntities = new ArrayList<>(); - if (fullTextMapperV2 != null && CollectionUtils.isNotEmpty(propagatedVertices)) { + if(CollectionUtils.isNotEmpty(propagatedVertices)) { for(AtlasVertex vertex : propagatedVertices) { AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java deleted file mode 100644 index 6b70eab..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.store.graph.v2.bulkimport; - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.store.graph.v2.EntityImportStream; - -public abstract class ImportStrategy { - public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException; -} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java deleted file mode 100644 index 8c66656..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.store.graph.v2.bulkimport; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.converters.AtlasFormatConverters; -import org.apache.atlas.repository.converters.AtlasInstanceConverter; -import org.apache.atlas.repository.graph.AtlasGraphProvider; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; -import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; -import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; -import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2; -import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; -import org.apache.atlas.repository.store.graph.v2.EntityImportStream; -import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder; -import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MigrationImport extends ImportStrategy { - private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class); - - private final AtlasTypeRegistry typeRegistry; - private AtlasGraph atlasGraph; - private EntityGraphRetriever entityGraphRetriever; - private EntityGraphMapper entityGraphMapper; - private AtlasEntityStore entityStore; - - public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { - this.typeRegistry = typeRegistry; - setupEntityStore(atlasGraphProvider, typeRegistry); - LOG.info("MigrationImport: Using bulkLoading..."); - } - - public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { - if (entityStream == null || !entityStream.hasNext()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); - } - - if (importResult.getRequest() == null) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request"); - } - - int index = 0; - int streamSize = entityStream.size(); - EntityMutationResponse ret = new EntityMutationResponse(); - EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize); - - try { - LOG.info("Migration Import: Size: {}: Starting...", streamSize); - index = creationManager.read(entityStream); - creationManager.drain(); - creationManager.extractResults(); - } catch (Exception ex) { - LOG.error("Migration Import: Error: Current position: {}", index, ex); - } finally { - shutdownEntityCreationManager(creationManager); - } - - LOG.info("Migration Import: Size: {}: Done!", streamSize); - return ret; - } - - private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) { - int batchSize = importResult.getRequest().getOptionKeyBatchSize(); - int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); - - EntityConsumerBuilder consumerBuilder = - new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize); - - return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize); - } - - private static int getNumWorkers(int numWorkersFromOptions) { - int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1; - LOG.info("Migration Import: Setting numWorkers: {}", ret); - return ret; - } - - private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { - this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); - this.atlasGraph = atlasGraphProvider.getBulkLoading(); - DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry); - - AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, null); - AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry); - AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters); - this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, null, instanceConverter, null); - this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, null, entityGraphMapper); - } - - private void shutdownEntityCreationManager(EntityCreationManager creationManager) { - try { - creationManager.shutdown(); - } catch (InterruptedException e) { - LOG.error("Migration Import: Shutdown: Interrupted!", e); - } - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java deleted file mode 100644 index 4cc8ed4..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.store.graph.v2.bulkimport; - - -import com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.AtlasConfiguration; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.RequestContext; -import org.apache.atlas.annotation.GraphTransaction; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport; -import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; -import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; -import org.apache.atlas.repository.store.graph.v2.EntityImportStream; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY; -import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress; - -public class RegularImport extends ImportStrategy { - private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class); - private static final int MAX_ATTEMPTS = 3; - private final AtlasEntityStore entityStore; - private final AtlasTypeRegistry typeRegistry; - private final EntityGraphRetriever entityGraphRetriever; - private boolean directoryBasedImportConfigured; - - public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { - this.entityStore = entityStore; - this.typeRegistry = typeRegistry; - this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); - this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); - } - - @Override - public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> bulkImport()"); - } - - if (entityStream == null || !entityStream.hasNext()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); - } - - EntityMutationResponse ret = new EntityMutationResponse(); - ret.setGuidAssignments(new HashMap<>()); - - Set<String> processedGuids = new HashSet<>(); - float currentPercent = 0f; - List<String> residualList = new ArrayList<>(); - - EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList); - - while (entityImportStreamWithResidualList.hasNext()) { - AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo(); - AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - - if (entity == null) { - continue; - } - - for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { - try { - AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); - EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream); - - if (resp.getGuidAssignments() != null) { - ret.getGuidAssignments().putAll(resp.getGuidAssignments()); - } - - currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, - entityStream.getPosition(), - entityImportStreamWithResidualList.getStreamSize(), - currentPercent); - - entityStream.onImportComplete(entity.getGuid()); - break; - } catch (AtlasBaseException e) { - if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw e; - } - break; - } catch (AtlasSchemaViolationException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Entity: {}", entity.getGuid(), e); - } - - if (attempt == 0) { - updateVertexGuid(entity); - } else { - LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid()); - throw e; - } - } catch (Throwable e) { - AtlasBaseException abe = new AtlasBaseException(e); - if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw abe; - } - - LOG.warn("Exception: {}", entity.getGuid(), e); - break; - } finally { - RequestContext.get().clearCache(); - } - } - } - - importResult.getProcessedEntities().addAll(processedGuids); - LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size()); - - return ret; - } - - @GraphTransaction - public void updateVertexGuid(AtlasEntity entity) { - String entityGuid = entity.getGuid(); - AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity); - - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - String vertexGuid = null; - try { - vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes()); - } catch (AtlasBaseException e) { - LOG.warn("Entity: {}: Does not exist!", objectId); - return; - } - - if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) { - return; - } - - AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid); - if (v == null) { - return; - } - - addHistoricalGuid(v, vertexGuid); - AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); - - LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid()); - } - - private void addHistoricalGuid(AtlasVertex v, String vertexGuid) { - String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class); - - AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid)); - } - - @VisibleForTesting - static String getJsonArray(String json, String vertexGuid) { - String quotedGuid = String.format("\"%s\"", vertexGuid); - if (StringUtils.isEmpty(json)) { - json = String.format("[%s]", quotedGuid); - } else { - json = json.replace("]", "").concat(",").concat(quotedGuid).concat("]"); - } - return json; - } - - private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) { - if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) { - return false; - } - - lineageList.add(guid); - - return true; - } - - private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity, - EntityMutationResponse resp, - AtlasImportResult importResult, - Set<String> processedGuids, - int currentIndex, int streamSize, float currentPercent) { - if (!directoryBasedImportConfigured) { - BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); - BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); - } - - String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); - - return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); - } - - private static class EntityImportStreamWithResidualList { - private final EntityImportStream stream; - private final List<String> residualList; - private boolean navigateResidualList; - private int currentResidualListIndex; - - - public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) { - this.stream = stream; - this.residualList = residualList; - this.navigateResidualList = false; - this.currentResidualListIndex = 0; - } - - public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { - if (navigateResidualList == false) { - return stream.getNextEntityWithExtInfo(); - } else { - stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++)); - return stream.getNextEntityWithExtInfo(); - } - } - - public boolean hasNext() { - if (!navigateResidualList) { - boolean streamHasNext = stream.hasNext(); - navigateResidualList = (streamHasNext == false); - return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size()); - } else { - return (currentResidualListIndex < residualList.size()); - } - } - - public int getStreamSize() { - return stream.size() + residualList.size(); - } - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java deleted file mode 100644 index bb74205..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; - -import org.apache.atlas.GraphTransactionInterceptor; -import org.apache.atlas.RequestContext; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.pc.WorkItemConsumer; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport; -import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - -public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> { - private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class); - private static final int MAX_COMMIT_RETRY_COUNT = 3; - - private final int batchSize; - private AtomicLong counter = new AtomicLong(1); - private AtomicLong currentBatch = new AtomicLong(1); - - private final AtlasGraph atlasGraph; - private final AtlasEntityStore entityStoreV2; - private final AtlasTypeRegistry typeRegistry; - private final EntityGraphRetriever entityGraphRetriever; - - private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>(); - private List<EntityMutationResponse> localResults = new ArrayList<>(); - - public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore, - EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, - BlockingQueue queue, int batchSize) { - super(queue); - - this.atlasGraph = atlasGraph; - this.entityStoreV2 = entityStore; - this.entityGraphRetriever = entityGraphRetriever; - this.typeRegistry = typeRegistry; - this.batchSize = batchSize; - } - - @Override - protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { - int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities()) - ? 1 - : entityWithExtInfo.getReferredEntities().size()) + 1; - - long currentCount = counter.addAndGet(delta); - currentBatch.addAndGet(delta); - entityBuffer.add(entityWithExtInfo); - - try { - processEntity(entityWithExtInfo, currentCount); - attemptCommit(); - } catch (Exception e) { - LOG.info("Data loss: Please re-submit!", e); - } - } - - private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) { - try { - RequestContext.get().setImportInProgress(true); - AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); - - LOG.debug("Processing: {}", currentCount); - EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream); - localResults.add(result); - } catch (AtlasBaseException e) { - addResult(entityWithExtInfo.getEntity().getGuid()); - LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e); - } catch (AtlasSchemaViolationException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e); - } - - BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity()); - } - } - - private void attemptCommit() { - if (currentBatch.get() < batchSize) { - return; - } - - doCommit(); - } - - @Override - protected void doCommit() { - for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) { - if (commitWithRetry(retryCount)) { - return; - } - } - - LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get()); - clear(); - } - - @Override - protected void commitDirty() { - super.commitDirty(); - LOG.info("Total: Commit: {}", counter.get()); - counter.set(0); - } - - private boolean commitWithRetry(int retryCount) { - try { - atlasGraph.commit(); - if (LOG.isDebugEnabled()) { - LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get()); - } - - dispatchResults(); - return true; - } catch (Exception ex) { - rollbackPauseRetry(retryCount, ex); - return false; - } - } - - private void rollbackPauseRetry(int retryCount, Exception ex) { - atlasGraph.rollback(); - clearCache(); - - LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount); - pause(retryCount); - if (ex.getClass().getName().endsWith("JanusGraphException") && retryCount >= MAX_COMMIT_RETRY_COUNT) { - LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex); - } else { - LOG.info("Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount); - } - retryProcessEntity(retryCount); - } - - private void retryProcessEntity(int retryCount) { - LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); - for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) { - processEntity(e, counter.get()); - } - LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); - } - - private void dispatchResults() { - localResults.stream().forEach(x -> { - addResultsFromResponse(x.getCreatedEntities()); - addResultsFromResponse(x.getUpdatedEntities()); - addResultsFromResponse(x.getDeletedEntities()); - }); - - clear(); - } - - private void pause(int retryCount) { - try { - Thread.sleep(1000 * retryCount); - } catch (InterruptedException e) { - LOG.error("pause: Interrupted!", e); - } - } - - private void addResultsFromResponse(List<AtlasEntityHeader> entities) { - if (CollectionUtils.isEmpty(entities)) { - return; - } - - for (AtlasEntityHeader eh : entities) { - addResult(eh.getGuid()); - } - } - - private void clear() { - localResults.clear(); - entityBuffer.clear(); - clearCache(); - currentBatch.set(0); - } - - private void clearCache() { - GraphTransactionInterceptor.clearCache(); - RequestContext.get().clearCache(); - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java deleted file mode 100644 index 69d33b2..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; - -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.pc.WorkItemBuilder; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; -import org.apache.atlas.type.AtlasTypeRegistry; - -import java.util.concurrent.BlockingQueue; - -public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> { - private AtlasGraph atlasGraph; - private AtlasEntityStore entityStore; - private final EntityGraphRetriever entityGraphRetriever; - private final AtlasTypeRegistry typeRegistry; - private int batchSize; - - public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore, - EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) { - this.atlasGraph = atlasGraph; - this.entityStore = entityStore; - this.entityGraphRetriever = entityGraphRetriever; - this.typeRegistry = typeRegistry; - this.batchSize = batchSize; - } - - @Override - public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) { - return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize); - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java deleted file mode 100644 index 0051941..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; - -import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.pc.WorkItemBuilder; -import org.apache.atlas.pc.WorkItemManager; -import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; -import org.apache.atlas.repository.store.graph.v2.EntityImportStream; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager { - private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class); - private static final String WORKER_PREFIX = "migration-import"; - - private final StatusReporter<String, String> statusReporter; - private final AtlasImportResult importResult; - private final int streamSize; - private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min - private String currentTypeName; - private float currentPercent; - - public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) { - super(builder, WORKER_PREFIX, batchSize, numWorkers, true); - this.importResult = importResult; - this.streamSize = streamSize; - - this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION); - } - - public int read(EntityImportStream entityStream) { - int currentIndex = 0; - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; - while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) { - AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - if (entity == null) { - continue; - } - - try { - produce(currentIndex++, entity.getTypeName(), entityWithExtInfo); - } catch (Throwable e) { - LOG.warn("Exception: {}", entity.getGuid(), e); - break; - } - } - return currentIndex; - } - - private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { - String previousTypeName = getCurrentTypeName(); - - if (StringUtils.isNotEmpty(typeName) - && StringUtils.isNotEmpty(previousTypeName) - && !StringUtils.equals(previousTypeName, typeName)) { - LOG.info("Waiting: '{}' to complete...", previousTypeName); - super.drain(); - LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName); - } - - setCurrentTypeName(typeName); - statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex)); - super.checkProduce(entityWithExtInfo); - extractResults(); - } - - public void extractResults() { - Object result; - while (((result = getResults().poll())) != null) { - statusReporter.processed((String) result); - } - - logStatus(); - } - - private void logStatus() { - String ack = statusReporter.ack(); - if (StringUtils.isEmpty(ack)) { - return; - } - - String[] split = ack.split(":"); - if (split.length == 0 || split.length < 2) { - return; - } - - importResult.incrementMeticsCounter(split[0]); - this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent()); - } - - private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) { - String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex); - return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); - } - - private String getCurrentTypeName() { - return this.currentTypeName; - } - - private void setCurrentTypeName(String typeName) { - this.currentTypeName = typeName; - } - - private float getCurrentPercent() { - return this.currentPercent; - } - - private int getStreamSize() { - return this.streamSize; - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java deleted file mode 100644 index 1cd9860..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; - -import org.apache.atlas.v1.typesystem.types.utils.TypesUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; - -public class StatusReporter<T, U> { - private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class); - - private Map<T,U> producedItems = new LinkedHashMap<>(); - private Set<T> processedSet = new HashSet<>(); - private TypesUtil.Pair<T, Long> watchedItem; - private final long timeOut; - - public StatusReporter(long timeOut) { - this.timeOut = timeOut; - } - - public void produced(T item, U index) { - this.producedItems.put(item, index); - } - - public void processed(T item) { - this.processedSet.add(item); - } - - public void processed(T[] index) { - this.processedSet.addAll(Arrays.asList(index)); - } - - public U ack() { - U ack = null; - U ret; - Map.Entry<T, U> firstElement; - do { - firstElement = getFirstElement(this.producedItems); - ret = completionIndex(firstElement); - if (ret != null) { - ack = ret; - } - } while(ret != null); - - return addToWatchIfNeeded(ack, firstElement); - } - - private U addToWatchIfNeeded(U ack, Map.Entry<T, U> firstElement) { - if (ack == null && firstElement != null) { - ack = addToWatch(firstElement.getKey()); - } else { - resetWatchItem(); - } - return ack; - } - - private void resetWatchItem() { - this.watchedItem = null; - } - - private U addToWatch(T key) { - createNewWatchItem(key); - if (!hasTimedOut(this.watchedItem)) { - return null; - } - - T producedItemKey = this.watchedItem.left; - resetWatchItem(); - LOG.warn("Item: {}: Was produced but not successfully processed!", producedItemKey); - return this.producedItems.get(producedItemKey); - - } - - private void createNewWatchItem(T key) { - if (this.watchedItem != null) { - return; - } - - this.watchedItem = new TypesUtil.Pair<T, Long>(key, System.currentTimeMillis()); - } - - private boolean hasTimedOut(TypesUtil.Pair<T, Long> watchedItem) { - if (watchedItem == null) { - return false; - } - - return (System.currentTimeMillis() - watchedItem.right) >= timeOut; - } - - private Map.Entry<T, U> getFirstElement(Map<T, U> map) { - if (map.isEmpty()) { - return null; - } - - return map.entrySet().iterator().next(); - } - - private U completionIndex(Map.Entry<T, U> lookFor) { - U ack = null; - if (lookFor == null || !processedSet.contains(lookFor.getKey())) { - return ack; - } - - ack = lookFor.getValue(); - producedItems.remove(lookFor.getKey()); - processedSet.remove(lookFor); - return ack; - } -} diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 759be64..c14850f 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -136,11 +136,6 @@ public class ImportServiceTest extends ExportImportTestBase { return getZipSource("dup_col_deleted.zip"); } - @DataProvider(name = "zipDirect1") - public static Object[][] getZipDirect(ITestContext context) throws IOException, AtlasBaseException { - return getZipSource("dup_col_deleted.zip"); - } - @Test(dataProvider = "sales") public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException { loadBaseModel(); @@ -535,17 +530,6 @@ public class ImportServiceTest extends ExportImportTestBase { } } - @Test(dataProvider = "zipDirect1") - public void zipSourceDirect(InputStream inputStream) throws IOException, AtlasBaseException { - loadBaseModel(); - loadFsModel(); - loadHiveModel(); - - runImportWithNoParameters(importService, inputStream); - - } - - private AtlasImportRequest getImportRequest(String replicatedFrom){ AtlasImportRequest importRequest = getDefaultImportRequest(); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java deleted file mode 100644 index 2a22d88..0000000 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.impexp; - - -import com.google.inject.Inject; -import org.apache.atlas.TestModules; -import org.apache.atlas.discovery.EntityDiscoveryService; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasImportRequest; -import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport; -import org.apache.atlas.store.AtlasTypeDefStore; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.io.InputStream; - -import static org.testng.Assert.assertNotNull; - -@Guice(modules = TestModules.TestOnlyModule.class) -public class MigrationImportTest extends ExportImportTestBase { - - private final ImportService importService; - - @Inject - AtlasTypeRegistry typeRegistry; - - @Inject - private AtlasTypeDefStore typeDefStore; - - @Inject - private EntityDiscoveryService discoveryService; - - @Inject - AtlasEntityStore entityStore; - - @Inject - AtlasGraph atlasGraph; - - @Inject - public MigrationImportTest(ImportService importService) { - this.importService = importService; - } - - @Test - public void simpleImport() throws IOException, AtlasBaseException { - InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip"); - - AtlasImportRequest importRequest = new AtlasImportRequest(); - importRequest.setOption("migration", "true"); - - AtlasImportResult result = importService.run(inputStream, importRequest, null, null, null); - assertNotNull(result); - } -} diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java deleted file mode 100644 index 5e15023..0000000 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.impexp; - -import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.StatusReporter; -import org.testng.annotations.Test; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; - -public class StatusReporterTest { - @Test - public void noneProducedNoneReported() { - StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100); - assertNull(statusReporter.ack()); - } - - @Test - public void producedButNotAcknowledged() { - StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); - assertNull(statusReporter.ack()); - } - - @Test - public void producedAcknowledged() { - StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); - statusReporter.processed(1); - - assertEquals(java.util.Optional.of(100).get(), statusReporter.ack()); - } - - @Test - public void producedAcknowledgeMaxAvailableInSequence() { - StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); - - statusReporter.processed(new Integer[]{1, 3, 5}); - - assertEquals(java.util.Optional.of(100).get(), statusReporter.ack()); - } - - @Test - public void producedAcknowledgeMaxAvailableInSequence2() { - StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); - statusReporter.processed(new Integer[]{1, 2, 3, 6, 5}); - - assertEquals(java.util.Optional.of(300).get(), statusReporter.ack()); - } - - @Test - public void producedSetDisjointWithAckSet() { - StatusReporter<Integer, Integer> statusReporter = new StatusReporter(100); - statusReporter.produced(11, 1000); - statusReporter.produced(12, 2000); - statusReporter.produced(13, 3000); - - statusReporter.processed(new Integer[]{1, 11, 12, 13}); - - assertEquals(java.util.Optional.of(3000).get(), statusReporter.ack()); - } - - @Test - public void missingAck() throws InterruptedException { - StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(2, 3, 4); - - assertNull(statusReporter.ack()); - Thread.sleep(1002); - assertEquals(java.util.Optional.of(100).get(), statusReporter.ack()); - } - - private StatusReporter<Integer, Integer> createStatusReportWithItems(Integer... processed) { - StatusReporter<Integer, Integer> statusReporter = new StatusReporter(1000); - statusReporter.produced(1, 100); - statusReporter.produced(2, 200); - statusReporter.produced(3, 300); - statusReporter.produced(4, 400); - statusReporter.produced(5, 500); - statusReporter.produced(6, 600); - - statusReporter.processed(processed); - - return statusReporter; - } -} diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java deleted file mode 100644 index d191d8c..0000000 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.impexp; - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasExportResult; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.io.InputStream; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -public class ZipDirectTest { - @Test(expectedExceptions = AtlasBaseException.class) - public void loadFileEmpty() throws IOException, AtlasBaseException { - InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip"); - new ZipSourceDirect(inputStream, 1); - } - - @Test - public void loadFile() throws IOException, AtlasBaseException { - final int EXPECTED_ENTITY_COUNT = 3434; - - InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip"); - ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, EXPECTED_ENTITY_COUNT); - - assertNotNull(zipSourceDirect); - assertNotNull(zipSourceDirect.getTypesDef()); - assertTrue(zipSourceDirect.getTypesDef().getEntityDefs().size() > 0); - assertNotNull(zipSourceDirect.getExportResult()); - - int count = 0; - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; - while((entityWithExtInfo = zipSourceDirect.getNextEntityWithExtInfo()) != null) { - assertNotNull(entityWithExtInfo); - count++; - } - - assertEquals(count, EXPECTED_ENTITY_COUNT); - } -} diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java index 27a6668..0ffc3d5 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java @@ -317,9 +317,7 @@ public class ZipFileResourceTestUtils { } public static AtlasImportRequest getDefaultImportRequest() { - AtlasImportRequest atlasImportRequest = new AtlasImportRequest(); - atlasImportRequest.setOption("migration", "true"); - return atlasImportRequest; + return new AtlasImportRequest(); } @@ -338,8 +336,7 @@ public class ZipFileResourceTestUtils { final String hostName = "localhost"; final String userName = "admin"; - AtlasImportRequest request = getDefaultImportRequest(); - AtlasImportResult result = runImportWithParameters(importService, request, inputStream); + AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP); assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS); return result; } diff --git a/repository/src/test/resources/zip-direct-1.zip b/repository/src/test/resources/zip-direct-1.zip deleted file mode 100644 index 15cb0ec..0000000 Binary files a/repository/src/test/resources/zip-direct-1.zip and /dev/null differ diff --git a/repository/src/test/resources/zip-direct-2.zip b/repository/src/test/resources/zip-direct-2.zip deleted file mode 100644 index e7b8617..0000000 Binary files a/repository/src/test/resources/zip-direct-2.zip and /dev/null differ