This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new ec001a8e9 ATLAS-4804: Set name field with qualifiedName for impala_process and impala_process_execution ec001a8e9 is described below commit ec001a8e9200cc90f361351220ad17c102c0a793 Author: pareshD <paresh.deva...@cloudera.com> AuthorDate: Fri Nov 3 12:18:05 2023 +0530 ATLAS-4804: Set name field with qualifiedName for impala_process and impala_process_execution Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../java/org/apache/atlas/AtlasConfiguration.java | 1 + .../repository/patches/AtlasPatchManager.java | 1 + .../repository/patches/ProcessImpalaNamePatch.java | 121 +++++++++++++++++++++ 3 files changed, 123 insertions(+) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index df886753f..31ec605f3 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -84,6 +84,7 @@ public enum AtlasConfiguration { STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true), REBUILD_INDEX("atlas.rebuild.index", false), PROCESS_NAME_UPDATE_PATCH("atlas.process.name.update.patch", false), + PROCESS_IMPALA_NAME_UPDATE_PATCH("atlas.process.impala.name.update.patch", false), STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false), DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true), DSL_CACHED_TRANSLATOR("atlas.dsl.cached.translator", true), diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java index a9774ae58..e72a87713 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java @@ -97,6 +97,7 @@ public class AtlasPatchManager { handlers.add(new ProcessNamePatch(context)); handlers.add(new UpdateCompositeIndexStatusPatch(context)); handlers.add(new RelationshipTypeNamePatch(context)); + handlers.add(new ProcessImpalaNamePatch(context)); LOG.info("<== AtlasPatchManager.init()"); } diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java new file mode 100644 index 000000000..7eb918481 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.patches; + +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.pc.WorkItemManager; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.type.AtlasEntityType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED; + +public class ProcessImpalaNamePatch extends AtlasPatchHandler { + private static final Logger LOG = LoggerFactory.getLogger(ProcessImpalaNamePatch.class); + + private static final String PATCH_ID = "JAVA_PATCH_0000_012"; + private static final String PATCH_DESCRIPTION = "Set name to qualifiedName"; + + private final PatchContext context; + + public ProcessImpalaNamePatch(PatchContext context) { + super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION); + + this.context = context; + } + + @Override + public void apply() throws AtlasBaseException { + if (AtlasConfiguration.PROCESS_IMPALA_NAME_UPDATE_PATCH.getBoolean() == false) { + LOG.info("ProcessImpalaNamePatch: Skipped, since not enabled!"); + return; + } + ConcurrentPatchProcessor patchProcessor = new ProcessImpalaNamePatchProcessor(context); + + patchProcessor.apply(); + + setStatus(APPLIED); + + LOG.info("ProcessImpalaNamePatch.apply(): patchId={}, status={}", getPatchId(), getStatus()); + } + + public static class ProcessImpalaNamePatchProcessor extends ConcurrentPatchProcessor { + private static final String TYPE_NAME_IMPALA_PROCESS = "impala_process"; + private static final String TYPE_NAME_IMPALA_PROCESS_EXECUTION = "impala_process_execution"; + private static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName"; + private static final String ATTR_NAME_NAME = "name"; + private static final String[] processTypes = {TYPE_NAME_IMPALA_PROCESS, TYPE_NAME_IMPALA_PROCESS_EXECUTION}; + + public ProcessImpalaNamePatchProcessor(PatchContext context) { + super(context); + } + + @Override + protected void prepareForExecution() { + } + + @Override + public void submitVerticesToUpdate(WorkItemManager manager) { + AtlasGraph graph = getGraph(); + + for (String typeName : processTypes) { + LOG.info("finding entities of type {}", typeName); + + Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName).vertexIds(); + int count = 0; + + for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) { + Object vertexId = iter.next(); + + manager.checkProduce(vertexId); + + count++; + + } + + LOG.info("found {} entities of type {}", count, typeName); + } + } + + @Override + protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) { + if (LOG.isDebugEnabled()) { + LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId); + } + + try { + String qualifiedName = AtlasGraphUtilsV2.getProperty(vertex, entityType.getVertexPropertyName(ATTR_NAME_QUALIFIED_NAME), String.class); + AtlasGraphUtilsV2.setEncodedProperty(vertex, entityType.getVertexPropertyName(ATTR_NAME_NAME), qualifiedName); + } catch (AtlasBaseException e) { + LOG.error("Error updating: {}", vertexId); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId); + } + } + } +}