This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new efc4beb ATLAS-3132: performance improvements in UniqueAttributesPatch efc4beb is described below commit efc4bebc1623c9d00fe4fdf0df424918654a73df Author: Ashutosh Mestry <ames...@hortonworks.com> AuthorDate: Sun Apr 14 18:31:28 2019 -0700 ATLAS-3132: performance improvements in UniqueAttributesPatch Signed-off-by: Madhan Neethiraj <mad...@apache.org> --- .../java/org/apache/atlas/pc/WorkItemConsumer.java | 19 +- .../java/org/apache/atlas/pc/WorkItemManager.java | 26 +- .../apache/atlas/kafka/EmbeddedKafkaServer.java | 2 +- .../org/apache/atlas/kafka/KafkaNotification.java | 2 +- .../repository/patches/AtlasJavaPatchHandler.java | 138 -------- .../repository/patches/AtlasPatchHandler.java | 68 ++++ .../repository/patches/AtlasPatchManager.java | 72 +++++ .../repository/patches/AtlasPatchRegistry.java | 220 +++++++++++++ .../repository/patches/AtlasPatchService.java | 54 ++++ .../atlas/repository/patches/PatchContext.java | 34 +- .../repository/patches/UniqueAttributePatch.java | 356 +++++++++++++++++++++ .../patches/UniqueAttributePatchHandler.java | 164 ---------- .../bootstrap/AtlasTypeDefStoreInitializer.java | 134 ++------ .../store/graph/v2/AtlasGraphUtilsV2.java | 133 ++------ .../store/graph/v2/EntityGraphRetriever.java | 2 +- .../atlas/patches/AtlasPatchRegistryTest.java | 78 +++++ .../notification/NotificationHookConsumer.java | 2 +- .../apache/atlas/web/resources/AdminResource.java | 15 +- .../atlas/web/resources/AdminResourceTest.java | 4 +- 19 files changed, 944 insertions(+), 579 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java index b7eb4d8..8351b7c 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java @@ -30,31 +30,35 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class WorkItemConsumer<T> implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class); - private static final int POLLING_DURATION_SECONDS = 5; + private static final int POLLING_DURATION_SECONDS = 30; private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000; private final BlockingQueue<T> queue; - private AtomicBoolean isDirty = new AtomicBoolean(false); - private AtomicLong maxCommitTimeInMs = new AtomicLong(0); - private CountDownLatch countdownLatch; - private BlockingQueue<Object> results; + private final AtomicBoolean isDirty = new AtomicBoolean(false); + private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS); + private CountDownLatch countdownLatch; + private BlockingQueue<Object> results; public WorkItemConsumer(BlockingQueue<T> queue) { - this.queue = queue; + this.queue = queue; + this.countdownLatch = null; } public void run() { try { while (!Thread.currentThread().isInterrupted()) { - T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS); if (item == null) { + LOG.warn("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing"); + commitDirty(); + return; } isDirty.set(true); + processItem(item); } } catch (InterruptedException e) { @@ -67,6 +71,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { public long getMaxCommitTimeInMs() { long commitTime = this.maxCommitTimeInMs.get(); + return ((commitTime > DEFAULT_COMMIT_TIME_IN_MS) ? commitTime : DEFAULT_COMMIT_TIME_IN_MS); } diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java index 0e7d3f2..a7ba67c 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java @@ -33,20 +33,20 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { private final ExecutorService service; private final List<U> consumers = new ArrayList<>(); private CountDownLatch countdownLatch; - private BlockingQueue<Object> resultsQueue; + private BlockingQueue<Object> resultsQueue; public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) { this.numWorkers = numWorkers; - workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers); - service = Executors.newFixedThreadPool(numWorkers, - new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build()); + this.workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers); + this.service = Executors.newFixedThreadPool(numWorkers, new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build()); createConsumers(builder, numWorkers, collectResults); - execute(); + + start(); } public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) { - this(builder, "workItem", batchSize, numWorkers, false); + this(builder, "workItemConsumer", batchSize, numWorkers, false); } public void setResultsCollection(BlockingQueue<Object> resultsQueue) { @@ -60,6 +60,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { for (int i = 0; i < numWorkers; i++) { U c = (U) builder.build(workQueue); + consumers.add(c); if (collectResults) { @@ -68,10 +69,12 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { } } - private void execute() { + public void start() { this.countdownLatch = new CountDownLatch(numWorkers); + for (U c : consumers) { c.setCountDownLatch(countdownLatch); + service.execute(c); } } @@ -85,9 +88,14 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { } public void checkProduce(T item) { - if (countdownLatch.getCount() == 0) { - execute(); + if (countdownLatch.getCount() < numWorkers) { + LOG.info("Fewer workers detected: {}", countdownLatch.getCount()); + + drain(); + + start(); } + produce(item); } diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java index 32b597f..235b7ce 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java @@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit; @Component -@Order(2) +@Order(3) public class EmbeddedKafkaServer implements Service { public static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class); diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 1d0a273..449eb6f 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -48,7 +48,7 @@ import java.util.concurrent.Future; * Kafka specific access point to the Atlas notification framework. */ @Component -@Order(3) +@Order(4) public class KafkaNotification extends AbstractNotification implements Service { public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class); diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java deleted file mode 100644 index 9153d49..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.patches; - -import org.apache.atlas.RequestContext; -import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; -import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.commons.collections.MapUtils; - -import java.util.Map; - -import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; -import static org.apache.atlas.repository.Constants.CREATED_BY_KEY; -import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; -import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser; - -public abstract class AtlasJavaPatchHandler { - public final AtlasGraph graph; - public final AtlasTypeRegistry typeRegistry; - public final Map<String, PatchStatus> patchesRegistry; - public final EntityGraphRetriever entityRetriever; - public final GraphBackedSearchIndexer indexer; - public final PatchContext context; - public final String patchId; - public final String patchDescription; - - private PatchStatus patchStatus; - - public static final String JAVA_PATCH_TYPE = "JAVA_PATCH"; - - public AtlasJavaPatchHandler(PatchContext context, String patchId, String patchDescription) { - this.context = context; - this.graph = context.getGraph(); - this.typeRegistry = context.getTypeRegistry(); - this.indexer = context.getIndexer(); - this.patchesRegistry = context.getPatchesRegistry(); - this.patchId = patchId; - this.patchDescription = patchDescription; - this.patchStatus = getPatchStatus(patchesRegistry); - this.entityRetriever = new EntityGraphRetriever(typeRegistry); - - init(); - } - - private void init() { - PatchStatus patchStatus = getPatchStatus(); - - if (patchStatus == UNKNOWN) { - AtlasVertex patchVertex = graph.addVertex(); - - setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId); - setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patchDescription); - setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, JAVA_PATCH_TYPE); - setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, getPatchStatus().toString()); - setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); - setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); - setEncodedProperty(patchVertex, CREATED_BY_KEY, getCurrentUser()); - setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser()); - - addToPatchesRegistry(patchId, getPatchStatus()); - } - - graph.commit(); - } - - private PatchStatus getPatchStatus(Map<String, PatchStatus> patchesRegistry) { - PatchStatus ret = UNKNOWN; - - if (MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId)) { - ret = patchesRegistry.get(patchId); - } - - return ret; - } - - public void updatePatchVertex(PatchStatus patchStatus) { - AtlasVertex patchVertex = findByPatchId(patchId); - - if (patchVertex != null) { - setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString()); - setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); - setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser()); - - addToPatchesRegistry(getPatchId(), getPatchStatus()); - } - - graph.commit(); - } - - public PatchStatus getPatchStatus() { - return patchStatus; - } - - public void addToPatchesRegistry(String patchId, PatchStatus status) { - getPatchesRegistry().put(patchId, status); - } - - public void setPatchStatus(PatchStatus patchStatus) { - this.patchStatus = patchStatus; - } - - public String getPatchId() { - return patchId; - } - - public Map<String, PatchStatus> getPatchesRegistry() { - return patchesRegistry; - } - - public abstract void applyPatch(); -} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java new file mode 100644 index 0000000..d8dcfef --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.patches; + +import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; + +public abstract class AtlasPatchHandler { + public static final String JAVA_PATCH_TYPE = "JAVA_PATCH"; + + private final String patchId; + private final String patchDescription; + private final AtlasPatchRegistry patchRegistry; + private PatchStatus status; + + public AtlasPatchHandler(AtlasPatchRegistry patchRegistry, String patchId, String patchDescription) { + this.patchId = patchId; + this.patchDescription = patchDescription; + this.patchRegistry = patchRegistry; + this.status = getStatusFromRegistry(); + + register(); + } + + private void register() { + PatchStatus patchStatus = getStatus(); + + if (patchStatus == UNKNOWN) { + patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, getStatus()); + } + } + + public PatchStatus getStatusFromRegistry() { + return patchRegistry.getStatus(patchId); + } + + public PatchStatus getStatus() { + return status; + } + + public void setStatus(PatchStatus status) { + this.status = status; + + patchRegistry.updateStatus(patchId, status); + } + + public String getPatchId() { + return patchId; + } + + public abstract void apply(); +} diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java new file mode 100644 index 0000000..629215d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.patches; + +import org.apache.atlas.model.patches.AtlasPatch; +import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED; +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED; + +@Component +public class AtlasPatchManager { + private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchManager.class); + + private final PatchContext context; + + @Inject + public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) { + this.context = new PatchContext(atlasGraph, typeRegistry, indexer); + } + + public AtlasPatch.AtlasPatches getAllPatches() { + return context.getPatchRegistry().getAllPatches(); + } + + public void applyAll() { + final AtlasPatchHandler handlers[] = { + new UniqueAttributePatch(context) + }; + + try { + for (AtlasPatchHandler handler : handlers) { + PatchStatus patchStatus = handler.getStatusFromRegistry(); + + if (patchStatus == APPLIED || patchStatus == SKIPPED) { + LOG.info("Ignoring java handler: {}; status: {}", handler.getPatchId(), patchStatus); + } else { + LOG.info("Applying java handler: {}; status: {}", handler.getPatchId(), patchStatus); + + handler.apply(); + } + } + } + catch (Exception ex) { + LOG.error("Error applying patches.", ex); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java new file mode 100644 index 0000000..df57959 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.patches; + +import org.apache.atlas.RequestContext; +import org.apache.atlas.model.patches.AtlasPatch; +import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches; +import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED; +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance; +import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer.TYPEDEF_PATCH_TYPE; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIndexSearchPrefix; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; +import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser; + +public class AtlasPatchRegistry { + private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchRegistry.class); + + private final Map<String, PatchStatus> patchNameStatusMap; + private final AtlasGraph graph; + + public AtlasPatchRegistry(AtlasGraph graph) { + this.graph = graph; + this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph); + } + + public boolean isApplicable(String incomingId, String patchFile, int index) { + String patchId = getId(incomingId, patchFile, index); + + if (MapUtils.isEmpty(patchNameStatusMap) || !patchNameStatusMap.containsKey(patchId)) { + return true; + } + + PatchStatus status = patchNameStatusMap.get(patchId); + + if (status == FAILED || status == UNKNOWN) { + return true; + } + + return false; + } + + public PatchStatus getStatus(String id) { + return patchNameStatusMap.get(id); + } + + public void register(String patchId, String description, String action, PatchStatus patchStatus) { + createOrUpdatePatchVertex(graph, patchId, description, action, patchStatus); + } + + public void updateStatus(String patchId, PatchStatus patchStatus) { + AtlasVertex patchVertex = findByPatchId(patchId); + + if (patchVertex == null) { + return; + } + + setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString()); + setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser()); + setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString()); + + patchNameStatusMap.put(patchId, patchStatus); + + graph.commit(); + } + + private static String getId(String incomingId, String patchFile, int index) { + String patchId = incomingId; + + if (StringUtils.isEmpty(patchId)) { + return String.format("%s_%s", patchFile, index); + } + + return patchId; + } + + public AtlasPatches getAllPatches() { + return getAllPatches(graph); + } + + private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, + String description, String action, PatchStatus patchStatus) { + boolean isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId); + AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex(); + + setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId); + setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description); + setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE); + setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action); + setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString()); + setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + setEncodedProperty(patchVertex, CREATED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser()); + setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser()); + + graph.commit(); + } + + private static Map<String, PatchStatus> getPatchNameStatusForAllRegistered(AtlasGraph graph) { + Map<String, PatchStatus> ret = new HashMap<>(); + AtlasPatches patches = getAllPatches(graph); + + for (AtlasPatch patch : patches.getPatches()) { + String patchId = patch.getId(); + PatchStatus patchStatus = patch.getStatus(); + + if (patchId != null && patchStatus != null) { + ret.put(patchId, patchStatus); + } + } + + return ret; + } + + private static AtlasPatches getAllPatches(AtlasGraph graph) { + List<AtlasPatch> ret = new ArrayList<>(); + String idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)"; + AtlasIndexQuery idxQuery = graph.indexQuery(VERTEX_INDEX, idxQueryString); + + try { + Iterator<Result<Object, Object>> results = idxQuery.vertices(); + + while (results != null && results.hasNext()) { + AtlasVertex patchVertex = results.next().getVertex(); + AtlasPatch patch = toAtlasPatch(patchVertex); + + ret.add(patch); + } + + if (CollectionUtils.isNotEmpty(ret)) { + Collections.sort(ret, Comparator.comparing(AtlasPatch::getId)); + } + } catch (Throwable t) { + LOG.warn("getAllPatches(): Returned empty result!"); + } + + graph.commit(); + + return new AtlasPatches(ret); + } + + + private static AtlasPatch toAtlasPatch(AtlasVertex vertex) { + AtlasPatch ret = new AtlasPatch(); + + ret.setId(getEncodedProperty(vertex, PATCH_ID_PROPERTY_KEY, String.class)); + ret.setDescription(getEncodedProperty(vertex, PATCH_DESCRIPTION_PROPERTY_KEY, String.class)); + ret.setType(getEncodedProperty(vertex, PATCH_TYPE_PROPERTY_KEY, String.class)); + ret.setAction(getEncodedProperty(vertex, PATCH_ACTION_PROPERTY_KEY, String.class)); + ret.setCreatedBy(getEncodedProperty(vertex, CREATED_BY_KEY, String.class)); + ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class)); + ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class)); + ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class)); + ret.setStatus(getPatchStatus(vertex)); + + return ret; + } + + private static AtlasVertex findByPatchId(String patchId) { + AtlasVertex ret = null; + String indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (" + patchId + ")"; + Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices(); + + while (results != null && results.hasNext()) { + ret = results.next().getVertex(); + + if (ret != null) { + break; + } + } + + return ret; + } + + private static PatchStatus getPatchStatus(AtlasVertex vertex) { + String patchStatus = AtlasGraphUtilsV2.getEncodedProperty(vertex, PATCH_STATE_PROPERTY_KEY, String.class); + + return patchStatus != null ? PatchStatus.valueOf(patchStatus) : UNKNOWN; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java new file mode 100644 index 0000000..fc21285 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.patches; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.service.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; + +@Component +@Order(2) +public class AtlasPatchService implements Service { + private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchService.class); + + private final AtlasPatchManager patchManager; + + + @Inject + public AtlasPatchService(AtlasPatchManager patchManager) { + this.patchManager = patchManager; + } + + @Override + public void start() throws AtlasException { + LOG.info("PatchService: Started."); + + patchManager.applyAll(); + } + + @Override + public void stop() throws AtlasException { + LOG.info("PatchService: Stopped."); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java index a60422b..3508c74 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java @@ -6,39 +6,33 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.atlas.repository.patches; -import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.type.AtlasTypeRegistry; -import java.util.Map; - -/** - * Patch context for typedef and java patches. - */ public class PatchContext { private final AtlasGraph graph; private final AtlasTypeRegistry typeRegistry; private final GraphBackedSearchIndexer indexer; - private final Map<String, PatchStatus> patchesRegistry; - - public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, - Map<String, PatchStatus> patchesRegistry) { - this.graph = graph; - this.typeRegistry = typeRegistry; - this.indexer = indexer; - this.patchesRegistry = patchesRegistry; + private final AtlasPatchRegistry patchRegistry; + + public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) { + this.graph = graph; + this.typeRegistry = typeRegistry; + this.indexer = indexer; + this.patchRegistry = new AtlasPatchRegistry(this.graph); } public AtlasGraph getGraph() { @@ -53,7 +47,7 @@ public class PatchContext { return indexer; } - public Map<String, PatchStatus> getPatchesRegistry() { - return patchesRegistry; + public AtlasPatchRegistry getPatchRegistry() { + return patchRegistry; } -} \ No newline at end of file +} diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java new file mode 100644 index 0000000..c5af500 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.patches; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.pc.WorkItemBuilder; +import org.apache.atlas.pc.WorkItemConsumer; +import org.apache.atlas.pc.WorkItemManager; +import org.apache.atlas.repository.IndexException; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind; +import org.apache.atlas.repository.graphdb.*; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex; + +public class UniqueAttributePatch extends AtlasPatchHandler { + private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatch.class); + + private static final String PATCH_ID = "JAVA_PATCH_0000_001"; + private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities"; + + private final AtlasGraph graph; + private final GraphBackedSearchIndexer indexer; + private final AtlasTypeRegistry typeRegistry; + + public UniqueAttributePatch(PatchContext context) { + super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION); + + this.graph = context.getGraph(); + this.indexer = context.getIndexer(); + this.typeRegistry = context.getTypeRegistry(); + } + + @Override + public void apply() { + TypeNameAttributeCache typeNameAttributeCache = registerUniqueAttributeForTypes(); + UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(this.graph); + + patchProcessor.apply(typeNameAttributeCache.getAll()); + + setStatus(APPLIED); + + LOG.info("UniqueAttributePatch: {}; status: {}", getPatchId(), getStatus()); + } + + private TypeNameAttributeCache registerUniqueAttributeForTypes() { + TypeNameAttributeCache ret = new TypeNameAttributeCache(); + + for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) { + createIndexForUniqueAttributes(entityType.getTypeName(), entityType.getUniqAttributes().values()); + + ret.add(entityType, entityType.getUniqAttributes().values()); + } + + return ret; + } + + private boolean createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) { + try { + AtlasGraphManagement management = graph.getManagementSystem(); + + for (AtlasAttribute attribute : attributes) { + String uniquePropertyName = attribute.getVertexUniquePropertyName(); + + if (management.getPropertyKey(uniquePropertyName) != null) { + continue; + } + + AtlasAttributeDef attributeDef = attribute.getAttributeDef(); + boolean isIndexable = attributeDef.getIsIndexable(); + String attribTypeName = attributeDef.getTypeName(); + Class propertyClass = indexer.getPrimitiveClass(attribTypeName); + AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality()); + + indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true); + } + + indexer.commit(management); + graph.commit(); + + LOG.info("Unique attributes: type: {}: Registered!", typeName); + + return true; + } catch (IndexException e) { + LOG.error("Error creating index: type: {}", typeName, e); + return false; + } + } + + public static class UniqueAttributePatchProcessor { + private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers"; + private static final String BATCH_SIZE_PROPERTY = "atlas.patch.unique_attribute_patch.batchSize"; + private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS"; + private static final int NUM_WORKERS; + private static final int BATCH_SIZE; + + private final AtlasGraph graph; + + static { + int numWorkers = 3; + int batchSize = 300; + + try { + numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers()); + batchSize = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300); + + LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize); + } catch (Exception e) { + LOG.error("Error retrieving configuration.", e); + } + + NUM_WORKERS = numWorkers; + BATCH_SIZE = batchSize; + } + + public UniqueAttributePatchProcessor(AtlasGraph graph) { + this.graph = graph; + } + + public void apply(final Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache) { + WorkItemManager manager = null; + + try { + Iterator<AtlasVertex> iterator = graph.getVertices().iterator(); + + if (iterator.hasNext()) { + manager = new WorkItemManager<>(new ConsumerBuilder(graph), BATCH_SIZE, NUM_WORKERS); + + LOG.info("Processing: Started..."); + + while (iterator.hasNext()) { + AtlasVertex vertex = iterator.next(); + + if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) { + continue; + } + + String typeName = AtlasGraphUtilsV2.getTypeName(vertex); + + submitForProcessing(typeName, vertex, manager, typeUniqueAttributeCache.get(typeName)); + } + + manager.drain(); + } + } catch (Exception ex) { + LOG.error("Error: ", ex); + } finally { + if (manager != null) { + try { + manager.shutdown(); + } catch (InterruptedException e) { + LOG.error("Interrupted", e); + } + } + } + } + + private static int getDefaultNumWorkers() throws AtlasException { + return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3; + } + + private void submitForProcessing(String typeName, AtlasVertex vertex, WorkItemManager manager, Collection<AtlasAttribute> uniqAttributes) { + WorkItem workItem = new WorkItem(typeName, (Long) vertex.getId(), uniqAttributes); + + manager.checkProduce(workItem); + } + + + private static class WorkItem { + private final String typeName; + private final long id; + private final Collection<AtlasAttribute> uniqueAttributeValues; + + public WorkItem(String typeName, long id, Collection<AtlasAttribute> uniqueAttributeValues) { + this.typeName = typeName; + this.id = id; + this.uniqueAttributeValues = uniqueAttributeValues; + } + } + + private static class Consumer extends WorkItemConsumer<WorkItem> { + private static int MAX_COMMIT_RETRY_COUNT = 3; + + private final AtlasGraph graph; + private final AtomicLong counter; + + public Consumer(AtlasGraph graph, BlockingQueue<WorkItem> queue) { + super(queue); + + this.graph = graph; + this.counter = new AtomicLong(0); + } + + @Override + protected void doCommit() { + if (counter.get() % BATCH_SIZE == 0) { + LOG.info("Processed: {}", counter.get()); + + attemptCommit(); + } + } + + @Override + protected void commitDirty() { + attemptCommit(); + + LOG.info("Total: Commit: {}", counter.get()); + + super.commitDirty(); + } + + private void attemptCommit() { + for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) { + try { + graph.commit(); + + break; + } catch(Exception ex) { + LOG.error("Commit exception: ", retryCount, ex); + + try { + Thread.currentThread().sleep(300 * retryCount); + } catch (InterruptedException e) { + LOG.error("Commit exception: Pause: Interrputed!", e); + } + } + } + } + + @Override + protected void processItem(WorkItem wi) { + counter.incrementAndGet(); + + String typeName = wi.typeName; + + if(wi.uniqueAttributeValues == null) { + return; + } + + AtlasVertex vertex = graph.getVertex(Long.toString(wi.id)); + + if (vertex == null) { + LOG.warn("processItem: AtlasVertex with id: ({}): not found!", wi.id); + + return; + } + + if (AtlasGraphUtilsV2.isTypeVertex(vertex)) { + return; + } + + AtlasEntity.Status status = AtlasGraphUtilsV2.getState(vertex); + + if (status != AtlasEntity.Status.ACTIVE) { + return; + } + + try { + LOG.debug("processItem: {}", wi.id); + + for (AtlasAttribute attribute : wi.uniqueAttributeValues) { + String uniquePropertyKey = attribute.getVertexUniquePropertyName(); + Collection<? extends String> propertyKeys = vertex.getPropertyKeys(); + Object uniqAttrValue = null; + + if (propertyKeys != null && propertyKeys.contains(uniquePropertyKey)) { + LOG.debug("processItem: {}: Skipped!", wi.id); + } else { + try { + String propertyKey = attribute.getVertexPropertyName(); + + uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef()); + + AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue); + } catch(AtlasSchemaViolationException ex) { + LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex)); + } + } + + commit(); + } + + LOG.debug("processItem: {}: Done!", wi.id); + } catch (Exception ex) { + LOG.error("Error found: {}: {}", typeName, wi.id, ex); + } + } + } + + private class ConsumerBuilder implements WorkItemBuilder<Consumer, WorkItem> { + private final AtlasGraph graph; + + public ConsumerBuilder(AtlasGraph graph) { + this.graph = graph; + } + + @Override + public Consumer build(BlockingQueue<WorkItem> queue) { + return new Consumer(graph, queue); + } + } + } + + public static class TypeNameAttributeCache { + private Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache = new HashMap<>(); + + public void add(AtlasEntityType entityType, Collection<AtlasAttribute> values) { + typeUniqueAttributeCache.put(entityType.getTypeName(), values); + } + + public Collection<AtlasAttribute> get(String typeName) { + return typeUniqueAttributeCache.get(typeName); + } + + public boolean has(String typeName) { + return typeUniqueAttributeCache.containsKey(typeName); + } + + public Map<String, Collection<AtlasAttribute>> getAll() { + return typeUniqueAttributeCache; + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java deleted file mode 100644 index f2238f1..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.patches; - -import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; -import org.apache.atlas.repository.IndexException; -import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind; -import org.apache.atlas.repository.graphdb.AtlasCardinality; -import org.apache.atlas.repository.graphdb.AtlasGraphManagement; -import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.commons.collections.MapUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; - -import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED; -import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED; -import static org.apache.atlas.repository.graph.GraphHelper.getGuid; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findActiveEntityVerticesByType; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; - -public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler { - private static final String PATCH_ID = "JAVA_PATCH_0000_001"; - private static final String PATCH_DESCRIPTION = "Add new vertex property for each unique attribute of active entities"; - private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatchHandler.class); - - public UniqueAttributePatchHandler(PatchContext context) { - super(context, PATCH_ID, PATCH_DESCRIPTION); - } - - @Override - public void applyPatch() { - Collection<AtlasEntityType> allEntityTypes = typeRegistry.getAllEntityTypes(); - boolean patchFailed = false; - - for (AtlasEntityType entityType : allEntityTypes) { - String typeName = entityType.getTypeName(); - Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes(); - int patchAppliedCount = 0; - - LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName); - - if (MapUtils.isNotEmpty(uniqAttributes)) { - Collection<AtlasAttribute> attributes = uniqAttributes.values(); - - try { - // register unique attribute property keys in graph - registerUniqueAttrPropertyKeys(attributes); - - Iterator<Result<Object, Object>> iterator = findActiveEntityVerticesByType(typeName); - - int entityCount = 0; - - while (iterator != null && iterator.hasNext()) { - AtlasVertex entityVertex = iterator.next().getVertex(); - boolean patchApplied = false; - - entityCount++; - - for (AtlasAttribute attribute : attributes) { - String uniquePropertyKey = attribute.getVertexUniquePropertyName(); - Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys(); - - if (!propertyKeys.contains(uniquePropertyKey)) { - String propertyKey = attribute.getVertexPropertyName(); - AtlasAttributeDef attributeDef = attribute.getAttributeDef(); - Object uniqAttrValue = entityRetriever.mapVertexToPrimitive(entityVertex, propertyKey, attributeDef); - - // add the unique attribute property to vertex - setEncodedProperty(entityVertex, uniquePropertyKey, uniqAttrValue); - - try { - graph.commit(); - - patchApplied = true; - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Added unique attribute property: {} to entity: {} ({})", - PATCH_ID, uniquePropertyKey, getGuid(entityVertex), typeName); - } - } catch (Throwable t) { - LOG.warn("Java patch ({}): failed to update entity guid: {}; typeName: {}; attrName: {}; attrValue: {}", - getPatchId(), getGuid(entityVertex), typeName, attribute.getName(), uniqAttrValue); - - continue; - } - } - } - - if (patchApplied) { - patchAppliedCount++; - } - - if (entityCount % 1000 == 0) { - LOG.info("Java patch: {} : applied {}; processed {} {} entities.", getPatchId(), patchAppliedCount, entityCount, typeName); - } - } - } catch (IndexException e) { - LOG.error("Java patch: {} failed! error: {}", getPatchId(), e); - - patchFailed = true; - - break; - } - } - - LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, patchAppliedCount); - } - - if (patchFailed) { - setPatchStatus(FAILED); - } else { - setPatchStatus(APPLIED); - } - - LOG.info("Applied java patch: {}; status: {}", getPatchId(), getPatchStatus()); - - updatePatchVertex(getPatchStatus()); - } - - private void registerUniqueAttrPropertyKeys(Collection<AtlasAttribute> attributes) throws IndexException { - AtlasGraphManagement management = graph.getManagementSystem(); - - for (AtlasAttribute attribute : attributes) { - String uniquePropertyName = attribute.getVertexUniquePropertyName(); - boolean uniquePropertyNameExists = management.getPropertyKey(uniquePropertyName) != null; - - if (!uniquePropertyNameExists) { - AtlasAttributeDef attributeDef = attribute.getAttributeDef(); - boolean isIndexable = attributeDef.getIsIndexable(); - String attribTypeName = attributeDef.getTypeName(); - Class propertyClass = indexer.getPrimitiveClass(attribTypeName); - AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality()); - - indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.NONE, propertyClass, cardinality, isIndexable, true); - } - } - - //Commit indexes - indexer.commit(management); - graph.commit(); - } -} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 78f3faf..662edc9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -21,7 +21,6 @@ package org.apache.atlas.repository.store.bootstrap; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonSerialize; - import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; @@ -41,14 +40,8 @@ import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.atlas.repository.graph.AtlasGraphProvider; -import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.patches.AtlasJavaPatchHandler; -import org.apache.atlas.repository.patches.PatchContext; -import org.apache.atlas.repository.patches.UniqueAttributePatchHandler; -import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2; +import org.apache.atlas.repository.patches.AtlasPatchRegistry; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; @@ -83,19 +76,6 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED; -import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; -import static org.apache.atlas.repository.Constants.CREATED_BY_KEY; -import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_ACTION_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getPatchesRegistry; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; /** * Class that handles initial loading of models and patches into typedef store @@ -111,18 +91,16 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private final AtlasTypeDefStore typeDefStore; private final AtlasTypeRegistry typeRegistry; - private final AtlasGraph graph; private final Configuration conf; - private final GraphBackedSearchIndexer indexer; + private final AtlasPatchRegistry patchRegistry; @Inject public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, - AtlasGraph graph, Configuration conf, GraphBackedSearchIndexer indexer) { - this.typeDefStore = typeDefStore; - this.typeRegistry = typeRegistry; - this.graph = graph; - this.conf = conf; - this.indexer = indexer; + AtlasGraph graph, Configuration conf) { + this.typeDefStore = typeDefStore; + this.typeRegistry = typeRegistry; + this.conf = conf; + this.patchRegistry = new AtlasPatchRegistry(graph); } @PostConstruct @@ -157,7 +135,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { String atlasHomeDir = System.getProperty("atlas.home"); String modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models"; - PatchContext patchContext = initPatchContext(); if (modelsDirName == null || modelsDirName.length() == 0) { LOG.info("Types directory {} does not exist or not readable or has no typedef files", modelsDirName); @@ -176,49 +153,23 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { continue; } else if (!folder.getName().equals(PATCHES_FOLDER_NAME)){ // load the models alphabetically in the subfolders apart from patches - loadModelsInFolder(folder, patchContext); + loadModelsInFolder(folder); } } } // load any files in the top models folder and any associated patches. - loadModelsInFolder(topModeltypesDir, patchContext); + loadModelsInFolder(topModeltypesDir); } - // apply java patches - applyJavaPatches(patchContext); - LOG.info("<== AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()"); } - private void applyJavaPatches(PatchContext context) { - // register java patches - AtlasJavaPatchHandler[] patches = new AtlasJavaPatchHandler[] { new UniqueAttributePatchHandler(context) }; - - // apply java patches - for (AtlasJavaPatchHandler patch : patches) { - PatchStatus patchStatus = patch.getPatchStatus(); - - if (patchStatus == APPLIED || patchStatus == SKIPPED) { - LOG.info("Ignoring java patch: {}; status: {}", patch.getPatchId(), patchStatus); - } else { - LOG.info("Applying java patch: {}; status: {}", patch.getPatchId(), patchStatus); - - patch.applyPatch(); - } - } - } - - public PatchContext initPatchContext() { - return new PatchContext(graph, typeRegistry, indexer, getPatchesRegistry()); - } - /** * Load all the model files in the supplied folder followed by the contents of the patches folder. * @param typesDir - * @param context */ - private void loadModelsInFolder(File typesDir, PatchContext context) { + private void loadModelsInFolder(File typesDir) { LOG.info("==> AtlasTypeDefStoreInitializer({})", typesDir); String typesDirName = typesDir.getName(); @@ -260,7 +211,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { } } - applyTypePatches(typesDir.getPath(), context); + applyTypePatches(typesDir.getPath()); } LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir); } @@ -458,11 +409,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { return ret; } - private void applyTypePatches(String typesDirName, PatchContext context) { - String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME; - File typePatchesDir = new File(typePatchesDirName); - File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null; - Map<String, PatchStatus> patchesRegistry = context.getPatchesRegistry(); + private void applyTypePatches(String typesDirName) { + String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME; + File typePatchesDir = new File(typePatchesDirName); + File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null; if (typePatchFiles == null || typePatchFiles.length == 0) { LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName); @@ -505,7 +455,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { } int patchIndex = 0; - for (TypeDefPatch patch : patches.getPatches()) { PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction()); @@ -514,15 +463,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { continue; } - String patchId = patch.getId(); - - if (StringUtils.isEmpty(patchId)) { - patchId = typePatchFile.getName() + "_" + patchIndex; - - patch.setId(patchId); - } - - if (isPatchApplicable(patchId, patchesRegistry)) { + if (patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) { PatchStatus status; try { @@ -531,17 +472,14 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { status = FAILED; LOG.error("Failed to apply {} (status: {}; action: {}) in file: {}. Ignored.", - patchId, status.toString(), patch.getAction(), patchFile); + patch.getId(), status.toString(), patch.getAction(), patchFile); } - createOrUpdatePatchVertex(patch, status, patchesRegistry); - - LOG.info("{} (status: {}; action: {}) in file: {}", patchId, status.toString(), patch.getAction(), patchFile); + patchRegistry.register(patch.id, patch.description, patch.action, status); + LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile); } else { - LOG.info("{} in file: {} already {}. Ignoring.", patchId, patchFile, patchesRegistry.get(patchId).toString()); + LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString()); } - - patchIndex++; } } catch (Throwable t) { LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t); @@ -551,38 +489,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { } } - private boolean isPatchApplicable(String patchId, Map<String, PatchStatus> patchesRegistry) { - if (MapUtils.isEmpty(patchesRegistry) || !patchesRegistry.containsKey(patchId)) { - return true; - } - - PatchStatus status = patchesRegistry.get(patchId); - - if (status == FAILED || status == UNKNOWN) { - return true; - } - - return false; - } - - private void createOrUpdatePatchVertex(TypeDefPatch patch, PatchStatus patchStatus, Map<String, PatchStatus> patchesRegistry) { - String patchId = patch.getId(); - boolean isPatchRegistered = MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId); - AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex(); - - setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId); - setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patch.getDescription()); - setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE); - setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, patch.getAction()); - setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString()); - setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); - setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); - setEncodedProperty(patchVertex, CREATED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser()); - setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser()); - - AtlasGraphProvider.getGraphInstance().commit(); - } - /** * typedef patch details */ diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java index 80141b4..70b01a5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -28,15 +28,12 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.Status; -import org.apache.atlas.model.patches.AtlasPatch; -import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches; -import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasElement; +import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraphQuery; import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; @@ -46,7 +43,6 @@ import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasType; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; @@ -57,30 +53,20 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; -import static org.apache.atlas.repository.Constants.CREATED_BY_KEY; import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; -import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_ACTION_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.VERTEX_INDEX; +import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY; import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance; -import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*; +import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC; +import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC; /** * Utility methods for Graph. @@ -163,6 +149,10 @@ public class AtlasGraphUtilsV2 { return StringUtils.isNotEmpty(getIdFromVertex(vertex)) && StringUtils.isNotEmpty(getTypeName(vertex)); } + public static boolean isTypeVertex(AtlasVertex vertex) { + return vertex.getProperty(TYPENAME_PROPERTY_KEY, String.class) != null; + } + public static boolean isReference(AtlasType type) { return isReference(type.getTypeCategory()); } @@ -228,11 +218,12 @@ public class AtlasGraphUtilsV2 { Object existingValue = element.getProperty(propertyName, Object.class); - if (value == null) { + if (value == null || (value instanceof Collection && ((Collection)value).isEmpty())) { if (existingValue != null) { if (LOG.isDebugEnabled()) { LOG.debug("Removing property {} from {}", propertyName, toString(element)); } + element.removeProperty(propertyName); } } else { @@ -241,7 +232,7 @@ public class AtlasGraphUtilsV2 { LOG.debug("Setting property {} in {}", propertyName, toString(element)); } - if (value instanceof Date) { + if ( value instanceof Date) { Long encodedValue = ((Date) value).getTime(); element.setProperty(propertyName, encodedValue); } else { @@ -341,22 +332,6 @@ public class AtlasGraphUtilsV2 { return vertex; } - public static AtlasVertex findByPatchId(String patchId) { - AtlasVertex ret = null; - String indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : ("+ patchId +")"; - Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices(); - - while (results != null && results.hasNext()) { - ret = results.next().getVertex(); - - if (ret != null) { - break; - } - } - - return ret; - } - public static AtlasVertex findByGuid(String guid) { AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid); @@ -470,80 +445,12 @@ public class AtlasGraphUtilsV2 { return vertex; } - public static Map<String, PatchStatus> getPatchesRegistry() { - Map<String, PatchStatus> ret = new HashMap<>(); - AtlasPatches patches = getAllPatches(); - - for (AtlasPatch patch : patches.getPatches()) { - String patchId = patch.getId(); - PatchStatus patchStatus = patch.getStatus(); - - if (patchId != null && patchStatus != null) { - ret.put(patchId, patchStatus); - } - } - - return ret; - } - - public static AtlasPatches getAllPatches() { - List<AtlasPatch> ret = new ArrayList<>(); - String idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)"; - AtlasIndexQuery idxQuery = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, idxQueryString); - Iterator<Result<Object, Object>> results; - - try { - results = idxQuery.vertices(); - - while (results != null && results.hasNext()) { - AtlasVertex patchVertex = results.next().getVertex(); - AtlasPatch patch = toAtlasPatch(patchVertex); - - ret.add(patch); - } - - // Sort the patches based on patch id - if (CollectionUtils.isNotEmpty(ret)) { - Collections.sort(ret, (p1, p2) -> p1.getId().compareTo(p2.getId())); - } - } catch (Throwable t) { - // first time idx query is fired, returns no field exists in solr exception - LOG.warn("getPatches() returned empty result!"); - } - - getGraphInstance().commit(); - - return new AtlasPatches(ret); - } - public int getOpenTransactions() { Set openTransactions = getGraphInstance().getOpenTransactions(); return (openTransactions != null) ? openTransactions.size() : 0; } - private static AtlasPatch toAtlasPatch(AtlasVertex vertex) { - AtlasPatch ret = new AtlasPatch(); - - ret.setId(getEncodedProperty(vertex, PATCH_ID_PROPERTY_KEY, String.class)); - ret.setDescription(getEncodedProperty(vertex, PATCH_DESCRIPTION_PROPERTY_KEY, String.class)); - ret.setType(getEncodedProperty(vertex, PATCH_TYPE_PROPERTY_KEY, String.class)); - ret.setAction(getEncodedProperty(vertex, PATCH_ACTION_PROPERTY_KEY, String.class)); - ret.setCreatedBy(getEncodedProperty(vertex, CREATED_BY_KEY, String.class)); - ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class)); - ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class)); - ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class)); - ret.setStatus(getPatchStatus(vertex)); - - return ret; - } - - private static PatchStatus getPatchStatus(AtlasVertex vertex) { - String patchStatus = AtlasGraphUtilsV2.getEncodedProperty(vertex, PATCH_STATE_PROPERTY_KEY, String.class); - - return patchStatus != null ? PatchStatus.valueOf(patchStatus) : UNKNOWN; - } - public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) { AtlasGraphQuery query = getGraphInstance().query() .has(ENTITY_TYPE_PROPERTY_KEY, typename); @@ -566,20 +473,16 @@ public class AtlasGraphUtilsV2 { return ret; } - public static Iterator<Result<Object, Object>> findActiveEntityVerticesByType(String typename) { - AtlasIndexQuery indexQuery = getActiveEntityIndexQuery(typename); - - return indexQuery.vertices(); + public static Iterator<AtlasVertex> findActiveEntityVerticesByType(String typename) { + return findActiveEntityVerticesByType(getGraphInstance(), typename); } - private static AtlasIndexQuery getActiveEntityIndexQuery(String typename) { - StringBuilder sb = new StringBuilder(); - - sb.append(INDEX_SEARCH_PREFIX + "\"").append(TYPE_NAME_PROPERTY_KEY).append("\":").append(typename) - .append(" AND ") - .append(INDEX_SEARCH_PREFIX + "\"").append(STATE_PROPERTY_KEY).append("\":").append(Status.ACTIVE.name()); + public static Iterator<AtlasVertex> findActiveEntityVerticesByType(AtlasGraph graph, String typename) { + AtlasGraphQuery query = graph.query() + .has(ENTITY_TYPE_PROPERTY_KEY, typename) + .has(STATE_PROPERTY_KEY, Status.ACTIVE.name()); - return getGraphInstance().indexQuery(VERTEX_INDEX, sb.toString()); + return query.vertices().iterator(); } public static List<String> findEntityGUIDsByType(String typename) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 03d2c06..3e1e023 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -947,7 +947,7 @@ public class EntityGraphRetriever { return ret; } - public Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) { + public static Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) { Object ret = null; if (AtlasGraphUtilsV2.getEncodedProperty(entityVertex, vertexPropertyName, Object.class) == null) { diff --git a/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java new file mode 100644 index 0000000..58396e5 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.patches; + +import org.apache.atlas.TestModules; +import org.apache.atlas.model.patches.AtlasPatch; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.patches.AtlasPatchRegistry; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class AtlasPatchRegistryTest { + @Inject + private AtlasGraph graph; + + @Test + public void noPatchesRegistered() { + AtlasPatchRegistry registry = new AtlasPatchRegistry(graph); + + assertPatches(registry, 0); + } + + @Test(dependsOnMethods = "noPatchesRegistered") + public void registerPatch() { + AtlasPatchRegistry registry = new AtlasPatchRegistry(graph); + + registry.register("1", "test patch", "apply", AtlasPatch.PatchStatus.UNKNOWN); + + assertPatches(registry, 1); + } + + @Test(dependsOnMethods = "registerPatch") + public void updateStatusForPatch() { + final AtlasPatch.PatchStatus expectedStatus = AtlasPatch.PatchStatus.APPLIED; + String patchId = "1"; + + AtlasPatchRegistry registry = new AtlasPatchRegistry(graph); + + registry.updateStatus(patchId, expectedStatus); + + AtlasPatch.AtlasPatches patches = assertPatches(registry, 1); + + assertEquals(patches.getPatches().get(0).getId(), patchId); + assertEquals(patches.getPatches().get(0).getStatus(), expectedStatus); + } + + + private AtlasPatch.AtlasPatches assertPatches(AtlasPatchRegistry registry, int i) { + AtlasPatch.AtlasPatches patches = registry.getAllPatches(); + + assertNotNull(patches); + assertEquals(patches.getPatches().size(), i); + + return patches; + } +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index ce2d76f..fcfbd21 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -100,7 +100,7 @@ import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE * Consumer of notifications from hooks e.g., hive hook etc. */ @Component -@Order(4) +@Order(5) @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"}) public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index c5ceb9d..2716873 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -23,16 +23,16 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.authorize.AtlasAdminAccessRequest; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasEntityAccessRequest; import org.apache.atlas.authorize.AtlasPrivilege; -import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.discovery.SearchContext; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasServer; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.impexp.AtlasServer; import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.impexp.MigrationStatus; import org.apache.atlas.model.instance.AtlasCheckStateRequest; @@ -46,14 +46,14 @@ import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.repository.impexp.MigrationProgressService; import org.apache.atlas.repository.impexp.ZipSink; import org.apache.atlas.repository.impexp.ZipSource; +import org.apache.atlas.repository.patches.AtlasPatchManager; import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.services.MetricsService; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.SearchTracker; -import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.AtlasJson; +import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter; import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.util.Servlets; @@ -138,6 +138,7 @@ public class AdminResource { private final ExportImportAuditService exportImportAuditService; private final AtlasServerService atlasServerService; private final AtlasEntityStore entityStore; + private final AtlasPatchManager patchManager; static { try { @@ -152,7 +153,8 @@ public class AdminResource { ExportService exportService, ImportService importService, SearchTracker activeSearches, MigrationProgressService migrationProgressService, AtlasServerService serverService, - ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore) { + ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore, + AtlasPatchManager patchManager) { this.serviceState = serviceState; this.metricsService = metricsService; this.exportService = exportService; @@ -164,6 +166,7 @@ public class AdminResource { this.entityStore = entityStore; this.exportImportAuditService = exportImportAuditService; this.importExportOperationLock = new ReentrantLock(); + this.patchManager = patchManager; } /** @@ -564,7 +567,7 @@ public class AdminResource { LOG.debug("==> AdminResource.getAtlasPatches()"); } - AtlasPatches ret = AtlasGraphUtilsV2.getAllPatches(); + AtlasPatches ret = patchManager.getAllPatches(); if (LOG.isDebugEnabled()) { LOG.debug("<== AdminResource.getAtlasPatches()"); diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java index 223a90a..563a16f 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java @@ -51,7 +51,7 @@ public class AdminResourceTest { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); assertEquals(response.getStatus(), HttpServletResponse.SC_OK); JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity()); @@ -62,7 +62,7 @@ public class AdminResourceTest { public void testResourceGetsValueFromServiceState() throws IOException { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); verify(serviceState).getState();