This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ec001a8e9 ATLAS-4804: Set name field with qualifiedName for 
impala_process and impala_process_execution
ec001a8e9 is described below

commit ec001a8e9200cc90f361351220ad17c102c0a793
Author: pareshD <paresh.deva...@cloudera.com>
AuthorDate: Fri Nov 3 12:18:05 2023 +0530

    ATLAS-4804: Set name field with qualifiedName for impala_process and 
impala_process_execution
    
    Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com>
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |   1 +
 .../repository/patches/AtlasPatchManager.java      |   1 +
 .../repository/patches/ProcessImpalaNamePatch.java | 121 +++++++++++++++++++++
 3 files changed, 123 insertions(+)

diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index df886753f..31ec605f3 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -84,6 +84,7 @@ public enum AtlasConfiguration {
     
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled",
 true),
     REBUILD_INDEX("atlas.rebuild.index", false),
     PROCESS_NAME_UPDATE_PATCH("atlas.process.name.update.patch", false),
+    PROCESS_IMPALA_NAME_UPDATE_PATCH("atlas.process.impala.name.update.patch", 
false),
     STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false),
     DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true),
     DSL_CACHED_TRANSLATOR("atlas.dsl.cached.translator", true),
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
index a9774ae58..e72a87713 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -97,6 +97,7 @@ public class AtlasPatchManager {
         handlers.add(new ProcessNamePatch(context));
         handlers.add(new UpdateCompositeIndexStatusPatch(context));
         handlers.add(new RelationshipTypeNamePatch(context));
+        handlers.add(new ProcessImpalaNamePatch(context));
 
         LOG.info("<== AtlasPatchManager.init()");
     }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java
new file mode 100644
index 000000000..7eb918481
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.type.AtlasEntityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+
+public class ProcessImpalaNamePatch extends AtlasPatchHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProcessImpalaNamePatch.class);
+
+    private static final String PATCH_ID          = "JAVA_PATCH_0000_012";
+    private static final String PATCH_DESCRIPTION = "Set name to 
qualifiedName";
+
+    private final PatchContext context;
+
+    public ProcessImpalaNamePatch(PatchContext context) {
+        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+
+        this.context = context;
+    }
+
+    @Override
+    public void apply() throws AtlasBaseException {
+        if (AtlasConfiguration.PROCESS_IMPALA_NAME_UPDATE_PATCH.getBoolean() 
== false) {
+            LOG.info("ProcessImpalaNamePatch: Skipped, since not enabled!");
+            return;
+        }
+        ConcurrentPatchProcessor patchProcessor = new 
ProcessImpalaNamePatchProcessor(context);
+
+        patchProcessor.apply();
+
+        setStatus(APPLIED);
+
+        LOG.info("ProcessImpalaNamePatch.apply(): patchId={}, status={}", 
getPatchId(), getStatus());
+    }
+
+    public static class ProcessImpalaNamePatchProcessor extends 
ConcurrentPatchProcessor {
+        private static final String TYPE_NAME_IMPALA_PROCESS            = 
"impala_process";
+        private static final String TYPE_NAME_IMPALA_PROCESS_EXECUTION  = 
"impala_process_execution";
+        private static final String ATTR_NAME_QUALIFIED_NAME            = 
"qualifiedName";
+        private static final String ATTR_NAME_NAME                      = 
"name";
+        private static final String[] processTypes                      = 
{TYPE_NAME_IMPALA_PROCESS, TYPE_NAME_IMPALA_PROCESS_EXECUTION};
+
+        public ProcessImpalaNamePatchProcessor(PatchContext context) {
+            super(context);
+        }
+
+        @Override
+        protected void prepareForExecution() {
+        }
+
+        @Override
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            AtlasGraph        graph        = getGraph();
+
+            for (String typeName : processTypes) {
+                LOG.info("finding entities of type {}", typeName);
+
+                Iterable<Object> iterable = 
graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName).vertexIds();
+                    int              count    = 0;
+
+                for (Iterator<Object> iter = iterable.iterator(); 
iter.hasNext(); ) {
+                    Object vertexId = iter.next();
+
+                    manager.checkProduce(vertexId);
+
+                    count++;
+
+                }
+
+                LOG.info("found {} entities of type {}", count, typeName);
+            }
+        }
+
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, 
String typeName, AtlasEntityType entityType) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("processItem(typeName={}, vertexId={})", typeName, 
vertexId);
+            }
+
+            try {
+                String qualifiedName = AtlasGraphUtilsV2.getProperty(vertex, 
entityType.getVertexPropertyName(ATTR_NAME_QUALIFIED_NAME), String.class);
+                AtlasGraphUtilsV2.setEncodedProperty(vertex, 
entityType.getVertexPropertyName(ATTR_NAME_NAME), qualifiedName);
+            } catch (AtlasBaseException e) {
+                LOG.error("Error updating: {}", vertexId);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("processItem(typeName={}, vertexId={}): Done!", 
typeName, vertexId);
+            }
+        }
+    }
+}

Reply via email to